@@ -200,6 +200,7 @@ type Distributor struct {
 	// Will succeed usage tracker in future.
 	limitsFrontendRing ring.ReadRing
 	limitsFrontends    *ring_client.Pool
+	limitsFailures     prometheus.Counter
 
 	// kafka
 	kafkaWriter KafkaProducer
@@ -335,6 +336,10 @@ func New(
 			Name: "distributor_push_structured_metadata_sanitized_total",
 			Help: "The total number of times we've had to sanitize structured metadata (names or values) at ingestion time per tenant.",
 		}, []string{"tenant"}),
+		limitsFailures: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
+			Name: "loki_distributor_ingest_limits_failures_total",
+			Help: "The total number of failures checking per-tenant ingest limits.",
+		}),
 		kafkaAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
 			Namespace: constants.Loki,
 			Name:      "distributor_kafka_appends_total",
@@ -723,6 +728,7 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
 	if d.cfg.IngestLimitsEnabled {
 		exceedsLimits, reasons, err := d.exceedsLimits(ctx, tenantID, streams, d.doExceedsLimitsRPC)
 		if err != nil {
+			d.limitsFailures.Inc()
 			level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err)
 		} else if exceedsLimits {
 			if d.cfg.IngestLimitsDryRunEnabled {
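For reference, a minimal standalone sketch of the promauto counter pattern the hunks above rely on. Only the metric name, help text, and the fail-open increment come from the diff; the local registry, the main harness, and the checkLimits stub are illustrative stand-ins, not Loki code.

// Sketch: register an unlabelled counter via promauto and increment it on
// the fail-open error path, mirroring limitsFailures in the diff above.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// checkLimits is a hypothetical stand-in for d.exceedsLimits; it always
// fails here so the error branch is exercised.
func checkLimits() error { return fmt.Errorf("limits-frontend unavailable") }

func main() {
	reg := prometheus.NewRegistry()

	// Same registration shape as in New(): promauto.With(registerer) wires
	// the counter to the registry at construction time (panicking on a
	// duplicate registration), so no separate MustRegister call is needed.
	limitsFailures := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "loki_distributor_ingest_limits_failures_total",
		Help: "The total number of failures checking per-tenant ingest limits.",
	})

	// Same shape as the error branch in PushWithResolver(): the limits check
	// failed, so count the failure and accept the request anyway (fail open).
	if err := checkLimits(); err != nil {
		limitsFailures.Inc()
	}

	// Gather and print the metric families to show the counter is exported
	// under the expected name with the incremented value.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Printf("%s %v\n", mf.GetName(), mf.GetMetric()[0].GetCounter().GetValue())
	}
}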