From 406690d26e7397fdb59dfebcc76fd9415eb2456e Mon Sep 17 00:00:00 2001 From: Periklis Tsirakidis Date: Fri, 4 Apr 2025 18:07:38 +0200 Subject: [PATCH] fix(ingest-limits): Incorrect ingested bytes counter usage (#17037) --- pkg/limits/ingest_limits.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/pkg/limits/ingest_limits.go b/pkg/limits/ingest_limits.go index 177b4c67d3..5b5f7ed072 100644 --- a/pkg/limits/ingest_limits.go +++ b/pkg/limits/ingest_limits.go @@ -52,13 +52,6 @@ var ( []string{"tenant"}, nil, ) - - tenantIngestedBytesTotal = prometheus.NewDesc( - constants.Loki+"_ingest_limits_ingested_bytes_total", - "The total number of bytes ingested per tenant within the active window. This is not a global total, as tenants can be sharded over multiple pods.", - []string{"tenant"}, - nil, - ) ) type metrics struct { @@ -66,6 +59,8 @@ type metrics struct { kafkaConsumptionLag prometheus.Histogram kafkaReadBytesTotal prometheus.Counter + + tenantIngestedBytesTotal *prometheus.CounterVec } func newMetrics(reg prometheus.Registerer) *metrics { @@ -88,6 +83,11 @@ func newMetrics(reg prometheus.Registerer) *metrics { Name: "ingest_limits_kafka_read_bytes_total", Help: "Total number of bytes read from Kafka.", }), + tenantIngestedBytesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "ingest_limits_tenant_ingested_bytes_total", + Help: "Total number of bytes ingested per tenant within the active window. This is not a global total, as tenants can be sharded over multiple pods.", + }, []string{"tenant"}), } } @@ -191,7 +191,6 @@ func (s *IngestLimits) Describe(descs chan<- *prometheus.Desc) { descs <- tenantPartitionDesc descs <- tenantRecordedStreamsDesc descs <- tenantActiveStreamsDesc - descs <- tenantIngestedBytesTotal } func (s *IngestLimits) Collect(m chan<- prometheus.Metric) { @@ -202,9 +201,8 @@ func (s *IngestLimits) Collect(m chan<- prometheus.Metric) { for tenant, partitions := range s.metadata { var ( - recorded int - active int - totalBytes uint64 + recorded int + active int ) for partitionID, partition := range partitions { @@ -217,7 +215,6 @@ func (s *IngestLimits) Collect(m chan<- prometheus.Metric) { for _, stream := range partition { if stream.lastSeenAt >= cutoff { active++ - totalBytes += stream.totalSize } } } @@ -225,7 +222,6 @@ func (s *IngestLimits) Collect(m chan<- prometheus.Metric) { m <- prometheus.MustNewConstMetric(tenantPartitionDesc, prometheus.GaugeValue, float64(len(partitions)), tenant) m <- prometheus.MustNewConstMetric(tenantRecordedStreamsDesc, prometheus.GaugeValue, float64(recorded), tenant) m <- prometheus.MustNewConstMetric(tenantActiveStreamsDesc, prometheus.GaugeValue, float64(active), tenant) - m <- prometheus.MustNewConstMetric(tenantIngestedBytesTotal, prometheus.CounterValue, float64(totalBytes), tenant) } } @@ -439,6 +435,8 @@ func (s *IngestLimits) updateMetadata(rec *logproto.StreamMetadata, tenant strin recordTime := lastSeenAt.UnixNano() recTotalSize := rec.EntriesSize + rec.StructuredMetadataSize + s.metrics.tenantIngestedBytesTotal.WithLabelValues(tenant).Add(float64(recTotalSize)) + // Get the bucket for this timestamp using the configured interval duration bucketStart := lastSeenAt.Truncate(s.cfg.BucketDuration).UnixNano()