chore: move readerMetrics to separate file metrics.go (#14681)

Branch: pull/14686/head
Author: George Robinson, 7 months ago (committed by GitHub)
Parent: b89829421e
Commit: ebbbccfdaf
2 changed files:
  1. pkg/kafka/partition/metrics.go (71 changed lines)
  2. pkg/kafka/partition/reader.go (72 changed lines)

pkg/kafka/partition/metrics.go

@@ -0,0 +1,71 @@
+package partition
+
+import (
+	"math"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/twmb/franz-go/plugin/kprom"
+
+	"github.com/grafana/loki/v3/pkg/kafka/client"
+)
+
+type readerMetrics struct {
+	phase             *prometheus.GaugeVec
+	receiveDelay      *prometheus.HistogramVec
+	recordsPerFetch   prometheus.Histogram
+	fetchesErrors     prometheus.Counter
+	fetchesTotal      prometheus.Counter
+	fetchWaitDuration prometheus.Histogram
+	consumeLatency    prometheus.Histogram
+	kprom             *kprom.Metrics
+}
+
+// newReaderMetrics initializes and returns a new set of metrics for the PartitionReader.
+func newReaderMetrics(r prometheus.Registerer) readerMetrics {
+	return readerMetrics{
+		phase: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "loki_ingest_storage_reader_phase",
+			Help: "The current phase of the consumer.",
+		}, []string{"phase"}),
+		receiveDelay: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
+			Name:                            "loki_ingest_storage_reader_receive_delay_seconds",
+			Help:                            "Delay between producing a record and receiving it in the consumer.",
+			NativeHistogramZeroThreshold:    math.Pow(2, -10), // Values below this will be considered to be 0. Equals to 0.0009765625, or about 1ms.
+			NativeHistogramBucketFactor:     1.2, // We use a higher factor (scheme=2) to have a wider spread of buckets.
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: 1 * time.Hour,
+			Buckets:                         prometheus.ExponentialBuckets(0.125, 2, 18), // Buckets between 125ms and 9h.
+		}, []string{"phase"}),
+		kprom: client.NewReaderClientMetrics("partition-reader", r),
+		fetchWaitDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                        "loki_ingest_storage_reader_records_batch_wait_duration_seconds",
+			Help:                        "How long a consumer spent waiting for a batch of records from the Kafka client. If fetching is faster than processing, then this will be close to 0.",
+			NativeHistogramBucketFactor: 1.1,
+		}),
+		recordsPerFetch: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:    "loki_ingest_storage_reader_records_per_fetch",
+			Help:    "The number of records received by the consumer in a single fetch operation.",
+			Buckets: prometheus.ExponentialBuckets(1, 2, 15),
+		}),
+		fetchesErrors: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Name: "loki_ingest_storage_reader_fetch_errors_total",
+			Help: "The number of fetch errors encountered by the consumer.",
+		}),
+		fetchesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Name: "loki_ingest_storage_reader_fetches_total",
+			Help: "Total number of Kafka fetches received by the consumer.",
+		}),
+	}
+}
+
+func (m *readerMetrics) reportStarting() {
+	m.phase.WithLabelValues(phaseStarting).Set(1)
+	m.phase.WithLabelValues(phaseRunning).Set(0)
+}
+
+func (m *readerMetrics) reportRunning() {
+	m.phase.WithLabelValues(phaseStarting).Set(0)
+	m.phase.WithLabelValues(phaseRunning).Set(1)
+}
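The two report helpers are the only behavioral addition in this file: they flip the loki_ingest_storage_reader_phase gauge between its two label values so that exactly one phase series is 1 at a time. A minimal sketch of how this could be exercised in isolation, assuming a package-local test with a throwaway registry (TestReaderMetricsPhase is a hypothetical name, not part of this commit):

package partition

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// TestReaderMetricsPhase is a hypothetical test (not part of this commit)
// showing that reportStarting and reportRunning keep the two phase series
// mutually exclusive.
func TestReaderMetricsPhase(t *testing.T) {
	// A fresh registry keeps the test isolated from the default one.
	m := newReaderMetrics(prometheus.NewRegistry())

	m.reportStarting()
	if got := testutil.ToFloat64(m.phase.WithLabelValues(phaseStarting)); got != 1 {
		t.Fatalf("starting gauge = %v, want 1", got)
	}

	m.reportRunning()
	if got := testutil.ToFloat64(m.phase.WithLabelValues(phaseRunning)); got != 1 {
		t.Fatalf("running gauge = %v, want 1", got)
	}
	if got := testutil.ToFloat64(m.phase.WithLabelValues(phaseStarting)); got != 0 {
		t.Fatalf("starting gauge = %v, want 0 after reportRunning", got)
	}
}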

pkg/kafka/partition/reader.go

@@ -3,7 +3,6 @@ package partition
 import (
 	"context"
 	"fmt"
-	"math"
 	"time"

 	"github.com/coder/quartz"
@@ -14,12 +13,10 @@ import (
 	"github.com/grafana/dskit/services"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"github.com/twmb/franz-go/pkg/kmsg"
-	"github.com/twmb/franz-go/plugin/kprom"

 	"github.com/grafana/loki/v3/pkg/kafka"
 	"github.com/grafana/loki/v3/pkg/kafka/client"
@@ -30,6 +27,9 @@ var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadline exceeded")
 const (
 	kafkaStartOffset = -2
 	kafkaEndOffset   = -1
+
+	phaseStarting = "starting"
+	phaseRunning  = "running"
 )

 // Reader is responsible for reading data from a specific Kafka partition
@@ -99,9 +99,7 @@ func (p *Reader) start(ctx context.Context) error {
 	if err != nil {
 		return errors.Wrap(err, "creating kafka reader client")
 	}
-	p.metrics.phase.WithLabelValues("starting").Set(1)
-	p.metrics.phase.WithLabelValues("running").Set(0)
+	p.metrics.reportStarting()

 	// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
 	lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
@@ -144,8 +142,7 @@
 // data from Kafka, and send it to the consumer.
 func (p *Reader) run(ctx context.Context) error {
 	level.Info(p.logger).Log("msg", "starting partition reader", "partition", p.partitionID, "consumer_group", p.consumerGroup)
-	p.metrics.phase.WithLabelValues("starting").Set(0)
-	p.metrics.phase.WithLabelValues("running").Set(1)
+	p.metrics.reportRunning()

 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -484,9 +481,9 @@ func (p *Reader) recordFetchesMetrics(fetches kgo.Fetches) {
 		numRecords++
 		delay := now.Sub(record.Timestamp).Seconds()
 		if p.Service.State() == services.Starting {
-			p.metrics.receiveDelayWhenStarting.Observe(delay)
+			p.metrics.receiveDelay.WithLabelValues(phaseStarting).Observe(delay)
 		} else {
-			p.metrics.receiveDelayWhenRunning.Observe(delay)
+			p.metrics.receiveDelay.WithLabelValues(phaseRunning).Observe(delay)
 		}
 	})
@@ -517,58 +514,3 @@ func isErrFetch(fetch kgo.Fetch) bool {
 	}
 	return false
 }
-
-type readerMetrics struct {
-	phase                    *prometheus.GaugeVec
-	receiveDelayWhenStarting prometheus.Observer
-	receiveDelayWhenRunning  prometheus.Observer
-	recordsPerFetch          prometheus.Histogram
-	fetchesErrors            prometheus.Counter
-	fetchesTotal             prometheus.Counter
-	fetchWaitDuration        prometheus.Histogram
-	// strongConsistencyInstrumentation *StrongReadConsistencyInstrumentation[struct{}]
-	// lastConsumedOffset               prometheus.Gauge
-	consumeLatency prometheus.Histogram
-	kprom          *kprom.Metrics
-}
-
-// newReaderMetrics initializes and returns a new set of metrics for the PartitionReader.
-func newReaderMetrics(reg prometheus.Registerer) readerMetrics {
-	receiveDelay := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
-		Name:                            "loki_ingest_storage_reader_receive_delay_seconds",
-		Help:                            "Delay between producing a record and receiving it in the consumer.",
-		NativeHistogramZeroThreshold:    math.Pow(2, -10), // Values below this will be considered to be 0. Equals to 0.0009765625, or about 1ms.
-		NativeHistogramBucketFactor:     1.2, // We use higher factor (scheme=2) to have wider spread of buckets.
-		NativeHistogramMaxBucketNumber:  100,
-		NativeHistogramMinResetDuration: 1 * time.Hour,
-		Buckets:                         prometheus.ExponentialBuckets(0.125, 2, 18), // Buckets between 125ms and 9h.
-	}, []string{"phase"})
-
-	return readerMetrics{
-		phase: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
-			Name: "loki_ingest_storage_reader_phase",
-			Help: "The current phase of the consumer.",
-		}, []string{"phase"}),
-		receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"),
-		receiveDelayWhenRunning:  receiveDelay.WithLabelValues("running"),
-		kprom:                    client.NewReaderClientMetrics("partition-reader", reg),
-		fetchWaitDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
-			Name:                        "loki_ingest_storage_reader_records_batch_wait_duration_seconds",
-			Help:                        "How long a consumer spent waiting for a batch of records from the Kafka client. If fetching is faster than processing, then this will be close to 0.",
-			NativeHistogramBucketFactor: 1.1,
-		}),
-		recordsPerFetch: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
-			Name:    "loki_ingest_storage_reader_records_per_fetch",
-			Help:    "The number of records received by the consumer in a single fetch operation.",
-			Buckets: prometheus.ExponentialBuckets(1, 2, 15),
-		}),
-		fetchesErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
-			Name: "loki_ingest_storage_reader_fetch_errors_total",
-			Help: "The number of fetch errors encountered by the consumer.",
-		}),
-		fetchesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
-			Name: "loki_ingest_storage_reader_fetches_total",
-			Help: "Total number of Kafka fetches received by the consumer.",
-		}),
-	}
-}
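One nuance of this refactor: the deleted code resolved the phase label once at construction time (receiveDelay.WithLabelValues("starting")) and stored the resulting prometheus.Observer, while the new code calls WithLabelValues on every observation. The lookup is cheap but not free, so a hot path can still pre-resolve the label once per fetch. A hypothetical sketch under that assumption (observeReceiveDelays is an illustrative helper, not part of this commit):

package partition

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// observeReceiveDelays is a hypothetical helper, not part of this commit.
// It resolves the phase label once per batch instead of once per record,
// mirroring the pre-curried observers that the commit removed.
func (p *Reader) observeReceiveDelays(phase string, records []*kgo.Record) {
	now := time.Now()
	// WithLabelValues returns a prometheus.Observer; look it up once.
	delayObserver := p.metrics.receiveDelay.WithLabelValues(phase)
	for _, record := range records {
		delayObserver.Observe(now.Sub(record.Timestamp).Seconds())
	}
}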
