From bdcb69540f66e59e0bdebe1650a3cd0685db96af Mon Sep 17 00:00:00 2001
From: Christian Haudum
Date: Wed, 17 Jan 2024 08:28:46 +0100
Subject: [PATCH] Bloom gateway: Add metrics for store operations and chunk ref counts (#11677)

For better observability of the bloom gateway, this PR adds two additional
metrics that expose the number of chunk refs before and after filtering.
This can be used to calculate the filter ratio of the gateways.

The PR also adds a metric that observes the latency of the actual bloom
filter processing within the worker.

---------

Signed-off-by: Christian Haudum
---
 integration/loki_micro_services_test.go | 62 +++++++++++++++++++------
 integration/parse_metrics.go            | 18 +++++--
 pkg/bloomgateway/bloomgateway.go        | 54 +++++++++++++++++----
 pkg/bloomgateway/worker.go              | 29 ++++++++----
 4 files changed, 126 insertions(+), 37 deletions(-)

diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go
index 1f7dc836b5..0c05d13d8e 100644
--- a/integration/loki_micro_services_test.go
+++ b/integration/loki_micro_services_test.go
@@ -16,6 +16,7 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/exp/slices"
 	"google.golang.org/protobuf/proto"
 
 	"github.com/grafana/loki/integration/client"
@@ -1061,7 +1062,7 @@ func TestCategorizedLabels(t *testing.T) {
 
 func TestBloomFiltersEndToEnd(t *testing.T) {
 	commonFlags := []string{
-		"-bloom-compactor.compaction-interval=2s",
+		"-bloom-compactor.compaction-interval=10s",
 		"-bloom-compactor.enable-compaction=true",
 		"-bloom-compactor.enabled=true",
 		"-bloom-gateway.enable-filtering=true",
@@ -1101,7 +1102,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
 			"-target=index-gateway",
 		)...,
 	)
-	_ = clu.AddComponent(
+	tBloomGateway = clu.AddComponent(
 		"bloom-gateway",
 		append(
 			commonFlags,
@@ -1136,7 +1137,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
 			"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
 		)...,
 	)
-	_ = clu.AddComponent(
+	tBloomCompactor = clu.AddComponent(
 		"bloom-compactor",
 		append(
 			commonFlags,
@@ -1186,6 +1187,12 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
 	cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL())
 	cliIndexGateway.Now = now
 
+	cliBloomGateway := client.New(tenantID, "", tBloomGateway.HTTPURL())
+	cliBloomGateway.Now = now
+
+	cliBloomCompactor := client.New(tenantID, "", tBloomCompactor.HTTPURL())
+	cliBloomCompactor.Now = now
+
 	lineTpl := `caller=loki_micro_services_test.go msg="push log line" id="%s"`
 	// ingest logs from 10 different pods
 	// each line contains a random, unique string
@@ -1206,7 +1213,14 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
 	require.NoError(t, tIngester.Restart())
 
 	// wait for compactor to compact index and for bloom compactor to build bloom filters
-	time.Sleep(10 * time.Second)
+	require.Eventually(t, func() bool {
+		// verify metrics that observe usage of block for filtering
+		metrics, err := cliBloomCompactor.Metrics()
+		require.NoError(t, err)
+		successfulRunCount := getMetricValue(t, "loki_bloomcompactor_runs_completed_total", metrics)
+		t.Log("successful bloom compactor runs", successfulRunCount)
+		return successfulRunCount == 1
+	}, 30*time.Second, time.Second)
 
 	// use bloom gateway to perform needle in the haystack queries
 	randIdx := rand.Intn(len(uniqueStrings))
@@ -1221,22 +1235,44 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
 	expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[randIdx])
 	require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1])
 
-	// TODO(chaudum):
-	// verify that bloom blocks have actually been used for querying
-	// atm, we can only verify by logs, so we should add appropriate metrics for
-	// uploaded/downloaded blocks and metas
+	// verify metrics that observe usage of block for filtering
+	bloomGwMetrics, err := cliBloomGateway.Metrics()
+	require.NoError(t, err)
+
+	unfilteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_pre_filtering", bloomGwMetrics)
+	require.Equal(t, float64(10), unfilteredCount)
+
+	filteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_post_filtering", bloomGwMetrics)
+	require.Equal(t, float64(1), filteredCount)
+
+	mf, err := extractMetricFamily("loki_bloom_gateway_bloom_query_latency", bloomGwMetrics)
+	require.NoError(t, err)
+
+	count := getValueFromMetricFamilyWithFunc(mf, &dto.LabelPair{
+		Name:  proto.String("status"),
+		Value: proto.String("success"),
+	}, func(m *dto.Metric) uint64 {
+		return m.Histogram.GetSampleCount()
+	})
+	require.Equal(t, uint64(1), count)
 }
 
 func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 {
+	return getValueFromMetricFamilyWithFunc(mf, lbs[0], func(m *dto.Metric) float64 { return m.Counter.GetValue() })
+}
+
+func getValueFromMetricFamilyWithFunc[R any](mf *dto.MetricFamily, lbs *dto.LabelPair, f func(*dto.Metric) R) R {
+	eq := func(e *dto.LabelPair) bool {
+		return e.GetName() == lbs.GetName() && e.GetValue() == lbs.GetValue()
+	}
+	var zero R
 	for _, m := range mf.Metric {
-		if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) {
+		if !slices.ContainsFunc(m.GetLabel(), eq) {
 			continue
 		}
-
-		return m.Counter.GetValue()
+		return f(m)
 	}
-
-	return 0
+	return zero
 }
 
 func assertCacheState(t *testing.T, metrics string, e *expectedCacheState) {
diff --git a/integration/parse_metrics.go b/integration/parse_metrics.go
index 46ea424978..9f2bf5fc8f 100644
--- a/integration/parse_metrics.go
+++ b/integration/parse_metrics.go
@@ -13,16 +13,24 @@ var (
 	ErrInvalidMetricType = fmt.Errorf("invalid metric type")
 )
 
-func extractMetric(metricName, metrics string) (float64, map[string]string, error) {
+func extractMetricFamily(name, metrics string) (*io_prometheus_client.MetricFamily, error) {
 	var parser expfmt.TextParser
 	mfs, err := parser.TextToMetricFamilies(strings.NewReader(metrics))
 	if err != nil {
-		return 0, nil, err
+		return nil, err
+	}
+
+	mf, ok := mfs[name]
+	if !ok {
+		return nil, ErrNoMetricFound
 	}
+	return mf, nil
+}
 
-	mf, found := mfs[metricName]
-	if !found {
-		return 0, nil, ErrNoMetricFound
+func extractMetric(metricName, metrics string) (float64, map[string]string, error) {
+	mf, err := extractMetricFamily(metricName, metrics)
+	if err != nil {
+		return 0, nil, err
 	}
 
 	var val float64
diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go
index b0c3251a08..766c05bab4 100644
--- a/pkg/bloomgateway/bloomgateway.go
+++ b/pkg/bloomgateway/bloomgateway.go
@@ -80,8 +80,10 @@ var (
 )
 
 type metrics struct {
-	queueDuration    prometheus.Histogram
-	inflightRequests prometheus.Summary
+	queueDuration       prometheus.Histogram
+	inflightRequests    prometheus.Summary
+	chunkRefsUnfiltered prometheus.Counter
+	chunkRefsFiltered   prometheus.Counter
 }
 
 func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
@@ -102,9 +104,29 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *
 			MaxAge:     time.Minute,
 			AgeBuckets: 6,
 		}),
+		chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "chunkrefs_pre_filtering",
+			Help:      "Total amount of chunk refs pre filtering. Does not count chunk refs in failed requests.",
+		}),
+		chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "chunkrefs_post_filtering",
+			Help:      "Total amount of chunk refs post filtering.",
+		}),
 	}
 }
 
+func (m *metrics) addUnfilteredCount(n int) {
+	m.chunkRefsUnfiltered.Add(float64(n))
+}
+
+func (m *metrics) addFilteredCount(n int) {
+	m.chunkRefsFiltered.Add(float64(n))
+}
+
 // SyncMap is a map structure which can be synchronized using the RWMutex
 type SyncMap[k comparable, v any] struct {
 	sync.RWMutex
@@ -284,8 +306,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 		return nil, err
 	}
 
+	numChunksUnfiltered := len(req.Refs)
+
 	// Shortcut if request does not contain filters
 	if len(req.Filters) == 0 {
+		g.metrics.addUnfilteredCount(numChunksUnfiltered)
+		g.metrics.addFilteredCount(len(req.Refs))
 		return &logproto.FilterChunkRefResponse{
 			ChunkRefs: req.Refs,
 		}, nil
@@ -313,6 +339,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 	responses := responsesPool.Get(requestCount)
 	defer responsesPool.Put(responses)
 
+outer:
 	for {
 		select {
 		case <-ctx.Done():
@@ -325,17 +352,24 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 			level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
 			// wait for all parts of the full response
 			if len(responses) == requestCount {
-				for _, o := range responses {
-					if res.Removals.Len() == 0 {
-						continue
-					}
-					// we must not remove items from req.Refs as long as the worker may iterater over them
-					g.removeNotMatchingChunks(req, o)
-				}
-				return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
+				break outer
 			}
 		}
 	}
+
+	for _, o := range responses {
+		if o.Removals.Len() == 0 {
+			continue
+		}
+		// we must not remove items from req.Refs as long as the worker may iterate over them
+		g.removeNotMatchingChunks(req, o)
+	}
+
+	g.metrics.addUnfilteredCount(numChunksUnfiltered)
+	g.metrics.addFilteredCount(len(req.Refs))
+
+	level.Debug(g.logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs))
+	return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
 }
 
 func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) {
diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go
index a8f9c56d50..ce5add3c63 100644
--- a/pkg/bloomgateway/worker.go
+++ b/pkg/bloomgateway/worker.go
@@ -27,6 +27,7 @@ type workerMetrics struct {
 	dequeueErrors      *prometheus.CounterVec
 	dequeueWaitTime    *prometheus.SummaryVec
 	storeAccessLatency *prometheus.HistogramVec
+	bloomQueryLatency  *prometheus.HistogramVec
 }
 
 func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics {
@@ -50,6 +51,13 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str
 			Name:      "dequeue_wait_time",
 			Help:      "Time spent waiting for dequeuing tasks from queue",
 		}, labels),
+		bloomQueryLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "bloom_query_latency",
+			Help:      "Latency in seconds of processing bloom blocks",
+		}, append(labels, "status")),
+		// TODO(chaudum): Move this metric into the bloomshipper
 		storeAccessLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -213,29 +221,32 @@ func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant strin
 	return w.store.ForEach(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
 		for _, b := range boundedRefs {
 			if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
-				processBlock(bq, day, b.tasks)
-				return nil
+				return w.processBlock(bq, day, b.tasks)
 			}
 		}
 		return nil
 	})
 }
 
-func processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) {
+func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) error {
 	schema, err := blockQuerier.Schema()
 	if err != nil {
-		for _, t := range tasks {
-			t.ErrCh <- errors.Wrap(err, "failed to get block schema")
-		}
+		return err
 	}
 
 	tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0)
 	it := newTaskMergeIterator(day, tokenizer, tasks...)
 	fq := blockQuerier.Fuse([]v1.PeekingIterator[v1.Request]{it})
+
+	start := time.Now()
 	err = fq.Run()
+	duration := time.Since(start).Seconds()
+
 	if err != nil {
-		for _, t := range tasks {
-			t.ErrCh <- errors.Wrap(err, "failed to run chunk check")
-		}
+		w.metrics.bloomQueryLatency.WithLabelValues(w.id, "failure").Observe(duration)
+		return err
 	}
+
+	w.metrics.bloomQueryLatency.WithLabelValues(w.id, "success").Observe(duration)
+	return nil
 }
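
The sketch below is not part of the patch. It is a minimal, illustrative example of how the filter ratio mentioned in the commit message could be derived from the counters this change adds (loki_bloom_gateway_chunkrefs_pre_filtering, loki_bloom_gateway_chunkrefs_post_filtering) and how the new loki_bloom_gateway_bloom_query_latency histogram could be read, using the same expfmt/client_model parsing that the integration test relies on. The metric names come from the patch; the /metrics URL, file name, and output formatting are assumptions for illustration only.

// filter_ratio.go: a hedged sketch, not part of the patch. It scrapes a bloom
// gateway /metrics page and derives the filter ratio from the new counters.
package main

import (
	"fmt"
	"log"
	"net/http"

	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
)

// counterValue sums all samples of a counter family; it returns 0 if the family is absent.
func counterValue(mfs map[string]*dto.MetricFamily, name string) float64 {
	mf, ok := mfs[name]
	if !ok {
		return 0
	}
	var total float64
	for _, m := range mf.GetMetric() {
		total += m.GetCounter().GetValue()
	}
	return total
}

func main() {
	// Assumed address; point this at a real bloom gateway instance.
	resp, err := http.Get("http://localhost:3100/metrics")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var parser expfmt.TextParser
	mfs, err := parser.TextToMetricFamilies(resp.Body)
	if err != nil {
		log.Fatal(err)
	}

	// Counter names follow from namespace "loki", subsystem "bloom_gateway" and the
	// metric names added in pkg/bloomgateway/bloomgateway.go.
	pre := counterValue(mfs, "loki_bloom_gateway_chunkrefs_pre_filtering")
	post := counterValue(mfs, "loki_bloom_gateway_chunkrefs_post_filtering")
	if pre > 0 {
		fmt.Printf("filter ratio: %.2f%% of chunk refs filtered out\n", 100*(pre-post)/pre)
	}

	// Average seconds per successful bloom query in the worker, from the new histogram.
	if mf, ok := mfs["loki_bloom_gateway_bloom_query_latency"]; ok {
		for _, m := range mf.GetMetric() {
			for _, lp := range m.GetLabel() {
				if lp.GetName() == "status" && lp.GetValue() == "success" {
					h := m.GetHistogram()
					if h.GetSampleCount() > 0 {
						fmt.Printf("avg bloom query latency: %.3fs\n", h.GetSampleSum()/float64(h.GetSampleCount()))
					}
				}
			}
		}
	}
}

In a Prometheus setup the same ratio can be expressed directly over the counters, for example 1 - rate(loki_bloom_gateway_chunkrefs_post_filtering[5m]) / rate(loki_bloom_gateway_chunkrefs_pre_filtering[5m]).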