From d4fcef5229d088c8ba61a2665c163c6e7330c682 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 10 Jun 2024 13:52:29 +0200 Subject: [PATCH] chore(blooms): Some boom gateway cleanups (#13165) * Cleanup logic for processing multiple days in single `FilterChunkRefs` request in the bloom gateway. The handler already returned an error if the requested chunk refs spanned across multiple days, but the logic for processing multiple days/tasks was still in place. This commit simplifies the logic to only process a single task per request. * Remove unused package global variable * Remove bloomgateway package comment * Remove duplicate imports * Avoid conversion from *logproto.ShortRef to v1.ChunkRef and istead cast the value of the pointer * Simplify `getFromThrough()` function Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 160 ++++++-------------------- pkg/bloomgateway/bloomgateway_test.go | 117 ++++++------------- pkg/bloomgateway/util.go | 12 +- 3 files changed, 78 insertions(+), 211 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 165e2d6524..5747f6e799 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -1,43 +1,7 @@ /* -Bloom Gateway package - The bloom gateway is a component that can be run as a standalone microserivce target and provides capabilities for filtering ChunkRefs based on a given list of line filter expressions. - - Querier Query Frontend - | | - ................................... service boundary - | | - +----+------+ - | - indexgateway.Gateway - | - bloomgateway.BloomQuerier - | - bloomgateway.GatewayClient - | - logproto.BloomGatewayClient - | - ................................... service boundary - | - bloomgateway.Gateway - | - queue.RequestQueue - | - bloomgateway.Worker - | - bloomgateway.Processor - | - bloomshipper.Store - | - bloomshipper.Client - | - ObjectClient - | - ................................... service boundary - | - object storage */ package bloomgateway @@ -63,13 +27,10 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" - util_log "github.com/grafana/loki/v3/pkg/util/log" utillog "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/spanlogger" ) -var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring") - const ( metricsSubsystem = "bloom_gateway" querierMetricsSubsystem = "bloom_gateway_querier" @@ -209,7 +170,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk stats, ctx := ContextWithEmptyStats(ctx) logger := spanlogger.FromContextWithFallback( ctx, - util_log.WithContext(ctx, g.logger), + utillog.WithContext(ctx, g.logger), ) defer func() { @@ -261,9 +222,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk }, nil } - // TODO(chaudum): I intentionally keep the logic for handling multiple tasks, - // so that the PR does not explode in size. This should be cleaned up at some point. 
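Since the handler now rejects any request whose time range spans more than one day, callers are expected to split multi-day ranges into per-day requests before hitting the gateway; the gateway itself still calls partitionRequest (see the hunk continuing below) and errors out unless exactly one day remains. The following is a minimal, self-contained sketch of that per-day split using only the standard library; splitByDay and the UTC-midnight day boundary are illustrative assumptions, not code from this patch:

package main

import (
	"fmt"
	"time"
)

// splitByDay is a hypothetical illustration of the partitioning a caller must
// do now that FilterChunkRefs only accepts a single day: it cuts [from, through)
// at UTC day boundaries and returns one interval per day.
func splitByDay(from, through time.Time) [][2]time.Time {
	var out [][2]time.Time
	cur := from
	for cur.Before(through) {
		// end of the current UTC day (Truncate(24h) aligns to UTC midnight)
		dayEnd := cur.Truncate(24 * time.Hour).Add(24 * time.Hour)
		if dayEnd.After(through) {
			dayEnd = through
		}
		out = append(out, [2]time.Time{cur, dayEnd})
		cur = dayEnd
	}
	return out
}

func main() {
	from := time.Date(2024, 6, 9, 22, 0, 0, 0, time.UTC)
	through := time.Date(2024, 6, 10, 4, 0, 0, 0, time.UTC)
	for _, iv := range splitByDay(from, through) {
		fmt.Println(iv[0], "->", iv[1])
	}
}
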
- seriesByDay := partitionRequest(req) stats.NumTasks = len(seriesByDay) @@ -279,14 +237,13 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return nil, errors.New("request time range must span exactly one day") } - tasks := make([]Task, 0, len(seriesByDay)) - responses := make([][]v1.Output, 0, len(seriesByDay)) - for _, seriesForDay := range seriesByDay { - task := newTask(ctx, tenantID, seriesForDay, filters, blocks) - // TODO(owen-d): include capacity in constructor? - task.responses = responsesPool.Get(len(seriesForDay.series)) - tasks = append(tasks, task) - } + series := seriesByDay[0] + task := newTask(ctx, tenantID, series, filters, blocks) + + // TODO(owen-d): include capacity in constructor? + task.responses = responsesPool.Get(len(series.series)) + // free up the responses + defer responsesPool.Put(task.responses) g.activeUsers.UpdateUserTimestamp(tenantID, time.Now()) @@ -297,62 +254,41 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk preFilterChunks += len(series.Refs) } - // Ideally we could use an unbuffered channel here, but since we return the - // request on the first error, there can be cases where the request context - // is not done yet and the consumeTask() function wants to send to the - // tasksCh, but nobody reads from it any more. - queueStart := time.Now() - tasksCh := make(chan Task, len(tasks)) - for _, task := range tasks { - task := task - task.enqueueTime = time.Now() - - // TODO(owen-d): gracefully handle full queues - if err := g.queue.Enqueue(tenantID, nil, task, func() { - // When enqueuing, we also add the task to the pending tasks - _ = g.pendingTasks.Inc() - }); err != nil { - stats.Status = labelFailure - return nil, errors.Wrap(err, "failed to enqueue task") - } - // TODO(owen-d): use `concurrency` lib, bound parallelism - go g.consumeTask(ctx, task, tasksCh) - } - - sp.LogKV("msg", "enqueued tasks", "duration", time.Since(queueStart).String()) + tasksCh := make(chan Task, 1) - remaining := len(tasks) + // TODO(owen-d): gracefully handle full queues + task.enqueueTime = time.Now() + if err := g.queue.Enqueue(tenantID, nil, task, func() { + // When enqueuing, we also add the task to the pending tasks + _ = g.pendingTasks.Inc() + }); err != nil { + stats.Status = labelFailure + return nil, errors.Wrap(err, "failed to enqueue task") + } + // TODO(owen-d): use `concurrency` lib, bound parallelism + go g.consumeTask(ctx, task, tasksCh) combinedRecorder := v1.NewBloomRecorder(ctx, "combined") - for remaining > 0 { - select { - case <-ctx.Done(): - stats.Status = "cancel" - return nil, errors.Wrap(ctx.Err(), "request failed") - case task := <-tasksCh: - if task.Err() != nil { - stats.Status = labelFailure - return nil, errors.Wrap(task.Err(), "request failed") - } - responses = append(responses, task.responses) - combinedRecorder.Merge(task.recorder) - remaining-- + + select { + case <-ctx.Done(): + stats.Status = "cancel" + return nil, errors.Wrap(ctx.Err(), "request failed") + case task = <-tasksCh: + if task.Err() != nil { + stats.Status = labelFailure + return nil, errors.Wrap(task.Err(), "request failed") } + combinedRecorder.Merge(task.recorder) } - combinedRecorder.Report(util_log.WithContext(ctx, g.logger), g.bloomStore.BloomMetrics()) - sp.LogKV("msg", "received all responses") + combinedRecorder.Report(utillog.WithContext(ctx, g.logger), g.bloomStore.BloomMetrics()) start := time.Now() - filtered := filterChunkRefs(req, responses) + filtered := filterChunkRefs(req, 
task.responses) duration := time.Since(start) stats.AddPostProcessingTime(duration) - // free up the responses - for _, resp := range responses { - responsesPool.Put(resp) - } - var postFilterSeries, postFilterChunks int postFilterSeries = len(filtered) for _, group := range filtered { @@ -404,35 +340,13 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas } } -// merges a list of responses via a heap. The same fingerprints and chunks can be present in multiple responses. -// Individual responses do not need to be be ordered beforehand. -func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] { - if len(responses) == 0 { - return v1.NewEmptyIter[v1.Output]() - } - if len(responses) == 1 { - sort.Slice(responses[0], func(i, j int) bool { return responses[0][i].Fp < responses[0][j].Fp }) - return v1.NewSliceIter(responses[0]) - } - - itrs := make([]v1.PeekingIterator[v1.Output], 0, len(responses)) - for _, r := range responses { - sort.Slice(r, func(i, j int) bool { return r[i].Fp < r[j].Fp }) - itrs = append(itrs, v1.NewPeekingIter(v1.NewSliceIter(r))) - } - return v1.NewHeapIterator[v1.Output]( - func(o1, o2 v1.Output) bool { return o1.Fp < o2.Fp }, - itrs..., - ) -} - // TODO(owen-d): improve perf. This can be faster with a more specialized impl // NB(owen-d): `req` is mutated in place for performance, but `responses` is not // Removals of the outputs must be sorted. -func filterChunkRefs( - req *logproto.FilterChunkRefRequest, - responses [][]v1.Output, -) []*logproto.GroupedChunkRefs { +func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses []v1.Output) []*logproto.GroupedChunkRefs { + // sort responses by fingerprint + sort.Slice(responses, func(i, j int) bool { return responses[i].Fp < responses[j].Fp }) + res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs)) // dedupe outputs, merging the same series. 
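With a single task per request there is only one []v1.Output slice to post-process, so the heap-based orderedResponsesByFP merge across per-day responses is no longer needed: the slice is sorted by fingerprint once and adjacent outputs for the same series are folded together by the dedupe step above. A simplified, self-contained sketch of that sort-then-fold pattern follows; output is a stand-in for v1.Output, and unlike the real code this sketch does not sort or dedupe the merged removals:

package main

import (
	"fmt"
	"sort"
)

// output is a simplified stand-in for v1.Output: a series fingerprint plus the
// chunk checksums that the bloom filters ruled out.
type output struct {
	fp       uint64
	removals []uint32
}

// mergeByFingerprint sorts the single response slice by fingerprint and folds
// adjacent entries for the same series, mirroring the pattern the patch
// switches to once there is no per-day k-way merge.
func mergeByFingerprint(responses []output) []output {
	sort.Slice(responses, func(i, j int) bool { return responses[i].fp < responses[j].fp })
	var merged []output
	for _, r := range responses {
		if n := len(merged); n > 0 && merged[n-1].fp == r.fp {
			merged[n-1].removals = append(merged[n-1].removals, r.removals...)
			continue
		}
		merged = append(merged, r)
	}
	return merged
}

func main() {
	resp := []output{
		{fp: 3, removals: []uint32{2}},
		{fp: 0, removals: []uint32{1}},
		{fp: 3, removals: []uint32{0}},
	}
	fmt.Println(mergeByFingerprint(resp)) // [{0 [1]} {3 [2 0]}]
}
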
@@ -481,7 +395,7 @@ func filterChunkRefs( res.Removals = chks return res }, - v1.NewPeekingIter(orderedResponsesByFP(responses)), + v1.NewPeekingIter(v1.NewSliceIter(responses)), ) // Iterate through the requested and filtered series/chunks, diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index fdcd7df117..d16e833fc4 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -517,18 +517,15 @@ func TestFilterChunkRefs(t *testing.T) { fp uint64 checksums []uint32 } - mkRemovals := func(xs [][]instruction) [][]v1.Output { - out := make([][]v1.Output, len(xs)) + mkRemovals := func(xs []instruction) []v1.Output { + out := make([]v1.Output, len(xs)) for i, x := range xs { - out[i] = make([]v1.Output, len(x)) - for j, c := range x { - out[i][j] = v1.Output{ - Fp: model.Fingerprint(c.fp), - Removals: make(v1.ChunkRefs, len(c.checksums)), - } - for k, chk := range c.checksums { - out[i][j].Removals[k] = v1.ChunkRef{Checksum: chk} - } + out[i] = v1.Output{ + Fp: model.Fingerprint(x.fp), + Removals: make(v1.ChunkRefs, len(x.checksums)), + } + for k, chk := range x.checksums { + out[i].Removals[k] = v1.ChunkRef{Checksum: chk} } } return out @@ -551,7 +548,7 @@ func TestFilterChunkRefs(t *testing.T) { for _, tc := range []struct { desc string input *logproto.FilterChunkRefRequest - removals [][]instruction + removals []instruction expected *logproto.FilterChunkRefRequest }{ { @@ -562,22 +559,18 @@ func TestFilterChunkRefs(t *testing.T) { { desc: "remove all", input: mkInput(2, 2), - removals: [][]instruction{ - { - {fp: 0, checksums: []uint32{0, 1}}, - {fp: 1, checksums: []uint32{0, 1}}, - }, + removals: []instruction{ + {fp: 0, checksums: []uint32{0, 1}}, + {fp: 1, checksums: []uint32{0, 1}}, }, expected: mkInput(0, 0), }, { desc: "remove every other series", input: mkInput(4, 2), - removals: [][]instruction{ - { - {fp: 0, checksums: []uint32{0, 1}}, - {fp: 2, checksums: []uint32{0, 1}}, - }, + removals: []instruction{ + {fp: 0, checksums: []uint32{0, 1}}, + {fp: 2, checksums: []uint32{0, 1}}, }, expected: mkResult([]instruction{ {fp: 1, checksums: []uint32{0, 1}}, @@ -587,13 +580,11 @@ func TestFilterChunkRefs(t *testing.T) { { desc: "remove the last chunk for each series", input: mkInput(4, 2), - removals: [][]instruction{ - { - {fp: 0, checksums: []uint32{1}}, - {fp: 1, checksums: []uint32{1}}, - {fp: 2, checksums: []uint32{1}}, - {fp: 3, checksums: []uint32{1}}, - }, + removals: []instruction{ + {fp: 0, checksums: []uint32{1}}, + {fp: 1, checksums: []uint32{1}}, + {fp: 2, checksums: []uint32{1}}, + {fp: 3, checksums: []uint32{1}}, }, expected: mkResult([]instruction{ {fp: 0, checksums: []uint32{0}}, @@ -605,11 +596,9 @@ func TestFilterChunkRefs(t *testing.T) { { desc: "remove the middle chunk for every other series", input: mkInput(4, 3), - removals: [][]instruction{ - { - {fp: 0, checksums: []uint32{1}}, - {fp: 2, checksums: []uint32{1}}, - }, + removals: []instruction{ + {fp: 0, checksums: []uint32{1}}, + {fp: 2, checksums: []uint32{1}}, }, expected: mkResult([]instruction{ {fp: 0, checksums: []uint32{0, 2}}, @@ -621,10 +610,8 @@ func TestFilterChunkRefs(t *testing.T) { { desc: "remove the first chunk of the last series", input: mkInput(4, 3), - removals: [][]instruction{ - { - {fp: 3, checksums: []uint32{0}}, - }, + removals: []instruction{ + {fp: 3, checksums: []uint32{0}}, }, expected: mkResult([]instruction{ {fp: 0, checksums: []uint32{0, 1, 2}}, @@ -636,13 +623,11 @@ func TestFilterChunkRefs(t *testing.T) { { 
desc: "duplicate removals", input: mkInput(4, 3), - removals: [][]instruction{ - { - {fp: 0, checksums: []uint32{0, 1}}, - {fp: 0, checksums: []uint32{0, 1, 2}}, - {fp: 1, checksums: []uint32{0, 2}}, - {fp: 2, checksums: []uint32{1}}, - }, + removals: []instruction{ + {fp: 0, checksums: []uint32{0, 1}}, + {fp: 0, checksums: []uint32{0, 1, 2}}, + {fp: 1, checksums: []uint32{0, 2}}, + {fp: 2, checksums: []uint32{1}}, }, expected: mkResult([]instruction{ {fp: 1, checksums: []uint32{1}}, @@ -650,45 +635,19 @@ func TestFilterChunkRefs(t *testing.T) { {fp: 3, checksums: []uint32{0, 1, 2}}, }), }, - { - desc: "middle duplicates across 2 days", - input: mkInput(4, 3), - removals: [][]instruction{ - { - {fp: 0, checksums: []uint32{1}}, - {fp: 2, checksums: []uint32{1}}, - }, - { - {fp: 0, checksums: []uint32{1}}, - {fp: 2, checksums: []uint32{1}}, - }, - }, - expected: mkResult([]instruction{ - {fp: 0, checksums: []uint32{0, 2}}, - {fp: 1, checksums: []uint32{0, 1, 2}}, - {fp: 2, checksums: []uint32{0, 2}}, - {fp: 3, checksums: []uint32{0, 1, 2}}, - }), - }, { desc: "unordered fingerprints", input: mkInput(4, 3), - removals: [][]instruction{ - { - {fp: 3, checksums: []uint32{2}}, - {fp: 0, checksums: []uint32{1, 2}}, - {fp: 2, checksums: []uint32{1, 2}}, - }, - { - {fp: 1, checksums: []uint32{1}}, - {fp: 2, checksums: []uint32{0, 1}}, - {fp: 3, checksums: []uint32{0}}, - }, + removals: []instruction{ + {fp: 3, checksums: []uint32{2}}, + {fp: 0, checksums: []uint32{1, 2}}, + {fp: 2, checksums: []uint32{1, 2}}, }, expected: mkResult([]instruction{ {fp: 0, checksums: []uint32{0}}, - {fp: 1, checksums: []uint32{0, 2}}, - {fp: 3, checksums: []uint32{1}}, + {fp: 1, checksums: []uint32{0, 1, 2}}, + {fp: 2, checksums: []uint32{0}}, + {fp: 3, checksums: []uint32{0, 1}}, }), }, } { @@ -752,7 +711,7 @@ func BenchmarkFilterChunkRefs(b *testing.B) { { desc: "filterChunkRefs", f: func(req *logproto.FilterChunkRefRequest, responses []v1.Output) { - filterChunkRefs(req, [][]v1.Output{responses}) + filterChunkRefs(req, responses) }, }, } { diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index df3a93fcaf..5f115ba75c 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -28,23 +28,17 @@ func getFromThrough(refs []*logproto.ShortRef) (model.Time, model.Time) { } maxItem := slices.MaxFunc(refs, func(a, b *logproto.ShortRef) int { - if a.Through > b.Through { - return 1 - } else if a.Through < b.Through { - return -1 - } - return 0 + return int(a.Through) - int(b.Through) }) return refs[0].From, maxItem.Through } // convertToChunkRefs converts a []*logproto.ShortRef into v1.ChunkRefs -// TODO(chaudum): Avoid conversion by transferring v1.ChunkRefs in gRPC request. func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs { result := make(v1.ChunkRefs, 0, len(refs)) - for _, ref := range refs { - result = append(result, v1.ChunkRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}) + for i := range refs { + result = append(result, v1.ChunkRef(*refs[i])) } return result }
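A note on the convertToChunkRefs change in the last hunk: v1.ChunkRef(*refs[i]) is a plain Go type conversion rather than a field-by-field copy, which is only legal because the two struct types have identical underlying types (struct tags aside), so the compiler guarantees the layouts match and will refuse to build if they ever diverge. The snippet below illustrates the rule with stand-in ShortRef and ChunkRef types; the field names and widths are assumptions, not the actual logproto or v1 definitions:

package main

import "fmt"

// Stand-ins for logproto.ShortRef and v1.ChunkRef: ChunkRef is declared with
// ShortRef as its underlying type, so a direct value conversion is legal.
type ShortRef struct {
	From, Through int64
	Checksum      uint32
}

type ChunkRef ShortRef

// convert mirrors the shape of the new convertToChunkRefs: dereference the
// pointer and convert the value instead of copying each field by hand.
func convert(refs []*ShortRef) []ChunkRef {
	out := make([]ChunkRef, 0, len(refs))
	for i := range refs {
		out = append(out, ChunkRef(*refs[i]))
	}
	return out
}

func main() {
	refs := []*ShortRef{{From: 1, Through: 2, Checksum: 42}}
	fmt.Printf("%+v\n", convert(refs)) // [{From:1 Through:2 Checksum:42}]
}
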