diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index bb58f79288..f9acd70a2c 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -299,6 +299,18 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return nil, err } + // start time == end time --> empty response + if req.From.Equal(req.Through) { + return &logproto.FilterChunkRefResponse{ + ChunkRefs: []*logproto.GroupedChunkRefs{}, + }, nil + } + + // start time > end time --> error response + if req.Through.Before(req.From) { + return nil, errors.New("from time must not be after through time") + } + numChunksUnfiltered := len(req.Refs) // Shortcut if request does not contain filters @@ -315,21 +327,64 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return req.Refs[i].Fingerprint < req.Refs[j].Fingerprint }) - task, resCh, errCh, err := NewTask(tenantID, req) + var expectedResponses int + seriesWithBloomsPerDay := partitionRequest(req) - if err != nil { - return nil, err + // no tasks --> empty response + if len(seriesWithBloomsPerDay) == 0 { + return &logproto.FilterChunkRefResponse{ + ChunkRefs: []*logproto.GroupedChunkRefs{}, + }, nil + } + + tasks := make([]Task, 0, len(seriesWithBloomsPerDay)) + for _, seriesWithBounds := range seriesWithBloomsPerDay { + task, err := NewTask(tenantID, seriesWithBounds, req.Filters) + if err != nil { + return nil, err + } + tasks = append(tasks, task) + expectedResponses += len(seriesWithBounds.series) } g.activeUsers.UpdateUserTimestamp(tenantID, time.Now()) - level.Info(g.logger).Log("msg", "enqueue task", "task", task.ID) - g.queue.Enqueue(tenantID, []string{}, task, func() { - // When enqueuing, we also add the task to the pending tasks - g.pendingTasks.Add(task.ID, task) - }) - requestCount := len(req.Refs) - responses := responsesPool.Get(requestCount) + errCh := make(chan error, 1) + resCh := make(chan v1.Output, 1) + + for _, task := 
range tasks { + level.Info(g.logger).Log("msg", "enqueue task", "task", task.ID, "day", task.day, "series", len(task.series)) + g.queue.Enqueue(tenantID, []string{}, task, func() { + // When enqueuing, we also add the task to the pending tasks + g.pendingTasks.Add(task.ID, task) + }) + + // Forward responses or error to the main channels + // TODO(chaudum): Refactor to make tasks cancelable + go func(t Task) { + for { + select { + case <-ctx.Done(): + return + case err := <-t.ErrCh: + if ctx.Err() != nil { + level.Warn(g.logger).Log("msg", "received err from channel, but context is already done", "err", ctx.Err()) + return + } + errCh <- err + case res := <-t.ResCh: + level.Debug(g.logger).Log("msg", "got partial result", "task", t.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len()) + if ctx.Err() != nil { + level.Warn(g.logger).Log("msg", "received res from channel, but context is already done", "err", ctx.Err()) + return + } + resCh <- res + } + } + }(task) + } + + responses := responsesPool.Get(expectedResponses) defer responsesPool.Put(responses) outer: @@ -342,9 +397,9 @@ outer: case res := <-resCh: responses = append(responses, res) // log line is helpful for debugging tests - level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount)) + level.Debug(g.logger).Log("msg", "got partial result", "progress", fmt.Sprintf("%d/%d", len(responses), expectedResponses)) // wait for all parts of the full response - if len(responses) == requestCount { + if len(responses) == expectedResponses { break outer } } @@ -354,7 +409,6 @@ outer: if o.Removals.Len() == 0 { continue } - // we must not remove items from req.Refs as long as the worker may iterater over them removeNotMatchingChunks(req, o, g.logger) } diff --git 
a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go index 1afdc72c79..120e6da26f 100644 --- a/pkg/bloomgateway/multiplexing.go +++ b/pkg/bloomgateway/multiplexing.go @@ -1,13 +1,13 @@ package bloomgateway import ( - "sort" "time" "github.com/oklog/ulid" "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/syntax" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" ) @@ -25,138 +25,63 @@ type Task struct { ID ulid.ULID // Tenant is the tenant ID Tenant string - // Request is the original request - Request *logproto.FilterChunkRefRequest + // ErrCh is a send-only channel to write an error to - ErrCh chan<- error + ErrCh chan error // ResCh is a send-only channel to write partial responses to - ResCh chan<- v1.Output + ResCh chan v1.Output + + // series of the original request + series []*logproto.GroupedChunkRefs + // filters of the original request + filters []syntax.LineFilter + // from..through date of the task's chunks + bounds model.Interval + + // TODO(chaudum): Investigate how to remove that. + day model.Time } // NewTask returns a new Task that can be enqueued to the task queue. // In addition, it returns a result and an error channel, as well // as an error if the instantiation fails. 
-func NewTask(tenantID string, req *logproto.FilterChunkRefRequest) (Task, chan v1.Output, chan error, error) { +func NewTask(tenantID string, refs seriesWithBounds, filters []syntax.LineFilter) (Task, error) { key, err := ulid.New(ulid.Now(), nil) if err != nil { - return Task{}, nil, nil, err + return Task{}, err } errCh := make(chan error, 1) - resCh := make(chan v1.Output, 1) + resCh := make(chan v1.Output, len(refs.series)) + task := Task{ ID: key, Tenant: tenantID, - Request: req, ErrCh: errCh, ResCh: resCh, + filters: filters, + series: refs.series, + bounds: refs.bounds, + day: refs.day, } - return task, resCh, errCh, nil -} - -// Copy returns a copy of the existing task but with a new slice of chunks -func (t Task) Copy(refs []*logproto.GroupedChunkRefs) Task { - return Task{ - ID: t.ID, - Tenant: t.Tenant, - Request: &logproto.FilterChunkRefRequest{ - From: t.Request.From, - Through: t.Request.Through, - Filters: t.Request.Filters, - Refs: refs, - }, - ErrCh: t.ErrCh, - ResCh: t.ResCh, - } -} - -// Bounds returns the day boundaries of the task -func (t Task) Bounds() (time.Time, time.Time) { - return getDayTime(t.Request.From), getDayTime(t.Request.Through) -} - -func (t Task) ChunkIterForDay(day time.Time) v1.Iterator[*logproto.GroupedChunkRefs] { - cf := filterGroupedChunkRefsByDay{day: day} - return &FilterIter[*logproto.GroupedChunkRefs]{ - iter: v1.NewSliceIter(t.Request.Refs), - matches: cf.contains, - transform: cf.filter, - } -} - -type filterGroupedChunkRefsByDay struct { - day time.Time -} - -func (cf filterGroupedChunkRefsByDay) contains(a *logproto.GroupedChunkRefs) bool { - from, through := getFromThrough(a.Refs) - if from.Time().After(cf.day.Add(Day)) || through.Time().Before(cf.day) { - return false - } - return true -} - -func (cf filterGroupedChunkRefsByDay) filter(a *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs { - minTs, maxTs := getFromThrough(a.Refs) - - // in most cases, all chunks are within day range - if 
minTs.Time().Compare(cf.day) >= 0 && maxTs.Time().Before(cf.day.Add(Day)) { - return a - } - - // case where certain chunks are outside of day range - // using binary search to get min and max index of chunks that fall into the day range - min := sort.Search(len(a.Refs), func(i int) bool { - start := a.Refs[i].From.Time() - end := a.Refs[i].Through.Time() - return start.Compare(cf.day) >= 0 || end.Compare(cf.day) >= 0 - }) - - max := sort.Search(len(a.Refs), func(i int) bool { - start := a.Refs[i].From.Time() - return start.Compare(cf.day.Add(Day)) > 0 - }) - - return &logproto.GroupedChunkRefs{ - Tenant: a.Tenant, - Fingerprint: a.Fingerprint, - Refs: a.Refs[min:max], - } + return task, nil } -type Predicate[T any] func(a T) bool -type Transform[T any] func(a T) T - -type FilterIter[T any] struct { - iter v1.Iterator[T] - matches Predicate[T] - transform Transform[T] - cache T - zero T // zero value of the return type of Next() +func (t Task) Bounds() (model.Time, model.Time) { + return t.bounds.Start, t.bounds.End } -func (it *FilterIter[T]) Next() bool { - next := it.iter.Next() - if !next { - it.cache = it.zero - return false - } - for next && !it.matches(it.iter.At()) { - next = it.iter.Next() - if !next { - it.cache = it.zero - return false - } +// Copy returns a copy of the existing task but with a new slice of grouped chunk refs +func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task { + return Task{ + ID: ulid.ULID{}, // create empty ID to distinguish it as copied task + Tenant: t.Tenant, + ErrCh: t.ErrCh, + ResCh: t.ResCh, + filters: t.filters, + series: series, + bounds: t.bounds, + day: t.day, } - it.cache = it.transform(it.iter.At()) - return true -} - -func (it *FilterIter[T]) At() T { - return it.cache -} - -func (it *FilterIter[T]) Err() error { - return nil } // taskMergeIterator implements v1.Iterator @@ -164,12 +89,12 @@ type taskMergeIterator struct { curr v1.Request heap *v1.HeapIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]] tasks
[]Task - day time.Time + day model.Time tokenizer *v1.NGramTokenizer err error } -func newTaskMergeIterator(day time.Time, tokenizer *v1.NGramTokenizer, tasks ...Task) v1.PeekingIterator[v1.Request] { +func newTaskMergeIterator(day model.Time, tokenizer *v1.NGramTokenizer, tasks ...Task) v1.PeekingIterator[v1.Request] { it := &taskMergeIterator{ tasks: tasks, curr: v1.Request{}, @@ -183,8 +108,8 @@ func newTaskMergeIterator(day time.Time, tokenizer *v1.NGramTokenizer, tasks ... func (it *taskMergeIterator) init() { sequences := make([]v1.PeekingIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks)) for i := range it.tasks { - iter := v1.NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i) - sequences = append(sequences, v1.NewPeekingIter(iter)) + iter := v1.NewSliceIterWithIndex(it.tasks[i].series, i) + sequences = append(sequences, iter) } it.heap = v1.NewHeapIterator( func(i, j v1.IndexedValue[*logproto.GroupedChunkRefs]) bool { @@ -207,7 +132,7 @@ func (it *taskMergeIterator) Next() bool { it.curr = v1.Request{ Fp: model.Fingerprint(group.Value().Fingerprint), Chks: convertToChunkRefs(group.Value().Refs), - Searches: convertToSearches(task.Request.Filters, it.tokenizer), + Searches: convertToSearches(task.filters, it.tokenizer), Response: task.ResCh, } return true diff --git a/pkg/bloomgateway/multiplexing_test.go b/pkg/bloomgateway/multiplexing_test.go index d414256f95..67277d60f2 100644 --- a/pkg/bloomgateway/multiplexing_test.go +++ b/pkg/bloomgateway/multiplexing_test.go @@ -5,62 +5,59 @@ import ( "time" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql/syntax" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" ) func TestTask(t *testing.T) { - t.Run("bounds returns request boundaries", func(t *testing.T) { - ts := model.Now() + ts := mktime("2024-01-24 12:00") + t.Run("bounds returns 
boundaries of chunks", func(t *testing.T) { req := &logproto.FilterChunkRefRequest{ - From: ts.Add(-1 * time.Hour), + From: ts.Add(-24 * time.Hour), Through: ts, + Refs: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 0x00, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-1 * time.Hour), Through: ts}, + }, + }, + }, } - task, _, _, err := NewTask("tenant", req) + swb := partitionRequest(req)[0] + task, err := NewTask("tenant", swb, nil) require.NoError(t, err) from, through := task.Bounds() - require.Equal(t, getDayTime(req.From), from) - require.Equal(t, getDayTime(req.Through), through) + require.Equal(t, ts.Add(-1*time.Hour), from) + require.Equal(t, ts, through) + require.Equal(t, truncateDay(ts), task.day) }) } +func createTasksForRequests(t *testing.T, tenant string, requests ...*logproto.FilterChunkRefRequest) []Task { + t.Helper() + + tasks := make([]Task, 0, len(requests)) + for _, r := range requests { + for _, swb := range partitionRequest(r) { + task, err := NewTask(tenant, swb, nil) + require.NoError(t, err) + tasks = append(tasks, task) + } + } + return tasks +} + func TestTaskMergeIterator(t *testing.T) { - // Thu Nov 09 2023 10:56:50 UTC - ts := model.TimeFromUnix(1699523810) - day := getDayTime(ts) + ts := mktime("2024-01-24 12:00") + day := truncateDay(ts) tenant := "fake" tokenizer := v1.NewNGramTokenizer(4, 0) t.Run("empty requests result in empty iterator", func(t *testing.T) { - r1 := &logproto.FilterChunkRefRequest{ - From: ts.Add(-3 * time.Hour), - Through: ts.Add(-2 * time.Hour), - Refs: []*logproto.GroupedChunkRefs{}, - } - t1, _, _, err := NewTask(tenant, r1) - require.NoError(t, err) - - r2 := &logproto.FilterChunkRefRequest{ - From: ts.Add(-1 * time.Hour), - Through: ts, - Refs: []*logproto.GroupedChunkRefs{}, - } - t2, _, _, err := NewTask(tenant, r2) - require.NoError(t, err) - - r3 := &logproto.FilterChunkRefRequest{ - From: ts.Add(-1 * time.Hour), - Through: ts, - Refs: []*logproto.GroupedChunkRefs{}, - } - t3, _, _, err := 
NewTask(tenant, r3) - require.NoError(t, err) - - it := newTaskMergeIterator(day, tokenizer, t1, t2, t3) + it := newTaskMergeIterator(day, tokenizer) // nothing to iterate over require.False(t, it.Next()) }) @@ -75,8 +72,6 @@ func TestTaskMergeIterator(t *testing.T) { }}, }, } - t1, _, _, err := NewTask(tenant, r1) - require.NoError(t, err) r2 := &logproto.FilterChunkRefRequest{ From: ts.Add(-1 * time.Hour), @@ -90,8 +85,6 @@ func TestTaskMergeIterator(t *testing.T) { }}, }, } - t2, _, _, err := NewTask(tenant, r2) - require.NoError(t, err) r3 := &logproto.FilterChunkRefRequest{ From: ts.Add(-1 * time.Hour), @@ -102,10 +95,9 @@ func TestTaskMergeIterator(t *testing.T) { }}, }, } - t3, _, _, err := NewTask(tenant, r3) - require.NoError(t, err) - it := newTaskMergeIterator(day, tokenizer, t1, t2, t3) + tasks := createTasksForRequests(t, tenant, r1, r2, r3) + it := newTaskMergeIterator(day, tokenizer, tasks...) // first item require.True(t, it.Next()) @@ -135,73 +127,3 @@ func TestTaskMergeIterator(t *testing.T) { require.False(t, it.Next()) }) } - -func TestChunkIterForDay(t *testing.T) { - tenant := "fake" - - // Thu Nov 09 2023 10:56:50 UTC - ts := model.TimeFromUnix(1699523810) - - t.Run("filter chunk refs that fall into the day range", func(t *testing.T) { - input := &logproto.FilterChunkRefRequest{ - From: ts.Add(-168 * time.Hour), // 1w ago - Through: ts, - Refs: []*logproto.GroupedChunkRefs{ - {Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-168 * time.Hour), Through: ts.Add(-167 * time.Hour), Checksum: 100}, - {From: ts.Add(-143 * time.Hour), Through: ts.Add(-142 * time.Hour), Checksum: 101}, - }}, - {Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-144 * time.Hour), Through: ts.Add(-143 * time.Hour), Checksum: 200}, - {From: ts.Add(-119 * time.Hour), Through: ts.Add(-118 * time.Hour), Checksum: 201}, - }}, - {Fingerprint: 300, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-120 * 
time.Hour), Through: ts.Add(-119 * time.Hour), Checksum: 300}, - {From: ts.Add(-95 * time.Hour), Through: ts.Add(-94 * time.Hour), Checksum: 301}, - }}, - {Fingerprint: 400, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-96 * time.Hour), Through: ts.Add(-95 * time.Hour), Checksum: 400}, - {From: ts.Add(-71 * time.Hour), Through: ts.Add(-70 * time.Hour), Checksum: 401}, - }}, - {Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-72 * time.Hour), Through: ts.Add(-71 * time.Hour), Checksum: 500}, - {From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501}, - }}, - {Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600}, - {From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour), Checksum: 601}, - }}, - {Fingerprint: 700, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-24 * time.Hour), Through: ts.Add(-23 * time.Hour), Checksum: 700}, - {From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 701}, - }}, - }, - Filters: []syntax.LineFilter{ - {Ty: labels.MatchEqual, Match: "foo"}, - {Ty: labels.MatchEqual, Match: "bar"}, - }, - } - - // day ranges from ts-48h to ts-24h - day := getDayTime(ts.Add(-36 * time.Hour)) - - expected := []*logproto.GroupedChunkRefs{ - {Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501}, - }}, - {Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{ - {From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600}, - }}, - } - - task, _, _, _ := NewTask(tenant, input) - it := task.ChunkIterForDay(day) - - output := make([]*logproto.GroupedChunkRefs, 0, len(input.Refs)) - for it.Next() { - output = append(output, it.At()) - } - - require.Equal(t, expected, output) - }) -} diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index 
89d238864a..cf72aec3b5 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -15,7 +15,12 @@ import ( ) func getDayTime(ts model.Time) time.Time { - return time.Date(ts.Time().Year(), ts.Time().Month(), ts.Time().Day(), 0, 0, 0, 0, time.UTC) + return ts.Time().UTC().Truncate(Day) +} + +func truncateDay(ts model.Time) model.Time { + // model.minimumTick is time.Millisecond + return ts - (ts % model.Time(24*time.Hour/time.Millisecond)) } // getFromThrough assumes a list of ShortRefs sorted by From time @@ -24,6 +29,10 @@ func getFromThrough(refs []*logproto.ShortRef) (model.Time, model.Time) { return model.Earliest, model.Latest } + if len(refs) == 1 { + return refs[0].From, refs[0].Through + } + maxItem := slices.MaxFunc(refs, func(a, b *logproto.ShortRef) int { if a.Through > b.Through { return 1 @@ -85,7 +94,7 @@ func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (re } for _, task := range tasks { - refs := task.Request.Refs + refs := task.series min := sort.Search(len(refs), func(i int) bool { return block.Cmp(refs[i].Fingerprint) > v1.Before }) @@ -109,3 +118,66 @@ func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (re } return result } + +type seriesWithBounds struct { + bounds model.Interval + day model.Time + series []*logproto.GroupedChunkRefs +} + +func partitionRequest(req *logproto.FilterChunkRefRequest) []seriesWithBounds { + result := make([]seriesWithBounds, 0) + + fromDay, throughDay := truncateDay(req.From), truncateDay(req.Through) + + for day := fromDay; day.Equal(throughDay) || day.Before(throughDay); day = day.Add(Day) { + minTs, maxTs := model.Latest, model.Earliest + nextDay := day.Add(Day) + res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs)) + + for _, series := range req.Refs { + chunks := series.Refs + + min := sort.Search(len(chunks), func(i int) bool { + return chunks[i].Through >= day + }) + + max := sort.Search(len(chunks), func(i int) bool { + return 
chunks[i].From >= nextDay + }) + + // All chunks fall outside of the range + if min == len(chunks) || max == 0 { + continue + } + + if chunks[min].From < minTs { + minTs = chunks[min].From + } + if chunks[max-1].Through > maxTs { + maxTs = chunks[max-1].Through + } + // fmt.Println("day", day, "series", series.Fingerprint, "minTs", minTs, "maxTs", maxTs) + + res = append(res, &logproto.GroupedChunkRefs{ + Fingerprint: series.Fingerprint, + Tenant: series.Tenant, + Refs: chunks[min:max], + }) + + } + + if len(res) > 0 { + result = append(result, seriesWithBounds{ + bounds: model.Interval{ + Start: minTs, + End: maxTs, + }, + day: day, + series: res, + }) + } + } + + return result +} diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 152309ddbd..81f0720b9b 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -2,6 +2,7 @@ package bloomgateway import ( "testing" + "time" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -27,6 +28,23 @@ func TestGetFromThrough(t *testing.T) { require.Equal(t, model.Time(4), chunks[len(chunks)-1].From) } +func TestTruncateDay(t *testing.T) { + expected := mktime("2024-01-24 00:00") + + for _, inp := range []string{ + "2024-01-24 00:00", + "2024-01-24 08:00", + "2024-01-24 16:00", + "2024-01-24 23:59", + } { + t.Run(inp, func(t *testing.T) { + ts := mktime(inp) + result := truncateDay(ts) + require.Equal(t, expected, result) + }) + } +} + func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef { return bloomshipper.BlockRef{ Ref: bloomshipper.Ref{ @@ -37,6 +55,7 @@ func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef { } func TestPartitionFingerprintRange(t *testing.T) { + t.Run("consecutive block ranges", func(t *testing.T) { bounds := []bloomshipper.BlockRef{ mkBlockRef(0, 99), // out of bounds block @@ -52,10 +71,7 @@ func TestPartitionFingerprintRange(t *testing.T) { tasks := make([]Task, nTasks) for i := startFp; i < startFp+nSeries; i++ { 
- if tasks[i%nTasks].Request == nil { - tasks[i%nTasks].Request = &logproto.FilterChunkRefRequest{} - } - tasks[i%nTasks].Request.Refs = append(tasks[i%nTasks].Request.Refs, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)}) + tasks[i%nTasks].series = append(tasks[i%nTasks].series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)}) } results := partitionFingerprintRange(tasks, bounds) @@ -67,8 +83,8 @@ func TestPartitionFingerprintRange(t *testing.T) { // ensure we have the right number of tasks per bound require.Len(t, res.tasks, 5) for _, task := range res.tasks { - require.Equal(t, expectedTaskRefs[i], len(task.Request.Refs)) - actualFingerprints = append(actualFingerprints, task.Request.Refs...) + require.Equal(t, expectedTaskRefs[i], len(task.series)) + actualFingerprints = append(actualFingerprints, task.series...) } } @@ -88,9 +104,9 @@ func TestPartitionFingerprintRange(t *testing.T) { mkBlockRef(200, 289), } - task := Task{Request: &logproto.FilterChunkRefRequest{}} + task := Task{} for i := 0; i < 300; i++ { - task.Request.Refs = append(task.Request.Refs, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)}) + task.series = append(task.series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)}) } results := partitionFingerprintRange([]Task{task}, bounds) @@ -98,7 +114,162 @@ func TestPartitionFingerprintRange(t *testing.T) { for _, res := range results { // ensure we have the right number of tasks per bound require.Len(t, res.tasks, 1) - require.Len(t, res.tasks[0].Request.Refs, 90) + require.Len(t, res.tasks[0].series, 90) } }) } + +func TestPartitionRequest(t *testing.T) { + ts := mktime("2024-01-24 12:00") + + testCases := map[string]struct { + inp *logproto.FilterChunkRefRequest + exp []seriesWithBounds + }{ + + "empty": { + inp: &logproto.FilterChunkRefRequest{ + From: ts.Add(-24 * time.Hour), + Through: ts, + }, + exp: []seriesWithBounds{}, + }, + + "all chunks within single day": { + inp: &logproto.FilterChunkRefRequest{ + From: ts.Add(-1 * 
time.Hour), + Through: ts, + Refs: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 0x00, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-60 * time.Minute), Through: ts.Add(-50 * time.Minute)}, + }, + }, + { + Fingerprint: 0x01, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-55 * time.Minute), Through: ts.Add(-45 * time.Minute)}, + }, + }, + }, + }, + exp: []seriesWithBounds{ + { + bounds: model.Interval{Start: ts.Add(-60 * time.Minute), End: ts.Add(-45 * time.Minute)}, + day: mktime("2024-01-24 00:00"), + series: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 0x00, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-60 * time.Minute), Through: ts.Add(-50 * time.Minute)}, + }, + }, + { + Fingerprint: 0x01, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-55 * time.Minute), Through: ts.Add(-45 * time.Minute)}, + }, + }, + }, + }, + }, + }, + + "chunks across multiple days - no overlap": { + inp: &logproto.FilterChunkRefRequest{ + From: ts.Add(-24 * time.Hour), + Through: ts, + Refs: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 0x00, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour)}, + }, + }, + { + Fingerprint: 0x01, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-2 * time.Hour), Through: ts.Add(-1 * time.Hour)}, + }, + }, + }, + }, + exp: []seriesWithBounds{ + { + bounds: model.Interval{Start: ts.Add(-23 * time.Hour), End: ts.Add(-22 * time.Hour)}, + day: mktime("2024-01-23 00:00"), + series: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 0x00, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour)}, + }, + }, + }, + }, + { + bounds: model.Interval{Start: ts.Add(-2 * time.Hour), End: ts.Add(-1 * time.Hour)}, + day: mktime("2024-01-24 00:00"), + series: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 0x01, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-2 * time.Hour), Through: ts.Add(-1 * time.Hour)}, + }, + }, + }, + }, + }, + }, + + "chunks across multiple days - overlap": { 
+ inp: &logproto.FilterChunkRefRequest{ + From: ts.Add(-24 * time.Hour), + Through: ts, + Refs: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 0x00, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-13 * time.Hour), Through: ts.Add(-11 * time.Hour)}, + }, + }, + }, + }, + exp: []seriesWithBounds{ + { + bounds: model.Interval{Start: ts.Add(-13 * time.Hour), End: ts.Add(-11 * time.Hour)}, + day: mktime("2024-01-23 00:00"), + series: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 0x00, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-13 * time.Hour), Through: ts.Add(-11 * time.Hour)}, + }, + }, + }, + }, + { + bounds: model.Interval{Start: ts.Add(-13 * time.Hour), End: ts.Add(-11 * time.Hour)}, + day: mktime("2024-01-24 00:00"), + series: []*logproto.GroupedChunkRefs{ + { + Fingerprint: 0x00, + Refs: []*logproto.ShortRef{ + {From: ts.Add(-13 * time.Hour), Through: ts.Add(-11 * time.Hour)}, + }, + }, + }, + }, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + result := partitionRequest(tc.inp) + require.Equal(t, tc.exp, result) + }) + } + +} diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index 73100025a7..e2146ed9fa 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -79,18 +79,18 @@ type worker struct { cfg workerConfig queue *queue.RequestQueue shipper bloomshipper.Interface - tasks *pendingTasks + pending *pendingTasks logger log.Logger metrics *workerMetrics } -func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, shipper bloomshipper.Interface, tasks *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker { +func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, shipper bloomshipper.Interface, pending *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker { w := &worker{ id: id, cfg: cfg, queue: queue, shipper: shipper, - tasks: tasks, + pending: pending, logger: log.With(logger, "worker", id), metrics: metrics, } @@ -134,7 
+134,7 @@ func (w *worker) running(ctx context.Context) error { } w.metrics.dequeuedTasks.WithLabelValues(w.id).Add(float64(len(items))) - tasksPerDay := make(map[time.Time][]Task) + tasksPerDay := make(map[model.Time][]Task) for _, item := range items { task, ok := item.(Task) @@ -144,17 +144,9 @@ func (w *worker) running(ctx context.Context) error { return errors.Errorf("failed to cast dequeued item to Task: %v", item) } level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID) - w.tasks.Delete(task.ID) + w.pending.Delete(task.ID) - fromDay, throughDay := task.Bounds() - - if fromDay.Equal(throughDay) { - tasksPerDay[fromDay] = append(tasksPerDay[fromDay], task) - } else { - for i := fromDay; i.Before(throughDay); i = i.Add(24 * time.Hour) { - tasksPerDay[i] = append(tasksPerDay[i], task) - } - } + tasksPerDay[task.day] = append(tasksPerDay[task.day], task) } for day, tasks := range tasksPerDay { @@ -162,7 +154,7 @@ func (w *worker) running(ctx context.Context) error { level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks)) storeFetchStart := time.Now() - blockRefs, err := w.shipper.GetBlockRefs(taskCtx, tasks[0].Tenant, toModelTime(day), toModelTime(day.Add(Day).Add(-1*time.Nanosecond))) + blockRefs, err := w.shipper.GetBlockRefs(taskCtx, tasks[0].Tenant, day, day.Add(Day).Add(-1*time.Nanosecond)) w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockRefs").Observe(time.Since(storeFetchStart).Seconds()) if err != nil { for _, t := range tasks { @@ -177,7 +169,7 @@ func (w *worker) running(ctx context.Context) error { if len(blockRefs) == 0 { level.Warn(logger).Log("msg", "no blocks found") for _, t := range tasks { - for _, ref := range t.Request.Refs { + for _, ref := range t.series { t.ResCh <- v1.Output{ Fp: model.Fingerprint(ref.Fingerprint), Removals: nil, @@ -188,13 +180,13 @@ func (w *worker) running(ctx context.Context) error { continue } - boundedRefs := partitionFingerprintRange(tasks, blockRefs) + tasksForBlocks := 
partitionFingerprintRange(tasks, blockRefs) blockRefs = blockRefs[:0] - for _, b := range boundedRefs { + for _, b := range tasksForBlocks { blockRefs = append(blockRefs, b.blockRef) } - err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, boundedRefs) + err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, tasksForBlocks) if err != nil { for _, t := range tasks { t.ErrCh <- err @@ -217,7 +209,7 @@ func (w *worker) stopping(err error) error { return nil } -func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, day time.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error { +func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, day model.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error { return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error { for _, b := range boundedRefs { if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp { @@ -228,7 +220,7 @@ func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant strin }) } -func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) error { +func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day model.Time, tasks []Task) error { schema, err := blockQuerier.Schema() if err != nil { return err