|
|
|
@ -21,16 +21,13 @@ import ( |
|
|
|
|
"go.uber.org/atomic" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/loki/v3/pkg/kafka" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/util/constants" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
// writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout.
|
|
|
|
|
// You can think about this overhead as an extra time for requests sitting in the client's buffer
|
|
|
|
|
// before being sent on the wire and the actual time it takes to send it over the network and
|
|
|
|
|
// start being processed by Kafka.
|
|
|
|
|
writerRequestTimeoutOverhead = 2 * time.Second |
|
|
|
|
) |
|
|
|
|
// writerRequestTimeoutOverhead is the overhead applied by the Writer to every Kafka timeout.
|
|
|
|
|
// You can think about this overhead as an extra time for requests sitting in the client's buffer
|
|
|
|
|
// before being sent on the wire and the actual time it takes to send it over the network and
|
|
|
|
|
// start being processed by Kafka.
|
|
|
|
|
var writerRequestTimeoutOverhead = 2 * time.Second |
|
|
|
|
|
|
|
|
|
// NewWriterClient returns the kgo.Client that should be used by the Writer.
|
|
|
|
|
//
|
|
|
|
@ -215,7 +212,7 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi |
|
|
|
|
// Metrics.
|
|
|
|
|
bufferedProduceBytes: promauto.With(reg).NewSummary( |
|
|
|
|
prometheus.SummaryOpts{ |
|
|
|
|
Namespace: constants.Loki, |
|
|
|
|
Namespace: "kafka", |
|
|
|
|
Name: "buffered_produce_bytes", |
|
|
|
|
Help: "The buffered produce records in bytes. Quantile buckets keep track of buffered records size over the last 60s.", |
|
|
|
|
Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 1: 0.001}, |
|
|
|
@ -224,17 +221,17 @@ func NewProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Regi |
|
|
|
|
}), |
|
|
|
|
bufferedProduceBytesLimit: promauto.With(reg).NewGauge( |
|
|
|
|
prometheus.GaugeOpts{ |
|
|
|
|
Namespace: constants.Loki, |
|
|
|
|
Namespace: "kafka", |
|
|
|
|
Name: "buffered_produce_bytes_limit", |
|
|
|
|
Help: "The bytes limit on buffered produce records. Produce requests fail once this limit is reached.", |
|
|
|
|
}), |
|
|
|
|
produceRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ |
|
|
|
|
Namespace: constants.Loki, |
|
|
|
|
Namespace: "kafka", |
|
|
|
|
Name: "produce_requests_total", |
|
|
|
|
Help: "Total number of produce requests issued to Kafka.", |
|
|
|
|
}), |
|
|
|
|
produceFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ |
|
|
|
|
Namespace: constants.Loki, |
|
|
|
|
Namespace: "kafka", |
|
|
|
|
Name: "produce_failures_total", |
|
|
|
|
Help: "Total number of failed produce requests issued to Kafka.", |
|
|
|
|
}, []string{"reason"}), |
|
|
|
|