feat(blooms): Add counter metric for blocks that are not available at query time (#12968)

When filtering chunks on the bloom gateway, bloom blocks may not be available at query time; in that case they are downloaded asynchronously in the background and the missing blocks are skipped for the current request.

This new metric `loki_bloom_gateway_blocks_not_available_total` counts the blocks that are not available at query time.
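As a rough illustration (a minimal standalone sketch, not the Loki code itself; the label set, registration, and nil-block placeholder are assumptions), the pattern introduced here is a Prometheus counter that is incremented whenever a block that should be queried is not yet available:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// blocksNotAvailable mirrors the new counter: namespace "loki" and subsystem
// "bloom_gateway" yield the full name loki_bloom_gateway_blocks_not_available_total.
var blocksNotAvailable = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "loki",
	Subsystem: "bloom_gateway",
	Name:      "blocks_not_available_total",
	Help:      "Total amount of blocks that have been skipped because they were not found or not downloaded yet",
}, []string{"tenant"}) // label name is an assumption for this sketch

func main() {
	prometheus.MustRegister(blocksNotAvailable)

	// A nil entry stands in for "block not downloaded yet".
	blocks := []*string{nil, new(string)}
	for _, b := range blocks {
		if b == nil {
			// Count the skipped block and continue instead of failing the query.
			blocksNotAvailable.WithLabelValues("tenant-a").Inc()
			continue
		}
		// ... query the block ...
	}
	fmt.Println("done")
}

In dashboards, the skip rate can then be observed with something like `rate(loki_bloom_gateway_blocks_not_available_total[5m])` (a hypothetical query, not part of this change).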

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
1. pkg/bloomgateway/bloomgateway.go (19)
2. pkg/bloomgateway/metrics.go (19)
3. pkg/bloomgateway/processor.go (2)

pkg/bloomgateway/bloomgateway.go
@@ -290,6 +290,13 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 	g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())
+	var preFilterSeries, preFilterChunks int
+	preFilterSeries = len(req.Refs)
+	for _, series := range req.Refs {
+		preFilterChunks += len(series.Refs)
+	}
 	// Ideally we could use an unbuffered channel here, but since we return the
 	// request on the first error, there can be cases where the request context
 	// is not done yet and the consumeTask() function wants to send to the
@@ -316,13 +323,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 	remaining := len(tasks)
-	preFilterSeries := len(req.Refs)
-	var preFilterChunks, postFilterChunks int
-	for _, series := range req.Refs {
-		preFilterChunks += len(series.Refs)
-	}
 	combinedRecorder := v1.NewBloomRecorder(ctx, "combined")
 	for remaining > 0 {
 		select {
@@ -353,11 +353,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 		responsesPool.Put(resp)
 	}
-	postFilterSeries := len(filtered)
+	var postFilterSeries, postFilterChunks int
+	postFilterSeries = len(filtered)
 	for _, group := range filtered {
 		postFilterChunks += len(group.Refs)
 	}
 	g.metrics.requestedSeries.Observe(float64(preFilterSeries))
 	g.metrics.filteredSeries.Observe(float64(preFilterSeries - postFilterSeries))
 	g.metrics.requestedChunks.Observe(float64(preFilterChunks))

pkg/bloomgateway/metrics.go
@@ -116,12 +116,13 @@ func newServerMetrics(registerer prometheus.Registerer, namespace, subsystem str
 }
 type workerMetrics struct {
-	dequeueDuration   *prometheus.HistogramVec
-	queueDuration     *prometheus.HistogramVec
-	processDuration   *prometheus.HistogramVec
-	tasksDequeued     *prometheus.CounterVec
-	tasksProcessed    *prometheus.CounterVec
-	blockQueryLatency *prometheus.HistogramVec
+	dequeueDuration    *prometheus.HistogramVec
+	queueDuration      *prometheus.HistogramVec
+	processDuration    *prometheus.HistogramVec
+	tasksDequeued      *prometheus.CounterVec
+	tasksProcessed     *prometheus.CounterVec
+	blocksNotAvailable *prometheus.CounterVec
+	blockQueryLatency  *prometheus.HistogramVec
 }
 func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics {
@@ -158,6 +159,12 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str
 			Name:      "tasks_processed_total",
 			Help:      "Total amount of tasks that the worker processed",
 		}, append(labels, "status")),
+		blocksNotAvailable: r.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "blocks_not_available_total",
+			Help:      "Total amount of blocks that have been skipped because they were not found or not downloaded yet",
+		}, labels),
 		blockQueryLatency: r.NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,

pkg/bloomgateway/processor.go
@@ -126,7 +126,7 @@ func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.Close
 	return concurrency.ForEachJob(ctx, len(bqs), p.concurrency, func(ctx context.Context, i int) error {
 		bq := bqs[i]
 		if bq == nil {
-			// TODO(chaudum): Add metric for skipped blocks
+			p.metrics.blocksNotAvailable.WithLabelValues(p.id).Inc()
 			return nil
 		}
