Bloom gateway: Add metrics for store operations and chunk ref counts (#11677)

For better observability of the bloom gateway, this PR adds two
additional metrics that expose the number of chunk refs before and
after filtering. These can be used to calculate the filter ratio of
the gateways.
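
As a rough illustration (not part of this change), the sketch below computes that filter ratio from a scraped `/metrics` payload, using the same `expfmt` text parser that the integration helpers in `parse_metrics.go` rely on. The sample payload and its values are made up; only the metric names come from this PR.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

// Made-up sample scrape; only the metric names match the counters added in this PR.
const sample = `# TYPE loki_bloom_gateway_chunkrefs_pre_filtering counter
loki_bloom_gateway_chunkrefs_pre_filtering 10
# TYPE loki_bloom_gateway_chunkrefs_post_filtering counter
loki_bloom_gateway_chunkrefs_post_filtering 1
`

func main() {
	var parser expfmt.TextParser
	mfs, err := parser.TextToMetricFamilies(strings.NewReader(sample))
	if err != nil {
		panic(err)
	}
	pre := mfs["loki_bloom_gateway_chunkrefs_pre_filtering"].GetMetric()[0].GetCounter().GetValue()
	post := mfs["loki_bloom_gateway_chunkrefs_post_filtering"].GetMetric()[0].GetCounter().GetValue()
	// Filter ratio: fraction of chunk refs that survive bloom filtering.
	fmt.Printf("filter ratio: %.2f\n", post/pre) // 0.10 for this sample
}
```

In the end-to-end test below, the same two counters are read back via `getMetricValue` and asserted to equal 10 and 1 for the needle-in-the-haystack query.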

The PR also adds a metric that observes the latency of bloom filter
processing within the worker.

---------

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  1. integration/loki_micro_services_test.go (62)
  2. integration/parse_metrics.go (18)
  3. pkg/bloomgateway/bloomgateway.go (54)
  4. pkg/bloomgateway/worker.go (29)

@@ -16,6 +16,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/proto"
"github.com/grafana/loki/integration/client"
@@ -1061,7 +1062,7 @@ func TestCategorizedLabels(t *testing.T) {
func TestBloomFiltersEndToEnd(t *testing.T) {
commonFlags := []string{
"-bloom-compactor.compaction-interval=2s",
"-bloom-compactor.compaction-interval=10s",
"-bloom-compactor.enable-compaction=true",
"-bloom-compactor.enabled=true",
"-bloom-gateway.enable-filtering=true",
@@ -1101,7 +1102,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
"-target=index-gateway",
)...,
)
_ = clu.AddComponent(
tBloomGateway = clu.AddComponent(
"bloom-gateway",
append(
commonFlags,
@@ -1136,7 +1137,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
_ = clu.AddComponent(
tBloomCompactor = clu.AddComponent(
"bloom-compactor",
append(
commonFlags,
@@ -1186,6 +1187,12 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL())
cliIndexGateway.Now = now
cliBloomGateway := client.New(tenantID, "", tBloomGateway.HTTPURL())
cliBloomGateway.Now = now
cliBloomCompactor := client.New(tenantID, "", tBloomCompactor.HTTPURL())
cliBloomCompactor.Now = now
lineTpl := `caller=loki_micro_services_test.go msg="push log line" id="%s"`
// ingest logs from 10 different pods
// each line contains a random, unique string
@@ -1206,7 +1213,14 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
require.NoError(t, tIngester.Restart())
// wait for compactor to compact index and for bloom compactor to build bloom filters
time.Sleep(10 * time.Second)
require.Eventually(t, func() bool {
// verify metrics that observe usage of block for filtering
metrics, err := cliBloomCompactor.Metrics()
require.NoError(t, err)
successfulRunCount := getMetricValue(t, "loki_bloomcompactor_runs_completed_total", metrics)
t.Log("successful bloom compactor runs", successfulRunCount)
return successfulRunCount == 1
}, 30*time.Second, time.Second)
// use bloom gateway to perform needle in the haystack queries
randIdx := rand.Intn(len(uniqueStrings))
@@ -1221,22 +1235,44 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[randIdx])
require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1])
// TODO(chaudum):
// verify that bloom blocks have actually been used for querying
// atm, we can only verify by logs, so we should add appropriate metrics for
// uploaded/downloaded blocks and metas
// verify metrics that observe usage of block for filtering
bloomGwMetrics, err := cliBloomGateway.Metrics()
require.NoError(t, err)
unfilteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_pre_filtering", bloomGwMetrics)
require.Equal(t, float64(10), unfilteredCount)
filteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_post_filtering", bloomGwMetrics)
require.Equal(t, float64(1), filteredCount)
mf, err := extractMetricFamily("loki_bloom_gateway_bloom_query_latency", bloomGwMetrics)
require.NoError(t, err)
count := getValueFromMetricFamilyWithFunc(mf, &dto.LabelPair{
Name: proto.String("status"),
Value: proto.String("success"),
}, func(m *dto.Metric) uint64 {
return m.Histogram.GetSampleCount()
})
require.Equal(t, uint64(1), count)
}
func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 {
return getValueFromMetricFamilyWithFunc(mf, lbs[0], func(m *dto.Metric) float64 { return m.Counter.GetValue() })
}
func getValueFromMetricFamilyWithFunc[R any](mf *dto.MetricFamily, lbs *dto.LabelPair, f func(*dto.Metric) R) R {
eq := func(e *dto.LabelPair) bool {
return e.GetName() == lbs.GetName() && e.GetValue() == lbs.GetValue()
}
var zero R
for _, m := range mf.Metric {
if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) {
if !slices.ContainsFunc(m.GetLabel(), eq) {
continue
}
return m.Counter.GetValue()
return f(m)
}
return 0
return zero
}
func assertCacheState(t *testing.T, metrics string, e *expectedCacheState) {

@@ -13,16 +13,24 @@ var (
ErrInvalidMetricType = fmt.Errorf("invalid metric type")
)
func extractMetric(metricName, metrics string) (float64, map[string]string, error) {
func extractMetricFamily(name, metrics string) (*io_prometheus_client.MetricFamily, error) {
var parser expfmt.TextParser
mfs, err := parser.TextToMetricFamilies(strings.NewReader(metrics))
if err != nil {
return 0, nil, err
return nil, err
}
mf, ok := mfs[name]
if !ok {
return nil, ErrNoMetricFound
}
return mf, nil
}
mf, found := mfs[metricName]
if !found {
return 0, nil, ErrNoMetricFound
func extractMetric(metricName, metrics string) (float64, map[string]string, error) {
mf, err := extractMetricFamily(metricName, metrics)
if err != nil {
return 0, nil, err
}
var val float64

@@ -80,8 +80,10 @@ var (
)
type metrics struct {
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
chunkRefsUnfiltered prometheus.Counter
chunkRefsFiltered prometheus.Counter
}
func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
@@ -102,9 +104,29 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *
MaxAge: time.Minute,
AgeBuckets: 6,
}),
chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkrefs_pre_filtering",
Help: "Total amount of chunk refs pre filtering. Does not count chunk refs in failed requests.",
}),
chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkrefs_post_filtering",
Help: "Total amount of chunk refs post filtering.",
}),
}
}
func (m *metrics) addUnfilteredCount(n int) {
m.chunkRefsUnfiltered.Add(float64(n))
}
func (m *metrics) addFilteredCount(n int) {
m.chunkRefsFiltered.Add(float64(n))
}
// SyncMap is a map structure which can be synchronized using the RWMutex
type SyncMap[k comparable, v any] struct {
sync.RWMutex
@@ -284,8 +306,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, err
}
numChunksUnfiltered := len(req.Refs)
// Shortcut if request does not contain filters
if len(req.Filters) == 0 {
g.metrics.addUnfilteredCount(numChunksUnfiltered)
g.metrics.addFilteredCount(len(req.Refs))
return &logproto.FilterChunkRefResponse{
ChunkRefs: req.Refs,
}, nil
@@ -313,6 +339,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
responses := responsesPool.Get(requestCount)
defer responsesPool.Put(responses)
outer:
for {
select {
case <-ctx.Done():
@@ -325,17 +352,24 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
// wait for all parts of the full response
if len(responses) == requestCount {
for _, o := range responses {
if res.Removals.Len() == 0 {
continue
}
// we must not remove items from req.Refs as long as the worker may iterate over them
g.removeNotMatchingChunks(req, o)
}
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
break outer
}
}
}
for _, o := range responses {
if o.Removals.Len() == 0 {
continue
}
// we must not remove items from req.Refs as long as the worker may iterate over them
g.removeNotMatchingChunks(req, o)
}
g.metrics.addUnfilteredCount(numChunksUnfiltered)
g.metrics.addFilteredCount(len(req.Refs))
level.Debug(g.logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs))
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}
func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) {
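
Stepping outside the diff for a moment, here is a minimal, self-contained sketch of the pre/post counter pattern that FilterChunkRefs follows above: count every chunk ref before filtering, filter, then count what remains. The throwaway registry and the toy refs slice are assumptions for illustration; metric names, namespace, and subsystem match the counters added in newMetrics.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Throwaway registry for the sketch; the real counters are registered in newMetrics().
	reg := prometheus.NewRegistry()
	pre := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Namespace: "loki", Subsystem: "bloom_gateway",
		Name: "chunkrefs_pre_filtering",
		Help: "Total amount of chunk refs pre filtering.",
	})
	post := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Namespace: "loki", Subsystem: "bloom_gateway",
		Name: "chunkrefs_post_filtering",
		Help: "Total amount of chunk refs post filtering.",
	})

	refs := []string{"c1", "c2", "c3", "c4"} // stand-in for req.Refs
	pre.Add(float64(len(refs)))              // addUnfilteredCount
	refs = refs[:1]                          // stand-in for removeNotMatchingChunks
	post.Add(float64(len(refs)))             // addFilteredCount

	fmt.Println(testutil.ToFloat64(pre), testutil.ToFloat64(post)) // 4 1
}
```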

@@ -27,6 +27,7 @@ type workerMetrics struct {
dequeueErrors *prometheus.CounterVec
dequeueWaitTime *prometheus.SummaryVec
storeAccessLatency *prometheus.HistogramVec
bloomQueryLatency *prometheus.HistogramVec
}
func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics {
@@ -50,6 +51,13 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str
Name: "dequeue_wait_time",
Help: "Time spent waiting for dequeuing tasks from queue",
}, labels),
bloomQueryLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "bloom_query_latency",
Help: "Latency in seconds of processing bloom blocks",
}, append(labels, "status")),
// TODO(chaudum): Move this metric into the bloomshipper
storeAccessLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
@@ -213,29 +221,32 @@ func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant strin
return w.store.ForEach(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
for _, b := range boundedRefs {
if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
processBlock(bq, day, b.tasks)
return nil
return w.processBlock(bq, day, b.tasks)
}
}
return nil
})
}
func processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) {
func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) error {
schema, err := blockQuerier.Schema()
if err != nil {
for _, t := range tasks {
t.ErrCh <- errors.Wrap(err, "failed to get block schema")
}
return err
}
tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0)
it := newTaskMergeIterator(day, tokenizer, tasks...)
fq := blockQuerier.Fuse([]v1.PeekingIterator[v1.Request]{it})
start := time.Now()
err = fq.Run()
duration := time.Since(start).Seconds()
if err != nil {
for _, t := range tasks {
t.ErrCh <- errors.Wrap(err, "failed to run chunk check")
}
w.metrics.bloomQueryLatency.WithLabelValues(w.id, "failure").Observe(duration)
return err
}
w.metrics.bloomQueryLatency.WithLabelValues(w.id, "success").Observe(duration)
return nil
}
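
For reference, a condensed, self-contained sketch of the timing pattern processBlock uses: time fq.Run() once and observe the duration into the histogram with a status label of success or failure. The registry, the run stub, and the "worker" label name are assumptions for this sketch; the metric name, namespace, subsystem, and help text are taken from newWorkerMetrics.

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()
	// Metric name and help text as in newWorkerMetrics; the label names are assumed.
	bloomQueryLatency := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "loki", Subsystem: "bloom_gateway",
		Name: "bloom_query_latency",
		Help: "Latency in seconds of processing bloom blocks",
	}, []string{"worker", "status"})

	run := func() error { return nil } // stand-in for fq.Run()

	start := time.Now()
	err := run()
	duration := time.Since(start).Seconds()

	// Observe exactly once per block, labelled by outcome, mirroring processBlock above.
	status := "success"
	if err != nil {
		status = "failure"
	}
	bloomQueryLatency.WithLabelValues("worker-0", status).Observe(duration)
}
```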
