fix(ingest-limits): Incorrect ingested bytes counter usage (#17037)

Branch: pull/17040/head
Author: Periklis Tsirakidis (2 months ago), committed by GitHub
Parent: d204d7465c
Commit: 406690d26e
Changed files (1): pkg/limits/ingest_limits.go (24 lines changed)

@ -52,13 +52,6 @@ var (
[]string{"tenant"}, []string{"tenant"},
nil, nil,
) )
tenantIngestedBytesTotal = prometheus.NewDesc(
constants.Loki+"_ingest_limits_ingested_bytes_total",
"The total number of bytes ingested per tenant within the active window. This is not a global total, as tenants can be sharded over multiple pods.",
[]string{"tenant"},
nil,
)
) )
type metrics struct { type metrics struct {
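(The descriptor removed above fed the hand-rolled Describe/Collect path further down; the fix registers a real per-tenant counter through promauto instead, as the next two hunks and the sketch after them show.)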
@@ -66,6 +59,8 @@ type metrics struct {
 	kafkaConsumptionLag prometheus.Histogram
 	kafkaReadBytesTotal prometheus.Counter
+	tenantIngestedBytesTotal *prometheus.CounterVec
 }

 func newMetrics(reg prometheus.Registerer) *metrics {
@@ -88,6 +83,11 @@ func newMetrics(reg prometheus.Registerer) *metrics {
 			Name: "ingest_limits_kafka_read_bytes_total",
 			Help: "Total number of bytes read from Kafka.",
 		}),
+		tenantIngestedBytesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Namespace: constants.Loki,
+			Name:      "ingest_limits_tenant_ingested_bytes_total",
+			Help:      "Total number of bytes ingested per tenant within the active window. This is not a global total, as tenants can be sharded over multiple pods.",
+		}, []string{"tenant"}),
 	}
 }
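For context, a minimal sketch of the registration pattern the fix adopts: promauto.With(reg) wires the CounterVec into the registry at construction time, so the component needs no custom Describe/Collect plumbing for it. The metric name, namespace value, and label mirror the diff; the registry, tenant label value, and main function here are illustrative only, not part of the commit.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// promauto registers the vector with reg at construction time, so the
	// owning component no longer has to describe or collect it by hand.
	ingestedBytes := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Namespace: "loki",
		Name:      "ingest_limits_tenant_ingested_bytes_total",
		Help:      "Total number of bytes ingested per tenant.",
	}, []string{"tenant"})

	// Counters are monotonic: increments accumulate for the process lifetime.
	ingestedBytes.WithLabelValues("tenant-a").Add(1024)

	// Gather exposes the metric without any Collector implementation.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName())
	}
}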
@@ -191,7 +191,6 @@ func (s *IngestLimits) Describe(descs chan<- *prometheus.Desc) {
 	descs <- tenantPartitionDesc
 	descs <- tenantRecordedStreamsDesc
 	descs <- tenantActiveStreamsDesc
-	descs <- tenantIngestedBytesTotal
 }

 func (s *IngestLimits) Collect(m chan<- prometheus.Metric) {
@@ -202,9 +201,8 @@ func (s *IngestLimits) Collect(m chan<- prometheus.Metric) {
 	for tenant, partitions := range s.metadata {
 		var (
 			recorded int
 			active   int
-			totalBytes uint64
 		)

 		for partitionID, partition := range partitions {
@@ -217,7 +215,6 @@ func (s *IngestLimits) Collect(m chan<- prometheus.Metric) {
 			for _, stream := range partition {
 				if stream.lastSeenAt >= cutoff {
 					active++
-					totalBytes += stream.totalSize
 				}
 			}
 		}
@@ -225,7 +222,6 @@ func (s *IngestLimits) Collect(m chan<- prometheus.Metric) {
 		m <- prometheus.MustNewConstMetric(tenantPartitionDesc, prometheus.GaugeValue, float64(len(partitions)), tenant)
 		m <- prometheus.MustNewConstMetric(tenantRecordedStreamsDesc, prometheus.GaugeValue, float64(recorded), tenant)
 		m <- prometheus.MustNewConstMetric(tenantActiveStreamsDesc, prometheus.GaugeValue, float64(active), tenant)
-		m <- prometheus.MustNewConstMetric(tenantIngestedBytesTotal, prometheus.CounterValue, float64(totalBytes), tenant)
 	}
 }
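This removed Collect path is the incorrect usage the commit title refers to: on every scrape it recomputed a "total" by summing totalSize over only the streams still inside the active window, then exported that snapshot as a CounterValue. A self-contained sketch of why that breaks counter semantics follows; the stream struct and sample values are illustrative stand-ins for the real state.

package main

import "fmt"

// Illustrative stand-in for the per-partition stream state in the diff.
type stream struct {
	lastSeenAt int64 // unix nanos
	totalSize  uint64
}

// activeBytes mirrors the removed Collect logic: sum totalSize over streams
// whose lastSeenAt is still inside the active window.
func activeBytes(streams []stream, cutoff int64) uint64 {
	var total uint64
	for _, s := range streams {
		if s.lastSeenAt >= cutoff {
			total += s.totalSize
		}
	}
	return total
}

func main() {
	streams := []stream{
		{lastSeenAt: 100, totalSize: 512},
		{lastSeenAt: 200, totalSize: 256},
	}

	fmt.Println(activeBytes(streams, 50))  // 768: both streams active
	fmt.Println(activeBytes(streams, 150)) // 256: first stream aged out

	// Exported with prometheus.CounterValue, the 768 -> 256 drop looks like a
	// counter reset, so rate() and increase() over-count the ingested bytes.
}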
@@ -439,6 +435,8 @@ func (s *IngestLimits) updateMetadata(rec *logproto.StreamMetadata, tenant string
 	recordTime := lastSeenAt.UnixNano()

 	recTotalSize := rec.EntriesSize + rec.StructuredMetadataSize
+	s.metrics.tenantIngestedBytesTotal.WithLabelValues(tenant).Add(float64(recTotalSize))

 	// Get the bucket for this timestamp using the configured interval duration
 	bucketStart := lastSeenAt.Truncate(s.cfg.BucketDuration).UnixNano()
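With the fix, bytes are counted exactly once, at ingest time: updateMetadata adds each record's EntriesSize plus StructuredMetadataSize to the per-tenant counter as the record is consumed, so the exported series is monotonic and safe to use with rate() and increase(). As the help text still notes, the value remains per pod rather than a global total, because tenants can be sharded over multiple pods.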
