|
|
|
|
@ -22,6 +22,7 @@ import ( |
|
|
|
|
type consumerMetrics struct { |
|
|
|
|
consumeLatency prometheus.Histogram |
|
|
|
|
currentOffset prometheus.Gauge |
|
|
|
|
pushLatency prometheus.Histogram |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// newConsumerMetrics initializes and returns a new consumerMetrics instance
|
|
|
|
|
@ -36,6 +37,11 @@ func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics { |
|
|
|
|
Name: "loki_ingester_partition_current_offset", |
|
|
|
|
Help: "The current offset of the Kafka ingester consumer.", |
|
|
|
|
}), |
|
|
|
|
pushLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ |
|
|
|
|
Name: "loki_ingester_partition_push_latency_seconds", |
|
|
|
|
Help: "The latency of a push request after consuming from Kafka", |
|
|
|
|
NativeHistogramBucketFactor: 1.1, |
|
|
|
|
}), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -114,7 +120,12 @@ func (kc *kafkaConsumer) consume(ctx context.Context, records []partition.Record |
|
|
|
|
Streams: []logproto.Stream{stream}, |
|
|
|
|
} |
|
|
|
|
if err := retryWithBackoff(ctx, func(attempts int) error { |
|
|
|
|
if _, err := kc.pusher.Push(recordCtx, req); err != nil { |
|
|
|
|
pushTime := time.Now() |
|
|
|
|
_, err := kc.pusher.Push(recordCtx, req) |
|
|
|
|
|
|
|
|
|
kc.metrics.pushLatency.Observe(time.Since(pushTime).Seconds()) |
|
|
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
level.Warn(kc.logger).Log("msg", "failed to push records", "err", err, "offset", record.Offset, "attempts", attempts) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|