fix(blooms): Ship chunkrefs in task payload (#13677)

pull/13605/head^2
Salva Corts 1 year ago committed by GitHub
parent a77457f8ab
commit 450bbce938
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. pkg/bloombuild/builder/builder.go (12)
  2. pkg/bloombuild/common/tsdb.go (24)
  3. pkg/bloombuild/planner/planner.go (99)
  4. pkg/bloombuild/planner/planner_test.go (126)
  5. pkg/bloombuild/protos/compat.go (43)
  6. pkg/bloombuild/protos/types.pb.go (448)
  7. pkg/bloombuild/protos/types.proto (9)

@ -335,7 +335,7 @@ func (b *Builder) processTask(
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation.
level.Debug(logger).Log("msg", "loading series and blocks for gap", "blocks", len(gap.Blocks))
seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, tenant, task.TSDB, gap)
seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, gap)
if err != nil {
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
return nil, fmt.Errorf("failed to get series and blocks: %w", err)
@ -454,15 +454,9 @@ func (b *Builder) processTask(
func (b *Builder) loadWorkForGap(
ctx context.Context,
table config.DayTable,
tenant string,
id tsdb.Identifier,
gap protos.GapWithBlocks,
gap protos.Gap,
) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) {
// load a series iterator for the gap
seriesItr, err := b.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.Bounds)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load tsdb")
}
seriesItr := iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](gap.Series))
// load a blocks iterator for the gap
fetcher, err := b.bloomStore.Fetcher(table.ModelTime())

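Builder side, for orientation: the task payload now carries the series (with their chunk refs) for each gap, so loadWorkForGap no longer re-opens the TSDB. A minimal sketch of that consumption path, assuming the Next/At iterator methods from pkg/iter/v2; everything else is taken from the diff above:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
	iter "github.com/grafana/loki/v3/pkg/iter/v2"
	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
)

func main() {
	ctx := context.Background()

	// The planner now fills Gap.Series, so the builder never calls LoadTSDB here.
	gap := protos.Gap{
		Bounds: v1.NewBounds(0, 10),
		Series: []*v1.Series{
			{Fingerprint: 3, Chunks: v1.ChunkRefs{{From: 0, Through: 1, Checksum: 1}}},
		},
	}

	// Same construction as in loadWorkForGap above: a cancelable iterator over the shipped slice.
	seriesItr := iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](gap.Series))
	for seriesItr.Next() {
		s := seriesItr.At()
		fmt.Println(s.Fingerprint, len(s.Chunks))
	}
}
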
@ -9,7 +9,6 @@ import (
"strings"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@ -30,6 +29,11 @@ const (
gzipExtension = ".gz"
)
type ClosableForSeries interface {
sharding.ForSeries
Close() error
}
type TSDBStore interface {
UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
@ -38,8 +42,7 @@ type TSDBStore interface {
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (iter.Iterator[*v1.Series], error)
) (ClosableForSeries, error)
}
// BloomTSDBStore is a wrapper around the storage.Client interface which
@ -90,8 +93,7 @@ func (b *BloomTSDBStore) LoadTSDB(
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (iter.Iterator[*v1.Series], error) {
) (ClosableForSeries, error) {
withCompression := id.Name() + gzipExtension
data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression)
@ -118,13 +120,8 @@ func (b *BloomTSDBStore) LoadTSDB(
}
idx := tsdb.NewTSDBIndex(reader)
defer func() {
if err := idx.Close(); err != nil {
level.Error(b.logger).Log("msg", "failed to close index", "err", err)
}
}()
return NewTSDBSeriesIter(ctx, tenant, idx, bounds)
return idx, nil
}
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
@ -251,12 +248,11 @@ func (s *TSDBStores) LoadTSDB(
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (iter.Iterator[*v1.Series], error) {
) (ClosableForSeries, error) {
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}
return store.LoadTSDB(ctx, table, tenant, id, bounds)
return store.LoadTSDB(ctx, table, tenant, id)
}

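The interface change above moves lifecycle ownership to the caller: LoadTSDB hands back an open index (ClosableForSeries) instead of a series iterator, and the caller derives iterators via NewTSDBSeriesIter and closes the index when done. A hedged sketch of that contract, assuming the imports already present in this file; the helper name loadSeriesForBounds is made up for illustration:

func loadSeriesForBounds(
	ctx context.Context,
	logger log.Logger,
	store TSDBStore,
	table config.DayTable,
	tenant string,
	id tsdb.Identifier,
	bounds v1.FingerprintBounds,
) ([]*v1.Series, error) {
	// The store no longer closes the index for us.
	forSeries, err := store.LoadTSDB(ctx, table, tenant, id)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err := forSeries.Close(); err != nil {
			level.Error(logger).Log("msg", "failed to close index", "err", err)
		}
	}()

	// Any number of iterators can be derived from the open index before it is closed.
	seriesItr, err := NewTSDBSeriesIter(ctx, tenant, forSeries, bounds)
	if err != nil {
		return nil, err
	}
	return iter.Collect(seriesItr)
}
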
@ -365,6 +365,29 @@ func (p *Planner) computeTasks(
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}
// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
}
if len(tsdbs) == 0 {
return nil, metas, nil
}
openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
if err != nil {
return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
}
defer func() {
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
}
}
}()
for _, ownershipRange := range ownershipRanges {
logger := log.With(logger, "ownership", ownershipRange.String())
@ -372,7 +395,7 @@ func (p *Planner) computeTasks(
metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)
// Find gaps in the TSDBs for this tenant/table
gaps, err := p.findOutdatedGaps(ctx, tenant, table, ownershipRange, metasInBounds, logger)
gaps, err := p.findOutdatedGaps(ctx, tenant, openTSDBs, ownershipRange, metasInBounds, logger)
if err != nil {
level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
continue
@ -453,6 +476,26 @@ func (p *Planner) processTenantTaskResults(
return tasksSucceed, nil
}
func openAllTSDBs(
ctx context.Context,
table config.DayTable,
tenant string,
store common.TSDBStore,
tsdbs []tsdb.SingleTenantTSDBIdentifier,
) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
for _, idx := range tsdbs {
tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
if err != nil {
return nil, fmt.Errorf("failed to load tsdb: %w", err)
}
openTSDBs[idx] = tsdb
}
return openTSDBs, nil
}
// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
// It returns the up-to-date metas from the `metas` argument.
func (p *Planner) deleteOutdatedMetasAndBlocks(
@ -655,28 +698,17 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
// This is a performance optimization to avoid expensive re-reindexing
type blockPlan struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []protos.GapWithBlocks
gaps []protos.Gap
}
func (p *Planner) findOutdatedGaps(
ctx context.Context,
tenant string,
table config.DayTable,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
) ([]blockPlan, error) {
// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
return nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
}
if len(tsdbs) == 0 {
return nil, nil
}
// Determine which TSDBs have gaps in the ownership range and need to
// be processed.
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, tsdbs, metas)
@ -690,7 +722,7 @@ func (p *Planner) findOutdatedGaps(
return nil, nil
}
work, err := blockPlansForGaps(tsdbsWithGaps, metas)
work, err := blockPlansForGaps(ctx, tenant, tsdbsWithGaps, metas)
if err != nil {
level.Error(logger).Log("msg", "failed to create plan", "err", err)
return nil, fmt.Errorf("failed to create plan: %w", err)
@ -701,18 +733,19 @@ func (p *Planner) findOutdatedGaps(
// Used to signal the gaps that need to be populated for a tsdb
type tsdbGaps struct {
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []v1.FingerprintBounds
tsdbIdentifier tsdb.SingleTenantTSDBIdentifier
tsdb common.ClosableForSeries
gaps []v1.FingerprintBounds
}
// gapsBetweenTSDBsAndMetas returns whether the metas are up-to-date with the TSDBs. This is determined by asserting
// that for each TSDB, there are metas covering the entire ownership range which were generated from that specific TSDB.
func gapsBetweenTSDBsAndMetas(
ownershipRange v1.FingerprintBounds,
tsdbs []tsdb.SingleTenantTSDBIdentifier,
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries,
metas []bloomshipper.Meta,
) (res []tsdbGaps, err error) {
for _, db := range tsdbs {
for db, tsdb := range tsdbs {
id := db.Name()
relevantMetas := make([]v1.FingerprintBounds, 0, len(metas))
@ -731,8 +764,9 @@ func gapsBetweenTSDBsAndMetas(
if len(gaps) > 0 {
res = append(res, tsdbGaps{
tsdb: db,
gaps: gaps,
tsdbIdentifier: db,
tsdb: tsdb,
gaps: gaps,
})
}
}
@ -743,22 +777,35 @@ func gapsBetweenTSDBsAndMetas(
// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
// since many will contain the same chunks.
func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan, error) {
func blockPlansForGaps(
ctx context.Context,
tenant string,
tsdbs []tsdbGaps,
metas []bloomshipper.Meta,
) ([]blockPlan, error) {
plans := make([]blockPlan, 0, len(tsdbs))
for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdb,
gaps: make([]protos.GapWithBlocks, 0, len(idx.gaps)),
tsdb: idx.tsdbIdentifier,
gaps: make([]protos.Gap, 0, len(idx.gaps)),
}
for _, gap := range idx.gaps {
planGap := protos.GapWithBlocks{
planGap := protos.Gap{
Bounds: gap,
}
for _, meta := range metas {
seriesItr, err := common.NewTSDBSeriesIter(ctx, tenant, idx.tsdb, gap)
if err != nil {
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.String(), err)
}
planGap.Series, err = iter.Collect(seriesItr)
if err != nil {
return nil, fmt.Errorf("failed to collect series: %w", err)
}
for _, meta := range metas {
if meta.Bounds.Intersection(gap) == nil {
// this meta doesn't overlap the gap, skip
continue

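Planner side, putting the pieces above together: the TSDBs are resolved and opened once per tenant/table, kept open across every ownership range, and blockPlansForGaps reads each gap's series (and chunk refs) from the open index so they can be embedded into the task. A condensed, hypothetical wrapper (error handling and logging trimmed; every helper it calls is defined in this file):

func planTenantTable(
	ctx context.Context,
	p *Planner,
	table config.DayTable,
	tenant string,
	ownershipRanges []v1.FingerprintBounds,
	metas []bloomshipper.Meta,
) ([]blockPlan, error) {
	tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
	if err != nil || len(tsdbs) == 0 {
		return nil, err
	}

	// Open every TSDB once; the open readers are reused for all ownership ranges.
	openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
	if err != nil {
		return nil, err
	}
	defer func() {
		for _, reader := range openTSDBs {
			_ = reader.Close()
		}
	}()

	var plans []blockPlan
	for _, ownershipRange := range ownershipRanges {
		metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)
		gaps, err := gapsBetweenTSDBsAndMetas(ownershipRange, openTSDBs, metasInBounds)
		if err != nil {
			return nil, err
		}
		// blockPlansForGaps reads the series (with their chunk refs) for each gap
		// from the open index and embeds them into protos.Gap.
		work, err := blockPlansForGaps(ctx, tenant, gaps, metasInBounds)
		if err != nil {
			return nil, err
		}
		plans = append(plans, work...)
	}
	return plans, nil
}
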
@ -16,10 +16,12 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"
"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/chunkenc"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
@ -31,6 +33,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util/mempool"
)
@ -68,14 +71,16 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
err bool
exp []tsdbGaps
ownershipRange v1.FingerprintBounds
tsdbs []tsdb.SingleTenantTSDBIdentifier
tsdbs map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries
metas []bloomshipper.Meta
}{
{
desc: "non-overlapping tsdbs and metas",
err: true,
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
},
metas: []bloomshipper.Meta{
genMeta(11, 20, []int{0}, nil),
},
@ -83,13 +88,15 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
{
desc: "single tsdb",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0)},
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
},
metas: []bloomshipper.Meta{
genMeta(4, 8, []int{0}, nil),
},
exp: []tsdbGaps{
{
tsdb: tsdbID(0),
tsdbIdentifier: tsdbID(0),
gaps: []v1.FingerprintBounds{
v1.NewBounds(0, 3),
v1.NewBounds(9, 10),
@ -100,20 +107,23 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
{
desc: "multiple tsdbs with separate blocks",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)},
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
tsdbID(1): nil,
},
metas: []bloomshipper.Meta{
genMeta(0, 5, []int{0}, nil),
genMeta(6, 10, []int{1}, nil),
},
exp: []tsdbGaps{
{
tsdb: tsdbID(0),
tsdbIdentifier: tsdbID(0),
gaps: []v1.FingerprintBounds{
v1.NewBounds(6, 10),
},
},
{
tsdb: tsdbID(1),
tsdbIdentifier: tsdbID(1),
gaps: []v1.FingerprintBounds{
v1.NewBounds(0, 5),
},
@ -123,20 +133,23 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
{
desc: "multiple tsdbs with the same blocks",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.SingleTenantTSDBIdentifier{tsdbID(0), tsdbID(1)},
tsdbs: map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries{
tsdbID(0): nil,
tsdbID(1): nil,
},
metas: []bloomshipper.Meta{
genMeta(0, 5, []int{0, 1}, nil),
genMeta(6, 8, []int{1}, nil),
},
exp: []tsdbGaps{
{
tsdb: tsdbID(0),
tsdbIdentifier: tsdbID(0),
gaps: []v1.FingerprintBounds{
v1.NewBounds(6, 10),
},
},
{
tsdb: tsdbID(1),
tsdbIdentifier: tsdbID(1),
gaps: []v1.FingerprintBounds{
v1.NewBounds(9, 10),
},
@ -220,9 +233,10 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
},
},
},
@ -238,9 +252,10 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
Blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)},
},
},
@ -261,9 +276,10 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 8),
Series: genSeries(v1.NewBounds(0, 8)),
},
},
},
@ -280,9 +296,10 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 8),
Series: genSeries(v1.NewBounds(0, 8)),
Blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)},
},
},
@ -306,14 +323,16 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
// tsdb (id=0) can source chunks from the blocks built from tsdb (id=1)
{
Bounds: v1.NewBounds(3, 5),
Series: genSeries(v1.NewBounds(3, 5)),
Blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)},
},
{
Bounds: v1.NewBounds(9, 10),
Series: genSeries(v1.NewBounds(9, 10)),
Blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)},
},
},
@ -321,9 +340,10 @@ func Test_blockPlansForGaps(t *testing.T) {
// tsdb (id=1) can source chunks from the blocks built from tsdb (id=0)
{
tsdb: tsdbID(1),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 2),
Series: genSeries(v1.NewBounds(0, 2)),
Blocks: []bloomshipper.BlockRef{
genBlockRef(0, 1),
genBlockRef(1, 2),
@ -331,6 +351,7 @@ func Test_blockPlansForGaps(t *testing.T) {
},
{
Bounds: v1.NewBounds(6, 7),
Series: genSeries(v1.NewBounds(6, 7)),
Blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)},
},
},
@ -354,9 +375,10 @@ func Test_blockPlansForGaps(t *testing.T) {
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []protos.GapWithBlocks{
gaps: []protos.Gap{
{
Bounds: v1.NewBounds(0, 10),
Series: genSeries(v1.NewBounds(0, 10)),
Blocks: []bloomshipper.BlockRef{
genBlockRef(1, 4),
genBlockRef(5, 10),
@ -369,20 +391,86 @@ func Test_blockPlansForGaps(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
// We add series spanning the whole FP ownership range
tsdbs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries)
for _, id := range tc.tsdbs {
tsdbs[id] = newFakeForSeries(genSeries(tc.ownershipRange))
}
// we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested
// separately and it's used to generate input in our regular code path (easier to write tests this way).
gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tc.tsdbs, tc.metas)
gaps, err := gapsBetweenTSDBsAndMetas(tc.ownershipRange, tsdbs, tc.metas)
require.NoError(t, err)
plans, err := blockPlansForGaps(gaps, tc.metas)
plans, err := blockPlansForGaps(
context.Background(),
"fakeTenant",
gaps,
tc.metas,
)
if tc.err {
require.Error(t, err)
return
}
require.Equal(t, tc.exp, plans)
})
}
}
func genSeries(bounds v1.FingerprintBounds) []*v1.Series {
series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1))
for i := bounds.Min; i <= bounds.Max; i++ {
series = append(series, &v1.Series{
Fingerprint: i,
Chunks: v1.ChunkRefs{
{
From: 0,
Through: 1,
Checksum: 1,
},
},
})
}
return series
}
type fakeForSeries struct {
series []*v1.Series
}
func newFakeForSeries(series []*v1.Series) *fakeForSeries {
return &fakeForSeries{
series: series,
}
}
func (f fakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error {
overlapping := make([]*v1.Series, 0, len(f.series))
for _, s := range f.series {
if ff.Match(s.Fingerprint) {
overlapping = append(overlapping, s)
}
}
for _, s := range overlapping {
chunks := make([]index.ChunkMeta, 0, len(s.Chunks))
for _, c := range s.Chunks {
chunks = append(chunks, index.ChunkMeta{
MinTime: int64(c.From),
MaxTime: int64(c.Through),
Checksum: c.Checksum,
})
}
if fn(labels.EmptyLabels(), s.Fingerprint, chunks) {
break
}
}
return nil
}
func (f fakeForSeries) Close() error {
return nil
}
func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {

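The fake above is what lets Test_blockPlansForGaps exercise the real series-loading path without a TSDB on disk. As a further, hedged illustration of how it composes with the production iterator, a possible standalone check that is not part of the commit:

func Test_fakeForSeries_roundTrip(t *testing.T) {
	bounds := v1.NewBounds(0, 5)
	expected := genSeries(bounds)

	// The fake stands in for a real open TSDB index (common.ClosableForSeries).
	itr, err := common.NewTSDBSeriesIter(context.Background(), "fakeTenant", newFakeForSeries(expected), bounds)
	require.NoError(t, err)

	got, err := iter.Collect(itr)
	require.NoError(t, err)
	require.Equal(t, expected, got)
}
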
@ -7,14 +7,16 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/grafana/loki/v3/pkg/logproto"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)
type GapWithBlocks struct {
type Gap struct {
Bounds v1.FingerprintBounds
Series []*v1.Series
Blocks []bloomshipper.BlockRef
}
@ -25,7 +27,7 @@ type Task struct {
Tenant string
OwnershipBounds v1.FingerprintBounds
TSDB tsdb.SingleTenantTSDBIdentifier
Gaps []GapWithBlocks
Gaps []Gap
}
func NewTask(
@ -33,10 +35,10 @@ func NewTask(
tenant string,
bounds v1.FingerprintBounds,
tsdb tsdb.SingleTenantTSDBIdentifier,
gaps []GapWithBlocks,
gaps []Gap,
) *Task {
return &Task{
ID: fmt.Sprintf("%s-%s-%s-%d-%d", table.Addr(), tenant, bounds.String(), tsdb.Checksum, len(gaps)),
ID: fmt.Sprintf("%s-%s-%s-%d", table.Addr(), tenant, bounds.String(), len(gaps)),
Table: table,
Tenant: tenant,
@ -56,12 +58,25 @@ func FromProtoTask(task *ProtoTask) (*Task, error) {
return nil, fmt.Errorf("failed to parse tsdb path %s", task.Tsdb)
}
gaps := make([]GapWithBlocks, 0, len(task.Gaps))
gaps := make([]Gap, 0, len(task.Gaps))
for _, gap := range task.Gaps {
bounds := v1.FingerprintBounds{
Min: gap.Bounds.Min,
Max: gap.Bounds.Max,
}
series := make([]*v1.Series, 0, len(gap.Series))
for _, s := range gap.Series {
chunks := make(v1.ChunkRefs, 0, len(s.Chunks))
for _, c := range s.Chunks {
chunks = append(chunks, v1.ChunkRef(*c))
}
series = append(series, &v1.Series{
Fingerprint: model.Fingerprint(s.Fingerprint),
Chunks: chunks,
})
}
blocks := make([]bloomshipper.BlockRef, 0, len(gap.BlockRef))
for _, block := range gap.BlockRef {
b, err := bloomshipper.BlockRefFromKey(block)
@ -71,8 +86,9 @@ func FromProtoTask(task *ProtoTask) (*Task, error) {
blocks = append(blocks, b)
}
gaps = append(gaps, GapWithBlocks{
gaps = append(gaps, Gap{
Bounds: bounds,
Series: series,
Blocks: blocks,
})
}
@ -102,11 +118,26 @@ func (t *Task) ToProtoTask() *ProtoTask {
blockRefs = append(blockRefs, block.String())
}
series := make([]*ProtoSeries, 0, len(gap.Series))
for _, s := range gap.Series {
chunks := make([]*logproto.ShortRef, 0, len(s.Chunks))
for _, c := range s.Chunks {
chunk := logproto.ShortRef(c)
chunks = append(chunks, &chunk)
}
series = append(series, &ProtoSeries{
Fingerprint: uint64(s.Fingerprint),
Chunks: chunks,
})
}
protoGaps = append(protoGaps, &ProtoGapWithBlocks{
Bounds: ProtoFingerprintBounds{
Min: gap.Bounds.Min,
Max: gap.Bounds.Max,
},
Series: series,
BlockRef: blockRefs,
})
}

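With the conversion code above, a Gap's series and chunk refs survive the trip through the task protobuf, which is what allows the builder to skip re-reading the TSDB. A hedged round-trip sketch (illustration only; table and id are assumed to come from the surrounding planner code, and the function itself is hypothetical):

func roundTripGap(table config.DayTable, tenant string, id tsdb.SingleTenantTSDBIdentifier) error {
	original := NewTask(table, tenant, v1.NewBounds(0, 10), id, []Gap{{
		Bounds: v1.NewBounds(0, 10),
		Series: []*v1.Series{{
			Fingerprint: 7,
			Chunks:      v1.ChunkRefs{{From: 0, Through: 1, Checksum: 1}},
		}},
	}})

	// Encode to the wire format and decode again.
	decoded, err := FromProtoTask(original.ToProtoTask())
	if err != nil {
		return err
	}
	if len(decoded.Gaps[0].Series) != 1 || len(decoded.Gaps[0].Series[0].Chunks) != 1 {
		return fmt.Errorf("series/chunks were not carried through the proto payload")
	}
	return nil
}
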
@ -7,6 +7,7 @@ import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
logproto "github.com/grafana/loki/v3/pkg/logproto"
github_com_prometheus_common_model "github.com/prometheus/common/model"
io "io"
math "math"
@ -131,15 +132,67 @@ func (m *DayTable) GetPrefix() string {
return ""
}
type ProtoSeries struct {
Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
Chunks []*logproto.ShortRef `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks,omitempty"`
}
func (m *ProtoSeries) Reset() { *m = ProtoSeries{} }
func (*ProtoSeries) ProtoMessage() {}
func (*ProtoSeries) Descriptor() ([]byte, []int) {
return fileDescriptor_5325fb0610e1e9ae, []int{2}
}
func (m *ProtoSeries) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *ProtoSeries) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_ProtoSeries.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *ProtoSeries) XXX_Merge(src proto.Message) {
xxx_messageInfo_ProtoSeries.Merge(m, src)
}
func (m *ProtoSeries) XXX_Size() int {
return m.Size()
}
func (m *ProtoSeries) XXX_DiscardUnknown() {
xxx_messageInfo_ProtoSeries.DiscardUnknown(m)
}
var xxx_messageInfo_ProtoSeries proto.InternalMessageInfo
func (m *ProtoSeries) GetFingerprint() uint64 {
if m != nil {
return m.Fingerprint
}
return 0
}
func (m *ProtoSeries) GetChunks() []*logproto.ShortRef {
if m != nil {
return m.Chunks
}
return nil
}
type ProtoGapWithBlocks struct {
Bounds ProtoFingerprintBounds `protobuf:"bytes,1,opt,name=bounds,proto3" json:"bounds"`
BlockRef []string `protobuf:"bytes,2,rep,name=blockRef,proto3" json:"blockRef,omitempty"`
Series []*ProtoSeries `protobuf:"bytes,2,rep,name=series,proto3" json:"series,omitempty"`
BlockRef []string `protobuf:"bytes,3,rep,name=blockRef,proto3" json:"blockRef,omitempty"`
}
func (m *ProtoGapWithBlocks) Reset() { *m = ProtoGapWithBlocks{} }
func (*ProtoGapWithBlocks) ProtoMessage() {}
func (*ProtoGapWithBlocks) Descriptor() ([]byte, []int) {
return fileDescriptor_5325fb0610e1e9ae, []int{2}
return fileDescriptor_5325fb0610e1e9ae, []int{3}
}
func (m *ProtoGapWithBlocks) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -175,6 +228,13 @@ func (m *ProtoGapWithBlocks) GetBounds() ProtoFingerprintBounds {
return ProtoFingerprintBounds{}
}
func (m *ProtoGapWithBlocks) GetSeries() []*ProtoSeries {
if m != nil {
return m.Series
}
return nil
}
func (m *ProtoGapWithBlocks) GetBlockRef() []string {
if m != nil {
return m.BlockRef
@ -197,7 +257,7 @@ type ProtoTask struct {
func (m *ProtoTask) Reset() { *m = ProtoTask{} }
func (*ProtoTask) ProtoMessage() {}
func (*ProtoTask) Descriptor() ([]byte, []int) {
return fileDescriptor_5325fb0610e1e9ae, []int{3}
return fileDescriptor_5325fb0610e1e9ae, []int{4}
}
func (m *ProtoTask) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -277,7 +337,7 @@ type ProtoMeta struct {
func (m *ProtoMeta) Reset() { *m = ProtoMeta{} }
func (*ProtoMeta) ProtoMessage() {}
func (*ProtoMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_5325fb0610e1e9ae, []int{4}
return fileDescriptor_5325fb0610e1e9ae, []int{5}
}
func (m *ProtoMeta) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -336,7 +396,7 @@ type ProtoTaskResult struct {
func (m *ProtoTaskResult) Reset() { *m = ProtoTaskResult{} }
func (*ProtoTaskResult) ProtoMessage() {}
func (*ProtoTaskResult) Descriptor() ([]byte, []int) {
return fileDescriptor_5325fb0610e1e9ae, []int{5}
return fileDescriptor_5325fb0610e1e9ae, []int{6}
}
func (m *ProtoTaskResult) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -389,6 +449,7 @@ func (m *ProtoTaskResult) GetCreatedMetas() []*ProtoMeta {
func init() {
proto.RegisterType((*ProtoFingerprintBounds)(nil), "protos.ProtoFingerprintBounds")
proto.RegisterType((*DayTable)(nil), "protos.DayTable")
proto.RegisterType((*ProtoSeries)(nil), "protos.ProtoSeries")
proto.RegisterType((*ProtoGapWithBlocks)(nil), "protos.ProtoGapWithBlocks")
proto.RegisterType((*ProtoTask)(nil), "protos.ProtoTask")
proto.RegisterType((*ProtoMeta)(nil), "protos.ProtoMeta")
@ -398,42 +459,47 @@ func init() {
func init() { proto.RegisterFile("pkg/bloombuild/protos/types.proto", fileDescriptor_5325fb0610e1e9ae) }
var fileDescriptor_5325fb0610e1e9ae = []byte{
// 551 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0xb1, 0x6f, 0xd3, 0x4e,
0x18, 0xb5, 0xe3, 0x34, 0xbf, 0xe6, 0xd2, 0x5f, 0x81, 0x53, 0x55, 0x59, 0x11, 0xba, 0x04, 0x0f,
0x28, 0x93, 0x2d, 0x05, 0x75, 0x40, 0x62, 0xb2, 0xa2, 0x22, 0x40, 0x95, 0xd0, 0x35, 0x12, 0x12,
0xdb, 0x39, 0xbe, 0x3a, 0x56, 0x6c, 0x9f, 0xe5, 0x3b, 0xa3, 0x64, 0xe3, 0x4f, 0xe0, 0xcf, 0x60,
0xe6, 0xaf, 0xe8, 0x98, 0xb1, 0x53, 0x44, 0x9c, 0x05, 0x75, 0xea, 0xc4, 0xc0, 0x84, 0xee, 0xce,
0x29, 0x09, 0x62, 0x82, 0xe9, 0xbe, 0xf7, 0xdd, 0x77, 0xef, 0x7b, 0xef, 0xc9, 0x06, 0x4f, 0xf2,
0x59, 0xe4, 0x05, 0x09, 0x63, 0x69, 0x50, 0xc6, 0x49, 0xe8, 0xe5, 0x05, 0x13, 0x8c, 0x7b, 0x62,
0x91, 0x53, 0xee, 0x2a, 0x00, 0x5b, 0xba, 0xd7, 0x3d, 0x89, 0x58, 0xc4, 0x54, 0xed, 0xc9, 0x4a,
0xdf, 0x3a, 0x5f, 0x4c, 0x70, 0xfa, 0x56, 0x56, 0xe7, 0x71, 0x16, 0xd1, 0x22, 0x2f, 0xe2, 0x4c,
0xf8, 0xac, 0xcc, 0x42, 0x0e, 0xdf, 0x00, 0x2b, 0x8d, 0x33, 0xdb, 0xec, 0x9b, 0x83, 0xa6, 0xff,
0xfc, 0x76, 0xd5, 0x93, 0xf0, 0xc7, 0xaa, 0xe7, 0x46, 0xb1, 0x98, 0x96, 0x81, 0x3b, 0x61, 0xa9,
0xdc, 0x97, 0x52, 0x31, 0xa5, 0x25, 0xf7, 0x26, 0x2c, 0x4d, 0x59, 0xe6, 0xa5, 0x2c, 0xa4, 0x89,
0xbb, 0xc3, 0x86, 0xe5, 0x33, 0x45, 0x46, 0xe6, 0x76, 0x63, 0x87, 0x8c, 0xcc, 0xff, 0x8a, 0x8c,
0xcc, 0x9d, 0xd7, 0xe0, 0x70, 0x44, 0x16, 0x63, 0x12, 0x24, 0x14, 0x3e, 0x05, 0xc7, 0x21, 0x59,
0x8c, 0xe3, 0x94, 0x72, 0x41, 0xd2, 0xfc, 0xe2, 0x52, 0x09, 0xb6, 0xf0, 0x6f, 0x5d, 0x78, 0x0a,
0x5a, 0x79, 0x41, 0xaf, 0x62, 0xad, 0xa1, 0x8d, 0x6b, 0xe4, 0xcc, 0x01, 0x54, 0xfe, 0x5f, 0x92,
0xfc, 0x5d, 0x2c, 0xa6, 0x7e, 0xc2, 0x26, 0x33, 0x0e, 0xcf, 0x41, 0x2b, 0x50, 0x29, 0x28, 0xb6,
0xce, 0x10, 0xe9, 0xb8, 0xb8, 0xfb, 0xe7, 0xac, 0xfc, 0xe3, 0xeb, 0x55, 0xcf, 0xb8, 0x5d, 0xf5,
0xea, 0x57, 0xb8, 0x3e, 0x61, 0x17, 0x1c, 0x06, 0x92, 0x11, 0xd3, 0x2b, 0xbb, 0xd1, 0xb7, 0x06,
0x6d, 0x7c, 0x8f, 0x9d, 0xef, 0x26, 0x68, 0x2b, 0xba, 0x31, 0xe1, 0x33, 0x78, 0x0c, 0x1a, 0x71,
0xa8, 0xb6, 0xb5, 0x71, 0x23, 0x0e, 0xe1, 0x19, 0x38, 0x10, 0xd2, 0xa0, 0x92, 0xdb, 0x19, 0x3e,
0xdc, 0x0a, 0xd8, 0x1a, 0xf7, 0xff, 0xaf, 0x57, 0xea, 0x31, 0xac, 0x0f, 0x69, 0x53, 0xd0, 0x8c,
0x64, 0xc2, 0xb6, 0xb4, 0x4d, 0x8d, 0x76, 0x0c, 0x35, 0xff, 0xc9, 0x10, 0x04, 0x4d, 0xc1, 0xc3,
0xc0, 0x3e, 0x50, 0xec, 0xaa, 0x86, 0x2e, 0x68, 0x46, 0x24, 0xe7, 0x76, 0xab, 0x6f, 0x0d, 0x3a,
0xc3, 0xee, 0x1e, 0xf3, 0x5e, 0xac, 0x58, 0xcd, 0x39, 0x51, 0xed, 0xfb, 0x82, 0x0a, 0x02, 0x6d,
0xf0, 0x5f, 0x4a, 0x05, 0x91, 0x01, 0x69, 0xf3, 0x5b, 0x08, 0x1d, 0x70, 0xc4, 0x59, 0x59, 0x4c,
0x28, 0x1f, 0x5f, 0x8e, 0x7c, 0x5e, 0xe7, 0xb7, 0xd7, 0x83, 0x8f, 0x41, 0x7b, 0x9b, 0x27, 0xb7,
0x2d, 0x35, 0xf0, 0xab, 0xe1, 0x7c, 0x00, 0x0f, 0xee, 0x03, 0xc6, 0x94, 0x97, 0x89, 0x50, 0xf9,
0x10, 0x3e, 0x7b, 0x35, 0xaa, 0xb7, 0xd5, 0x08, 0x9e, 0x80, 0x03, 0x5a, 0x14, 0xac, 0xa8, 0xbf,
0x0e, 0x0d, 0xe0, 0x19, 0x38, 0x9a, 0x14, 0x94, 0x08, 0x1a, 0x4a, 0xad, 0x7a, 0x43, 0x67, 0xf8,
0x68, 0xcf, 0xa1, 0xbc, 0xc1, 0x7b, 0x63, 0xfe, 0x8b, 0xe5, 0x1a, 0x19, 0x37, 0x6b, 0x64, 0xdc,
0xad, 0x91, 0xf9, 0xb1, 0x42, 0xe6, 0xe7, 0x0a, 0x99, 0xd7, 0x15, 0x32, 0x97, 0x15, 0x32, 0xbf,
0x56, 0xc8, 0xfc, 0x56, 0x21, 0xe3, 0xae, 0x42, 0xe6, 0xa7, 0x0d, 0x32, 0x96, 0x1b, 0x64, 0xdc,
0x6c, 0x90, 0xf1, 0xbe, 0xfe, 0x51, 0x03, 0x7d, 0x3e, 0xfb, 0x19, 0x00, 0x00, 0xff, 0xff, 0xdc,
0x0e, 0x2e, 0xd1, 0xdc, 0x03, 0x00, 0x00,
// 630 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0x3f, 0x6f, 0xd3, 0x40,
0x1c, 0xb5, 0x93, 0x34, 0x34, 0x97, 0x52, 0xe0, 0xa8, 0x2a, 0x2b, 0x42, 0x97, 0xe0, 0x01, 0x55,
0x20, 0x39, 0x52, 0x50, 0x07, 0x24, 0x26, 0xab, 0x2a, 0x02, 0x54, 0x09, 0x5d, 0x22, 0x21, 0xc1,
0x74, 0x8e, 0x2f, 0x8e, 0x15, 0xdb, 0x67, 0xf9, 0xce, 0xd0, 0x6c, 0x7c, 0x04, 0xbe, 0x04, 0x12,
0x33, 0x9f, 0xa2, 0x63, 0xc7, 0x4e, 0x11, 0x75, 0x17, 0xd4, 0xa9, 0x13, 0x03, 0x13, 0xba, 0x3f,
0x69, 0x13, 0xc4, 0x04, 0xd3, 0xdd, 0xfb, 0xdd, 0xef, 0xde, 0xef, 0xbd, 0x77, 0x96, 0xc1, 0xc3,
0x7c, 0x16, 0xf5, 0x83, 0x84, 0xb1, 0x34, 0x28, 0xe3, 0x24, 0xec, 0xe7, 0x05, 0x13, 0x8c, 0xf7,
0xc5, 0x3c, 0xa7, 0xdc, 0x53, 0x00, 0x36, 0x75, 0xad, 0xb3, 0x13, 0xb1, 0x88, 0xa9, 0x7d, 0x5f,
0xee, 0xf4, 0x69, 0xa7, 0x2b, 0x09, 0x12, 0x16, 0xe9, 0x03, 0xc5, 0x14, 0x11, 0x41, 0x3f, 0x92,
0xb9, 0x6e, 0x70, 0xbf, 0xd9, 0x60, 0xf7, 0x8d, 0xdc, 0x1d, 0xc6, 0x59, 0x44, 0x8b, 0xbc, 0x88,
0x33, 0xe1, 0xb3, 0x32, 0x0b, 0x39, 0x7c, 0x0d, 0xea, 0x69, 0x9c, 0x39, 0x76, 0xcf, 0xde, 0x6b,
0xf8, 0xcf, 0x2e, 0x17, 0x5d, 0x09, 0x7f, 0x2d, 0xba, 0x5e, 0x14, 0x8b, 0x69, 0x19, 0x78, 0x63,
0x96, 0x4a, 0x41, 0x29, 0x15, 0x53, 0x5a, 0xf2, 0xfe, 0x98, 0xa5, 0x29, 0xcb, 0xfa, 0x29, 0x0b,
0x69, 0xe2, 0xad, 0xb0, 0x61, 0x79, 0x4d, 0x91, 0x91, 0x63, 0xa7, 0xb6, 0x42, 0x46, 0x8e, 0xff,
0x89, 0x8c, 0x1c, 0xbb, 0xaf, 0xc0, 0xe6, 0x01, 0x99, 0x8f, 0x48, 0x90, 0x50, 0xf8, 0x08, 0x6c,
0x87, 0x64, 0x3e, 0x8a, 0x53, 0xca, 0x05, 0x49, 0xf3, 0xa3, 0xa1, 0x12, 0x5c, 0xc7, 0x7f, 0x54,
0xe1, 0x2e, 0x68, 0xe6, 0x05, 0x9d, 0xc4, 0x5a, 0x43, 0x0b, 0x1b, 0xe4, 0xbe, 0x07, 0x6d, 0xe5,
0x7f, 0x48, 0x8b, 0x98, 0x72, 0xd8, 0x03, 0xed, 0xc9, 0xcd, 0x38, 0x6d, 0x1e, 0xaf, 0x96, 0xe0,
0x63, 0xd0, 0x1c, 0x4f, 0xcb, 0x6c, 0xc6, 0x9d, 0x5a, 0xaf, 0xbe, 0xd7, 0x1e, 0x40, 0x6f, 0x99,
0xaf, 0x37, 0x9c, 0xb2, 0x42, 0x60, 0x3a, 0xc1, 0xa6, 0xc3, 0xfd, 0x62, 0x03, 0xa8, 0xd8, 0x5f,
0x90, 0xfc, 0x6d, 0x2c, 0xa6, 0x7e, 0xc2, 0xc6, 0x33, 0x0e, 0x0f, 0x41, 0x33, 0x50, 0x19, 0x2b,
0xfe, 0xf6, 0x00, 0xe9, 0xc7, 0xe0, 0xde, 0xdf, 0x5f, 0xc2, 0xdf, 0x3e, 0x59, 0x74, 0xad, 0xcb,
0x45, 0xd7, 0xdc, 0xc2, 0x66, 0x85, 0x4f, 0x40, 0x93, 0x2b, 0xd9, 0x46, 0xca, 0xfd, 0x35, 0x1e,
0xed, 0x08, 0x9b, 0x16, 0xd8, 0x01, 0x9b, 0x81, 0x1c, 0x8f, 0xe9, 0xc4, 0xa9, 0xf7, 0xea, 0x7b,
0x2d, 0x7c, 0x8d, 0xdd, 0x9f, 0x36, 0x68, 0xa9, 0x3b, 0x23, 0xc2, 0x67, 0x70, 0x1b, 0xd4, 0xe2,
0x50, 0x49, 0x6b, 0xe1, 0x5a, 0x1c, 0xc2, 0x7d, 0xb0, 0x21, 0x64, 0xd6, 0x2a, 0xb9, 0xf6, 0xe0,
0xee, 0x72, 0xca, 0xf2, 0x0d, 0xfc, 0xdb, 0x46, 0x9f, 0x6e, 0xc3, 0x7a, 0x91, 0x89, 0x0b, 0x9a,
0x91, 0x4c, 0x38, 0x75, 0x9d, 0xb8, 0x46, 0x2b, 0xee, 0x1b, 0xff, 0xe5, 0x1e, 0x82, 0x86, 0xe0,
0x61, 0xe0, 0x6c, 0x28, 0x76, 0xb5, 0x87, 0x1e, 0x68, 0x44, 0x24, 0xe7, 0x4e, 0x53, 0xe5, 0xd1,
0x59, 0x63, 0x5e, 0x7b, 0x03, 0xac, 0xfa, 0xdc, 0xc8, 0xf8, 0x3e, 0xa2, 0x82, 0x40, 0x07, 0xdc,
0x4a, 0xa9, 0x20, 0x32, 0x20, 0x6d, 0x7e, 0x09, 0xa1, 0x0b, 0xb6, 0x38, 0x2b, 0x8b, 0x31, 0xe5,
0xa3, 0xe1, 0x81, 0xaf, 0xe3, 0x6e, 0xe1, 0xb5, 0x1a, 0x7c, 0x00, 0x5a, 0xcb, 0x3c, 0xb9, 0x09,
0xf8, 0xa6, 0xe0, 0x7e, 0x00, 0x77, 0xae, 0x03, 0xc6, 0x94, 0x97, 0x89, 0x50, 0xf9, 0x10, 0x3e,
0x7b, 0x79, 0x60, 0xa6, 0x19, 0x04, 0x77, 0xc0, 0x06, 0x2d, 0x0a, 0x56, 0x98, 0x0f, 0x55, 0x03,
0xb8, 0x0f, 0xb6, 0xc6, 0x05, 0x25, 0x82, 0x86, 0x52, 0xab, 0x9e, 0xd0, 0x1e, 0xdc, 0x5b, 0x73,
0x28, 0x4f, 0xf0, 0x5a, 0x9b, 0xff, 0xfc, 0xf4, 0x1c, 0x59, 0x67, 0xe7, 0xc8, 0xba, 0x3a, 0x47,
0xf6, 0xa7, 0x0a, 0xd9, 0x5f, 0x2b, 0x64, 0x9f, 0x54, 0xc8, 0x3e, 0xad, 0x90, 0xfd, 0xbd, 0x42,
0xf6, 0x8f, 0x0a, 0x59, 0x57, 0x15, 0xb2, 0x3f, 0x5f, 0x20, 0xeb, 0xf4, 0x02, 0x59, 0x67, 0x17,
0xc8, 0x7a, 0x67, 0x7e, 0x2a, 0x81, 0x5e, 0x9f, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0xf9, 0x38,
0x02, 0xe1, 0x88, 0x04, 0x00, 0x00,
}
func (this *ProtoFingerprintBounds) Equal(that interface{}) bool {
@ -490,6 +556,38 @@ func (this *DayTable) Equal(that interface{}) bool {
}
return true
}
func (this *ProtoSeries) Equal(that interface{}) bool {
if that == nil {
return this == nil
}
that1, ok := that.(*ProtoSeries)
if !ok {
that2, ok := that.(ProtoSeries)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return this == nil
} else if this == nil {
return false
}
if this.Fingerprint != that1.Fingerprint {
return false
}
if len(this.Chunks) != len(that1.Chunks) {
return false
}
for i := range this.Chunks {
if !this.Chunks[i].Equal(that1.Chunks[i]) {
return false
}
}
return true
}
func (this *ProtoGapWithBlocks) Equal(that interface{}) bool {
if that == nil {
return this == nil
@ -512,6 +610,14 @@ func (this *ProtoGapWithBlocks) Equal(that interface{}) bool {
if !this.Bounds.Equal(&that1.Bounds) {
return false
}
if len(this.Series) != len(that1.Series) {
return false
}
for i := range this.Series {
if !this.Series[i].Equal(that1.Series[i]) {
return false
}
}
if len(this.BlockRef) != len(that1.BlockRef) {
return false
}
@ -663,13 +769,29 @@ func (this *DayTable) GoString() string {
s = append(s, "}")
return strings.Join(s, "")
}
func (this *ProtoGapWithBlocks) GoString() string {
func (this *ProtoSeries) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 6)
s = append(s, "&protos.ProtoSeries{")
s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n")
if this.Chunks != nil {
s = append(s, "Chunks: "+fmt.Sprintf("%#v", this.Chunks)+",\n")
}
s = append(s, "}")
return strings.Join(s, "")
}
func (this *ProtoGapWithBlocks) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s = append(s, "&protos.ProtoGapWithBlocks{")
s = append(s, "Bounds: "+strings.Replace(this.Bounds.GoString(), `&`, ``, 1)+",\n")
if this.Series != nil {
s = append(s, "Series: "+fmt.Sprintf("%#v", this.Series)+",\n")
}
s = append(s, "BlockRef: "+fmt.Sprintf("%#v", this.BlockRef)+",\n")
s = append(s, "}")
return strings.Join(s, "")
@ -793,6 +915,48 @@ func (m *DayTable) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *ProtoSeries) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ProtoSeries) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ProtoSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Chunks) > 0 {
for iNdEx := len(m.Chunks) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Chunks[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
if m.Fingerprint != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.Fingerprint))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *ProtoGapWithBlocks) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -819,6 +983,20 @@ func (m *ProtoGapWithBlocks) MarshalToSizedBuffer(dAtA []byte) (int, error) {
copy(dAtA[i:], m.BlockRef[iNdEx])
i = encodeVarintTypes(dAtA, i, uint64(len(m.BlockRef[iNdEx])))
i--
dAtA[i] = 0x1a
}
}
if len(m.Series) > 0 {
for iNdEx := len(m.Series) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Series[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
}
@ -1054,6 +1232,24 @@ func (m *DayTable) Size() (n int) {
return n
}
func (m *ProtoSeries) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Fingerprint != 0 {
n += 1 + sovTypes(uint64(m.Fingerprint))
}
if len(m.Chunks) > 0 {
for _, e := range m.Chunks {
l = e.Size()
n += 1 + l + sovTypes(uint64(l))
}
}
return n
}
func (m *ProtoGapWithBlocks) Size() (n int) {
if m == nil {
return 0
@ -1062,6 +1258,12 @@ func (m *ProtoGapWithBlocks) Size() (n int) {
_ = l
l = m.Bounds.Size()
n += 1 + l + sovTypes(uint64(l))
if len(m.Series) > 0 {
for _, e := range m.Series {
l = e.Size()
n += 1 + l + sovTypes(uint64(l))
}
}
if len(m.BlockRef) > 0 {
for _, s := range m.BlockRef {
l = len(s)
@ -1178,12 +1380,34 @@ func (this *DayTable) String() string {
}, "")
return s
}
func (this *ProtoSeries) String() string {
if this == nil {
return "nil"
}
repeatedStringForChunks := "[]*ShortRef{"
for _, f := range this.Chunks {
repeatedStringForChunks += strings.Replace(fmt.Sprintf("%v", f), "ShortRef", "logproto.ShortRef", 1) + ","
}
repeatedStringForChunks += "}"
s := strings.Join([]string{`&ProtoSeries{`,
`Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`,
`Chunks:` + repeatedStringForChunks + `,`,
`}`,
}, "")
return s
}
func (this *ProtoGapWithBlocks) String() string {
if this == nil {
return "nil"
}
repeatedStringForSeries := "[]*ProtoSeries{"
for _, f := range this.Series {
repeatedStringForSeries += strings.Replace(f.String(), "ProtoSeries", "ProtoSeries", 1) + ","
}
repeatedStringForSeries += "}"
s := strings.Join([]string{`&ProtoGapWithBlocks{`,
`Bounds:` + strings.Replace(strings.Replace(this.Bounds.String(), "ProtoFingerprintBounds", "ProtoFingerprintBounds", 1), `&`, ``, 1) + `,`,
`Series:` + repeatedStringForSeries + `,`,
`BlockRef:` + fmt.Sprintf("%v", this.BlockRef) + `,`,
`}`,
}, "")
@ -1441,6 +1665,112 @@ func (m *DayTable) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *ProtoSeries) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ProtoSeries: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ProtoSeries: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Fingerprint", wireType)
}
m.Fingerprint = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Fingerprint |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Chunks = append(m.Chunks, &logproto.ShortRef{})
if err := m.Chunks[len(m.Chunks)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ProtoGapWithBlocks) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -1504,6 +1834,40 @@ func (m *ProtoGapWithBlocks) Unmarshal(dAtA []byte) error {
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Series", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Series = append(m.Series, &ProtoSeries{})
if err := m.Series[len(m.Series)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field BlockRef", wireType)
}

@ -3,6 +3,7 @@ syntax = "proto3";
package protos;
import "gogoproto/gogo.proto";
import "pkg/logproto/bloomgateway.proto";
option go_package = "protos";
option (gogoproto.marshaler_all) = true;
@ -27,12 +28,18 @@ message DayTable {
string prefix = 2;
}
message ProtoSeries {
uint64 fingerprint = 1;
repeated logproto.ShortRef chunks = 2;
}
message ProtoGapWithBlocks {
ProtoFingerprintBounds bounds = 1 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "bounds"
];
repeated string blockRef = 2;
repeated ProtoSeries series = 2;
repeated string blockRef = 3;
}
// TODO: Define BlockRef and SingleTenantTSDBIdentifier as messages so we can use them right away

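For reference, a sketch (not from the commit) of populating the regenerated Go types for the message above; note that blockRef moved from field tag 2 to tag 3 to make room for the new series field, and the values here are placeholders:

gap := &ProtoGapWithBlocks{
	Bounds: ProtoFingerprintBounds{Min: 0, Max: 10},
	Series: []*ProtoSeries{
		{Fingerprint: 7, Chunks: []*logproto.ShortRef{{From: 0, Through: 1, Checksum: 1}}},
	},
	BlockRef: []string{"placeholder-block-key"},
}
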