diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go
index ee0e6f9940..165e2d6524 100644
--- a/pkg/bloomgateway/bloomgateway.go
+++ b/pkg/bloomgateway/bloomgateway.go
@@ -290,6 +290,13 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 	g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())
 
+	var preFilterSeries, preFilterChunks int
+
+	preFilterSeries = len(req.Refs)
+	for _, series := range req.Refs {
+		preFilterChunks += len(series.Refs)
+	}
+
 	// Ideally we could use an unbuffered channel here, but since we return the
 	// request on the first error, there can be cases where the request context
 	// is not done yet and the consumeTask() function wants to send to the
@@ -316,13 +323,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 	remaining := len(tasks)
 
-	preFilterSeries := len(req.Refs)
-	var preFilterChunks, postFilterChunks int
-
-	for _, series := range req.Refs {
-		preFilterChunks += len(series.Refs)
-	}
-
 	combinedRecorder := v1.NewBloomRecorder(ctx, "combined")
 	for remaining > 0 {
 		select {
@@ -353,11 +353,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 			responsesPool.Put(resp)
 		}
 
-	postFilterSeries := len(filtered)
-
+	var postFilterSeries, postFilterChunks int
+	postFilterSeries = len(filtered)
 	for _, group := range filtered {
 		postFilterChunks += len(group.Refs)
 	}
+
 	g.metrics.requestedSeries.Observe(float64(preFilterSeries))
 	g.metrics.filteredSeries.Observe(float64(preFilterSeries - postFilterSeries))
 	g.metrics.requestedChunks.Observe(float64(preFilterChunks))
diff --git a/pkg/bloomgateway/metrics.go b/pkg/bloomgateway/metrics.go
index 0885bc2ae7..5c046d3147 100644
--- a/pkg/bloomgateway/metrics.go
+++ b/pkg/bloomgateway/metrics.go
@@ -116,12 +116,13 @@ func newServerMetrics(registerer prometheus.Registerer, namespace, subsystem str
 }
 
 type workerMetrics struct {
-	dequeueDuration   *prometheus.HistogramVec
-	queueDuration     *prometheus.HistogramVec
-	processDuration   *prometheus.HistogramVec
-	tasksDequeued     *prometheus.CounterVec
-	tasksProcessed    *prometheus.CounterVec
-	blockQueryLatency *prometheus.HistogramVec
+	dequeueDuration    *prometheus.HistogramVec
+	queueDuration      *prometheus.HistogramVec
+	processDuration    *prometheus.HistogramVec
+	tasksDequeued      *prometheus.CounterVec
+	tasksProcessed     *prometheus.CounterVec
+	blocksNotAvailable *prometheus.CounterVec
+	blockQueryLatency  *prometheus.HistogramVec
 }
 
 func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics {
@@ -158,6 +159,12 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str
 			Name:      "tasks_processed_total",
 			Help:      "Total amount of tasks that the worker processed",
 		}, append(labels, "status")),
+		blocksNotAvailable: r.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "blocks_not_available_total",
+			Help:      "Total amount of blocks that have been skipped because they were not found or not downloaded yet",
+		}, labels),
 		blockQueryLatency: r.NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go
index 1e8452ded5..6973ad1f56 100644
--- a/pkg/bloomgateway/processor.go
+++ b/pkg/bloomgateway/processor.go
@@ -126,7 +126,7 @@ func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.Close
 	return concurrency.ForEachJob(ctx, len(bqs), p.concurrency, func(ctx context.Context, i int) error {
 		bq := bqs[i]
 		if bq == nil {
-			// TODO(chaudum): Add metric for skipped blocks
+			p.metrics.blocksNotAvailable.WithLabelValues(p.id).Inc()
 			return nil
 		}
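
Note (illustrative, not part of the patch): a minimal standalone sketch of the counter pattern this change introduces. The program, the "worker" label name, and the "loki"/"bloom_gateway" namespace and subsystem are assumptions for the example; the metric name and help text are taken from the diff, and the nil check mirrors the bq == nil branch in processBlocks.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same shape as the new worker metric: a CounterVec that counts blocks
	// skipped because they were not found or not downloaded yet.
	// Label name "worker" is an assumption (the patch labels it with p.id).
	blocksNotAvailable := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "loki",
		Subsystem: "bloom_gateway",
		Name:      "blocks_not_available_total",
		Help:      "Total amount of blocks that have been skipped because they were not found or not downloaded yet",
	}, []string{"worker"})
	reg.MustRegister(blocksNotAvailable)

	// Simulate a batch of block queriers where some entries could not be
	// resolved (nil), which is the case the patch now records.
	queriers := []*struct{}{{}, nil, nil}
	for _, bq := range queriers {
		if bq == nil {
			blocksNotAvailable.WithLabelValues("worker-0").Inc()
		}
	}

	// Prints 2: two of the three blocks were skipped.
	fmt.Println(testutil.ToFloat64(blocksNotAvailable.WithLabelValues("worker-0")))
}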