promtail: Add `max-line-size-truncate` (#8233)

pull/8232/head
nicoche 2 years ago committed by GitHub
parent fd72aa8908
commit 80ea621ff3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 2
      clients/cmd/docker-driver/loki.go
  3. 2
      clients/cmd/fluent-bit/client.go
  4. 2
      clients/cmd/fluent-bit/dque.go
  5. 99
      clients/pkg/promtail/client/client.go
  6. 229
      clients/pkg/promtail/client/client_test.go
  7. 2
      clients/pkg/promtail/client/logger.go
  8. 4
      clients/pkg/promtail/client/multi.go
  9. 14
      clients/pkg/promtail/client/multi_test.go
  10. 2
      clients/pkg/promtail/limit/config.go
  11. 2
      clients/pkg/promtail/promtail.go
  12. 5
      clients/pkg/promtail/targets/file/decompresser_test.go
  13. 2
      clients/pkg/promtail/targets/lokipush/pushtarget_test.go
  14. 4
      docs/sources/clients/promtail/configuration.md

@ -51,6 +51,7 @@
* [7973](https://github.com/grafana/loki/pull/7973) **chodges15**: Add configuration to drop rate limited batches in Loki client and new metric label for drop reason.
* [8153](https://github.com/grafana/loki/pull/8153) **kavirajk**: promtail: Add `max-line-size` limit to drop on client side
* [8096](https://github.com/grafana/loki/pull/8096) **kavirajk**: doc(promtail): Doc about how log rotate works with promtail
* [8233](https://github.com/grafana/loki/pull/8233) **nicoche**: promtail: Add `max-line-size-truncate` limit to truncate lines longer than `max-line-size` on the client side instead of dropping them
##### Enhancements

@ -39,7 +39,7 @@ func New(logCtx logger.Info, logger log.Logger) (logger.Logger, error) {
return nil, err
}
m := client.NewMetrics(prometheus.DefaultRegisterer)
c, err := client.New(m, cfg.clientConfig, 0, 0, logger)
c, err := client.New(m, cfg.clientConfig, 0, 0, false, logger)
if err != nil {
return nil, err
}

@ -11,5 +11,5 @@ func NewClient(cfg *config, logger log.Logger, metrics *client.Metrics) (client.
if cfg.bufferConfig.buffer {
return NewBuffer(cfg, logger, metrics)
}
return client.New(metrics, cfg.clientConfig, 0, 0, logger)
return client.New(metrics, cfg.clientConfig, 0, 0, false, logger)
}

@ -72,7 +72,7 @@ func newDque(cfg *config, logger log.Logger, metrics *client.Metrics) (client.Cl
_ = q.queue.TurboOn()
}
q.loki, err = client.New(metrics, cfg.clientConfig, 0, 0, logger)
q.loki, err = client.New(metrics, cfg.clientConfig, 0, 0, false, logger)
if err != nil {
return nil, err
}

@ -34,19 +34,19 @@ const (
// pipeline stages
ReservedLabelTenantID = "__tenant_id__"
LatencyLabel = "filename"
HostLabel = "host"
ClientLabel = "client"
TenantLabel = "tenant"
DropReasonLabel = "reason"
DropReasonGeneric = "ingester_error"
DropReasonRateLimited = "rate_limited"
DropReasonStreamLimited = "stream_limited"
DropReasongMaxLineSizeLimited = "max_line_size_limited"
LatencyLabel = "filename"
HostLabel = "host"
ClientLabel = "client"
TenantLabel = "tenant"
ReasonLabel = "reason"
ReasonGeneric = "ingester_error"
ReasonRateLimited = "rate_limited"
ReasonStreamLimited = "stream_limited"
ReasonLineTooLong = "line_too_long"
)
var DropReasons = []string{DropReasonGeneric, DropReasonRateLimited, DropReasonStreamLimited}
var Reasons = []string{ReasonGeneric, ReasonRateLimited, ReasonStreamLimited, ReasonLineTooLong}
var UserAgent = fmt.Sprintf("promtail/%s", build.Version)
@ -56,6 +56,8 @@ type Metrics struct {
droppedBytes *prometheus.CounterVec
sentEntries *prometheus.CounterVec
droppedEntries *prometheus.CounterVec
mutatedEntries *prometheus.CounterVec
mutatedBytes *prometheus.CounterVec
requestDuration *prometheus.HistogramVec
batchRetries *prometheus.CounterVec
countersWithHost []*prometheus.CounterVec
@ -80,7 +82,7 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel, TenantLabel, DropReasonLabel})
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
@ -90,7 +92,17 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel, TenantLabel, DropReasonLabel})
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.mutatedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "mutated_entries_total",
Help: "The total number of log entries that have been mutated.",
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.mutatedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "mutated_bytes_total",
Help: "The total number of bytes that have been mutated.",
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
@ -111,7 +123,7 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
}
m.countersWithHostTenantReason = []*prometheus.CounterVec{
m.droppedBytes, m.droppedEntries,
m.droppedBytes, m.droppedEntries, m.mutatedEntries, m.mutatedBytes,
}
if reg != nil {
@ -120,6 +132,8 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
m.droppedBytes = mustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
m.sentEntries = mustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
m.mutatedEntries = mustRegisterOrGet(reg, m.mutatedEntries).(*prometheus.CounterVec)
m.mutatedBytes = mustRegisterOrGet(reg, m.mutatedBytes).(*prometheus.CounterVec)
m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
}
@ -160,24 +174,25 @@ type client struct {
externalLabels model.LabelSet
// ctx is used in any upstream calls from the `client`.
ctx context.Context
cancel context.CancelFunc
maxStreams int
maxLineSize int
ctx context.Context
cancel context.CancelFunc
maxStreams int
maxLineSize int
maxLineSizeTruncate bool
}
// Tripperware can wrap a roundtripper.
type Tripperware func(http.RoundTripper) http.RoundTripper
// New makes a new Client.
func New(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, logger log.Logger) (Client, error) {
func New(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (Client, error) {
if cfg.StreamLagLabels.String() != "" {
return nil, fmt.Errorf("client config stream_lag_labels is deprecated and the associated metric has been removed, stream_lag_labels: %+v", cfg.StreamLagLabels.String())
}
return newClient(metrics, cfg, maxStreams, maxLineSize, logger)
return newClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger)
}
func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, logger log.Logger) (*client, error) {
func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (*client, error) {
if cfg.URL.URL == nil {
return nil, errors.New("client needs target URL")
@ -192,11 +207,12 @@ func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, logger
metrics: metrics,
name: asSha256(cfg),
externalLabels: cfg.ExternalLabels.LabelSet,
ctx: ctx,
cancel: cancel,
maxStreams: maxStreams,
maxLineSize: maxLineSize,
externalLabels: cfg.ExternalLabels.LabelSet,
ctx: ctx,
cancel: cancel,
maxStreams: maxStreams,
maxLineSize: maxLineSize,
maxLineSizeTruncate: maxLineSizeTruncate,
}
if cfg.Name != "" {
c.name = cfg.Name
@ -226,8 +242,8 @@ func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, logger
}
// NewWithTripperware creates a new Loki client with a custom tripperware.
func NewWithTripperware(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, logger log.Logger, tp Tripperware) (Client, error) {
c, err := newClient(metrics, cfg, maxStreams, maxLineSize, logger)
func NewWithTripperware(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, tp Tripperware) (Client, error) {
c, err := newClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger)
if err != nil {
return nil, err
}
@ -243,7 +259,7 @@ func (c *client) initBatchMetrics(tenantID string) {
// Initialize counters to 0 so the metrics are exported before the first
// occurrence of incrementing to avoid missing metrics.
for _, counter := range c.metrics.countersWithHostTenantReason {
for _, reason := range DropReasons {
for _, reason := range Reasons {
counter.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(0)
}
}
@ -289,10 +305,17 @@ func (c *client) run() {
e, tenantID := c.processEntry(e)
// drop the entry because its length is greater than maxLineSize. maxLineSize == 0 means disabled.
// Either drop or mutate the log entry because its length is greater than maxLineSize. maxLineSize == 0 means disabled.
if c.maxLineSize != 0 && len(e.Line) > c.maxLineSize {
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, DropReasongMaxLineSizeLimited).Inc()
break
if !c.maxLineSizeTruncate {
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Inc()
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Add(float64(len(e.Line)))
break
}
c.metrics.mutatedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Inc()
c.metrics.mutatedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Add(float64(len(e.Line) - c.maxLineSize))
e.Line = e.Line[:c.maxLineSize]
}
batch, ok := batches[tenantID]
@ -317,9 +340,9 @@ func (c *client) run() {
err := batch.add(e)
if err != nil {
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
reason := DropReasonGeneric
reason := ReasonGeneric
if err.Error() == errMaxStreamsLimitExceeded {
reason = DropReasonStreamLimited
reason = ReasonStreamLimited
}
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line)))
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Inc()
@ -376,8 +399,8 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
// Immediately drop rate limited batches to avoid HOL blocking for other tenants not experiencing throttling
if c.cfg.DropRateLimitedBatches && batchIsRateLimited(status) {
level.Warn(c.logger).Log("msg", "dropping batch due to rate limiting applied at ingester")
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, DropReasonRateLimited).Add(bufBytes)
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, DropReasonRateLimited).Add(float64(entriesCount))
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonRateLimited).Add(bufBytes)
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonRateLimited).Add(float64(entriesCount))
return
}
@ -407,9 +430,9 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "tenant", tenantID, "error", err)
// If the reason for the last retry error was rate limiting, count the drops as such, even if the previous errors
// were for a different reason
dropReason := DropReasonGeneric
dropReason := ReasonGeneric
if batchIsRateLimited(status) {
dropReason = DropReasonRateLimited
dropReason = ReasonRateLimited
}
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(bufBytes)
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(float64(entriesCount))

@ -45,17 +45,18 @@ type receivedReq struct {
func TestClient_Handle(t *testing.T) {
tests := map[string]struct {
clientBatchSize int
clientBatchWait time.Duration
clientMaxRetries int
clientMaxLineSize int
clientTenantID string
clientDropRateLimited bool
serverResponseStatus int
inputEntries []api.Entry
inputDelay time.Duration
expectedReqs []receivedReq
expectedMetrics string
clientBatchSize int
clientBatchWait time.Duration
clientMaxRetries int
clientMaxLineSize int
clientMaxLineSizeTruncate bool
clientTenantID string
clientDropRateLimited bool
serverResponseStatus int
inputEntries []api.Entry
inputDelay time.Duration
expectedReqs []receivedReq
expectedMetrics string
}{
"batch log entries together until the batch size is reached": {
clientBatchSize: 10,
@ -80,17 +81,31 @@ func TestClient_Handle(t *testing.T) {
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_entries_total The total number of log entries that have been mutated.
# TYPE promtail_mutated_entries_total counter
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_bytes_total The total number of bytes that have been mutated.
# TYPE promtail_mutated_bytes_total counter
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
`,
},
"log entries have max_line_size exceeded": {
clientBatchSize: 10,
clientBatchWait: 100 * time.Millisecond,
clientMaxRetries: 3,
clientMaxLineSize: 10, // any log line more than this length should be discarded
serverResponseStatus: 200,
inputEntries: []api.Entry{logEntries[0], logEntries[1], logEntries[6]}, // this logEntries[6] entries has line more than size 10
"dropping log entries that have max_line_size exceeded": {
clientBatchSize: 10,
clientBatchWait: 100 * time.Millisecond,
clientMaxRetries: 3,
clientMaxLineSize: 10, // any log line more than this length should be discarded
clientMaxLineSizeTruncate: false,
serverResponseStatus: 200,
inputEntries: []api.Entry{logEntries[0], logEntries[1], logEntries[6]}, // the line of logEntries[6] is longer than 10 bytes
expectedReqs: []receivedReq{
{
tenantID: "",
@ -104,9 +119,66 @@ func TestClient_Handle(t *testing.T) {
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 1
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="max_line_size_limited",tenant=""} 1
# HELP promtail_mutated_entries_total The total number of log entries that have been mutated.
# TYPE promtail_mutated_entries_total counter
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_bytes_total The total number of bytes that have been mutated.
# TYPE promtail_mutated_bytes_total counter
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
`,
},
"truncating log entries that have max_line_size exceeded": {
clientBatchSize: 10,
clientBatchWait: 100 * time.Millisecond,
clientMaxRetries: 3,
clientMaxLineSize: 10,
clientMaxLineSizeTruncate: true,
serverResponseStatus: 200,
inputEntries: []api.Entry{logEntries[0], logEntries[1], logEntries[6]}, // logEntries[6]'s line is greater than 10 bytes
expectedReqs: []receivedReq{
{
tenantID: "",
pushReq: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{
logEntries[0].Entry,
logEntries[1].Entry,
{
Timestamp: logEntries[6].Entry.Timestamp,
Line: logEntries[6].Line[:10],
},
}}}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 3.0
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_entries_total The total number of log entries that have been mutated.
# TYPE promtail_mutated_entries_total counter
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 1
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_bytes_total The total number of bytes that have been mutated.
# TYPE promtail_mutated_bytes_total counter
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 4
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
`,
},
@ -134,8 +206,21 @@ func TestClient_Handle(t *testing.T) {
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_entries_total The total number of log entries that have been mutated.
# TYPE promtail_mutated_entries_total counter
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_bytes_total The total number of bytes that have been mutated.
# TYPE promtail_mutated_bytes_total counter
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
`,
},
"retry send a batch up to backoff's max retries in case the server responds with a 5xx": {
@ -162,8 +247,21 @@ func TestClient_Handle(t *testing.T) {
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 1
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_entries_total The total number of log entries that have been mutated.
# TYPE promtail_mutated_entries_total counter
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_bytes_total The total number of bytes that have been mutated.
# TYPE promtail_mutated_bytes_total counter
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
@ -185,8 +283,21 @@ func TestClient_Handle(t *testing.T) {
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 1
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_entries_total The total number of log entries that have been mutated.
# TYPE promtail_mutated_entries_total counter
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_bytes_total The total number of bytes that have been mutated.
# TYPE promtail_mutated_bytes_total counter
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
@ -216,8 +327,21 @@ func TestClient_Handle(t *testing.T) {
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 1
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_entries_total The total number of log entries that have been mutated.
# TYPE promtail_mutated_entries_total counter
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_bytes_total The total number of bytes that have been mutated.
# TYPE promtail_mutated_bytes_total counter
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
@ -240,8 +364,21 @@ func TestClient_Handle(t *testing.T) {
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 1
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_entries_total The total number of log entries that have been mutated.
# TYPE promtail_mutated_entries_total counter
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_mutated_bytes_total The total number of bytes that have been mutated.
# TYPE promtail_mutated_bytes_total counter
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 0
@ -267,8 +404,21 @@ func TestClient_Handle(t *testing.T) {
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__", reason="ingester_error", tenant="tenant-default"} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0
promtail_dropped_entries_total{host="__HOST__", reason="rate_limited", tenant="tenant-default"} 0
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0
# HELP promtail_mutated_entries_total The total number of log entries that have been mutated.
# TYPE promtail_mutated_entries_total counter
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0
# HELP promtail_mutated_bytes_total The total number of bytes that have been mutated.
# TYPE promtail_mutated_bytes_total counter
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0
`,
},
"batch log entries together honoring the tenant ID overridden while processing the pipeline stages": {
@ -301,12 +451,43 @@ func TestClient_Handle(t *testing.T) {
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-1"} 0
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-2"} 0
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-1"} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-2"} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-1"} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-2"} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-1"} 0
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-2"} 0
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0
# HELP promtail_mutated_entries_total The total number of log entries that have been mutated.
# TYPE promtail_mutated_entries_total counter
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-1"} 0
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-2"} 0
promtail_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-1"} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-2"} 0
promtail_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-1"} 0
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-2"} 0
promtail_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-1"} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-2"} 0
promtail_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0
# HELP promtail_mutated_bytes_total The total number of bytes that have been mutated.
# TYPE promtail_mutated_bytes_total counter
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-1"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-2"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant="tenant-default"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-1"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-2"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant="tenant-default"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-1"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-2"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant="tenant-default"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-1"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-2"} 0
promtail_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant="tenant-default"} 0
`,
},
}
@ -342,7 +523,7 @@ func TestClient_Handle(t *testing.T) {
}
m := NewMetrics(reg)
c, err := New(m, cfg, 0, testData.clientMaxLineSize, log.NewNopLogger())
c, err := New(m, cfg, 0, testData.clientMaxLineSize, testData.clientMaxLineSizeTruncate, log.NewNopLogger())
require.NoError(t, err)
// Send all the input log entries
@ -378,7 +559,7 @@ func TestClient_Handle(t *testing.T) {
fmt.Printf("Expected reqs: %#v\n", testData.expectedReqs)
expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1)
err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
err = testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total", "promtail_mutated_entries_total", "promtail_mutated_bytes_total")
assert.NoError(t, err)
})
}
@ -421,6 +602,7 @@ func TestClient_StopNow(t *testing.T) {
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
`,
@ -442,6 +624,7 @@ func TestClient_StopNow(t *testing.T) {
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
promtail_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 1
promtail_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
@ -481,7 +664,7 @@ func TestClient_StopNow(t *testing.T) {
}
m := NewMetrics(reg)
cl, err := New(m, cfg, 0, 0, log.NewNopLogger())
cl, err := New(m, cfg, 0, 0, false, log.NewNopLogger())
require.NoError(t, err)
// Send all the input log entries
@ -557,7 +740,7 @@ func Test_Tripperware(t *testing.T) {
var called bool
c, err := NewWithTripperware(metrics, Config{
URL: flagext.URLValue{URL: url},
}, 0, 0, log.NewNopLogger(), func(rt http.RoundTripper) http.RoundTripper {
}, 0, 0, false, log.NewNopLogger(), func(rt http.RoundTripper) http.RoundTripper {
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
require.Equal(t, r.URL.String(), "http://foo.com")
called = true

@ -37,7 +37,7 @@ type logger struct {
// NewLogger creates a new client logger that logs entries instead of sending them.
func NewLogger(metrics *Metrics, log log.Logger, cfgs ...Config) (Client, error) {
// make sure the clients config is valid
c, err := NewMulti(metrics, log, 0, 0, cfgs...)
c, err := NewMulti(metrics, log, 0, 0, false, cfgs...)
if err != nil {
return nil, err
}

@ -21,7 +21,7 @@ type MultiClient struct {
}
// NewMulti creates a new client
func NewMulti(metrics *Metrics, logger log.Logger, maxStreams, maxLineSize int, cfgs ...Config) (Client, error) {
func NewMulti(metrics *Metrics, logger log.Logger, maxStreams, maxLineSize int, maxLineSizeTruncate bool, cfgs ...Config) (Client, error) {
var fake struct{}
if len(cfgs) == 0 {
@ -30,7 +30,7 @@ func NewMulti(metrics *Metrics, logger log.Logger, maxStreams, maxLineSize int,
clientsCheck := make(map[string]struct{})
clients := make([]Client, 0, len(cfgs))
for _, cfg := range cfgs {
client, err := New(metrics, cfg, maxStreams, maxLineSize, logger)
client, err := New(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger)
if err != nil {
return nil, err
}

@ -27,7 +27,7 @@ var (
)
func TestNewMulti(t *testing.T) {
_, err := NewMulti(nilMetrics, util_log.Logger, 0, 0, []Config{}...)
_, err := NewMulti(nilMetrics, util_log.Logger, 0, 0, false, []Config{}...)
if err == nil {
t.Fatal("expected err but got nil")
}
@ -46,7 +46,7 @@ func TestNewMulti(t *testing.T) {
ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"hi": "there"}},
}
clients, err := NewMulti(metrics, util_log.Logger, 0, 0, cc1, cc2)
clients, err := NewMulti(metrics, util_log.Logger, 0, 0, false, cc1, cc2)
if err != nil {
t.Fatalf("expected err: nil got:%v", err)
}
@ -69,7 +69,7 @@ func TestNewMulti(t *testing.T) {
}
func TestNewMulti_BlockDuplicates(t *testing.T) {
_, err := NewMulti(nilMetrics, util_log.Logger, 0, 0, []Config{}...)
_, err := NewMulti(nilMetrics, util_log.Logger, 0, 0, false, []Config{}...)
if err == nil {
t.Fatal("expected err but got nil")
}
@ -82,11 +82,11 @@ func TestNewMulti_BlockDuplicates(t *testing.T) {
}
cc1Copy := cc1
_, err = NewMulti(metrics, util_log.Logger, 0, 0, cc1, cc1Copy)
_, err = NewMulti(metrics, util_log.Logger, 0, 0, false, cc1, cc1Copy)
require.Error(t, err, "expected NewMulti to reject duplicate client configs")
cc1Copy.Name = "copy"
clients, err := NewMulti(metrics, util_log.Logger, 0, 0, cc1, cc1Copy)
clients, err := NewMulti(metrics, util_log.Logger, 0, 0, false, cc1, cc1Copy)
require.NoError(t, err, "expected NewMulti to reject duplicate client configs")
multi := clients.(*MultiClient)
@ -148,9 +148,9 @@ func TestMultiClient_Handle(t *testing.T) {
func TestMultiClient_Handle_Race(t *testing.T) {
u := flagext.URLValue{}
require.NoError(t, u.Set("http://localhost"))
c1, err := New(nilMetrics, Config{URL: u, BackoffConfig: backoff.Config{MaxRetries: 1}, Timeout: time.Microsecond}, 0, 0, log.NewNopLogger())
c1, err := New(nilMetrics, Config{URL: u, BackoffConfig: backoff.Config{MaxRetries: 1}, Timeout: time.Microsecond}, 0, 0, false, log.NewNopLogger())
require.NoError(t, err)
c2, err := New(nilMetrics, Config{URL: u, BackoffConfig: backoff.Config{MaxRetries: 1}, Timeout: time.Microsecond}, 0, 0, log.NewNopLogger())
c2, err := New(nilMetrics, Config{URL: u, BackoffConfig: backoff.Config{MaxRetries: 1}, Timeout: time.Microsecond}, 0, 0, false, log.NewNopLogger())
require.NoError(t, err)
clients := []Client{c1, c2}
m := &MultiClient{

@ -13,6 +13,7 @@ type Config struct {
ReadlineRateDrop bool `mapstructure:"readline_rate_drop,omitempty" yaml:"readline_rate_drop,omitempty" json:"readline_rate_drop"`
MaxStreams int `mapstructure:"max_streams" yaml:"max_streams" json:"max_streams"`
MaxLineSize flagext.ByteSize `mapstructure:"max_line_size" yaml:"max_line_size" json:"max_line_size"`
MaxLineSizeTruncate bool `mapstructure:"max_line_size_truncate" yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
}
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
@ -22,4 +23,5 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.ReadlineRateDrop, prefix+"limit.readline-rate-drop", true, "When true, exceeding the rate limit causes this instance of Promtail to discard log lines, rather than sending them to Loki.")
f.IntVar(&cfg.MaxStreams, prefix+"max-streams", 0, "Maximum number of active streams. 0 to disable.")
f.Var(&cfg.MaxLineSize, prefix+"max-line-size", "Maximum log line byte size allowed without dropping. Example: 256kb, 2M. 0 to disable.")
f.BoolVar(&cfg.MaxLineSizeTruncate, prefix+"max-line-size-truncate", false, "Whether to truncate lines that exceed max_line_size. No effect if max_line_size is disabled")
}

@ -140,7 +140,7 @@ func (p *Promtail) reloadConfig(cfg *config.Config) error {
}
cfg.PositionsConfig.ReadOnly = true
} else {
p.client, err = client.NewMulti(p.metrics, p.logger, cfg.LimitsConfig.MaxStreams, cfg.LimitsConfig.MaxLineSize.Val(), cfg.ClientConfigs...)
p.client, err = client.NewMulti(p.metrics, p.logger, cfg.LimitsConfig.MaxStreams, cfg.LimitsConfig.MaxLineSize.Val(), cfg.LimitsConfig.MaxLineSizeTruncate, cfg.ClientConfigs...)
if err != nil {
return err
}

@ -7,11 +7,12 @@ import (
"time"
"github.com/go-kit/log"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/client/fake"
)
type noopClient struct {

@ -85,7 +85,7 @@ func TestLokiPushTarget(t *testing.T) {
BatchSize: 100 * 1024,
}
m := client.NewMetrics(prometheus.DefaultRegisterer)
pc, err := client.New(m, ccfg, 0, 0, logger)
pc, err := client.New(m, ccfg, 0, 0, false, logger)
require.NoError(t, err)
defer pc.Stop()

@ -1942,8 +1942,10 @@ The optional `limits_config` block configures global limits for this instance of
# 0 means it is disabled.
[max_streams: <int> | default = 0]
Maximum log line byte size allowed without dropping. Example: 256kb, 2M. 0 to disable.
# Maximum log line byte size allowed without dropping. Example: 256kb, 2M. 0 to disable.
[max_line_size: <int> | default = 0]
# Whether to truncate lines that exceed max_line_size instead of dropping them. No effect if max_line_size is disabled.
[max_line_size_truncate: <bool> | default = false]
```
## target_config

Loading…
Cancel
Save