From 0b1dbe26fe7643c7681560ecca9585b4cf976dd9 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Fri, 18 Sep 2020 12:15:39 -0400 Subject: [PATCH] Promtail: Add a stream lagging metric (#2618) * add a metric which displays how far a stream lags the last timestamp in a batch * register your metrics * recorded metric in the wrong spot, moved to the correct spot * cardinality is much too high if we add all the stream labels, instead just use filename. * work on using an interface to set/remove metrics * reworking interface again * for now just register this metric based on filename only, we can improve this in the future if the need arises * change the timeout to 1min for expiring the metric * add host label * include host label when removing metric * change the batch size for promtail from 100kb to 1Mb, this will not affect sending for lower volumes which are still driven by the BatchWait setting. Through the addition of this metric it was found that for higher volume files > 100kb/sec this was much too low causing too many batches to be unnecessarily sent. --- pkg/logentry/metric/metricvec.go | 11 +++++ pkg/promtail/api/types.go | 5 +++ pkg/promtail/client/client.go | 54 ++++++++++++++++++++++--- pkg/promtail/client/config.go | 2 +- pkg/promtail/targets/file/filetarget.go | 4 ++ 5 files changed, 69 insertions(+), 7 deletions(-) diff --git a/pkg/logentry/metric/metricvec.go b/pkg/logentry/metric/metricvec.go index c7a2feadb5..335982681d 100644 --- a/pkg/logentry/metric/metricvec.go +++ b/pkg/logentry/metric/metricvec.go @@ -58,6 +58,17 @@ func (c *metricVec) With(labels model.LabelSet) prometheus.Metric { return metric } +func (c *metricVec) Delete(labels model.LabelSet) bool { + c.mtx.Lock() + defer c.mtx.Unlock() + fp := labels.Fingerprint() + _, ok := c.metrics[fp] + if ok { + delete(c.metrics, fp) + } + return ok +} + // prune will remove all metrics which implement the Expirable interface and have expired // it does not take out a lock on the metrics map so whoever calls this function should do so. func (c *metricVec) prune() { diff --git a/pkg/promtail/api/types.go b/pkg/promtail/api/types.go index 48fea8f01d..69efd0eb31 100644 --- a/pkg/promtail/api/types.go +++ b/pkg/promtail/api/types.go @@ -6,6 +6,11 @@ import ( "github.com/prometheus/common/model" ) +type InstrumentedEntryHandler interface { + EntryHandler + UnregisterLatencyMetric(labels model.LabelSet) +} + // EntryHandler is something that can "handle" entries. type EntryHandler interface { Handle(labels model.LabelSet, time time.Time, entry string) error diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go index bb5014b58e..083a5f07af 100644 --- a/pkg/promtail/client/client.go +++ b/pkg/promtail/client/client.go @@ -12,6 +12,9 @@ import ( "sync" "time" + "github.com/prometheus/prometheus/promql/parser" + + "github.com/grafana/loki/pkg/logentry/metric" "github.com/grafana/loki/pkg/promtail/api" "github.com/cortexproject/cortex/pkg/util" @@ -34,6 +37,9 @@ const ( // Label reserved to override the tenant ID while processing // pipeline stages ReservedLabelTenantID = "__tenant_id__" + + LatencyLabel = "filename" + HostLabel = "host" ) var ( @@ -41,32 +47,33 @@ var ( Namespace: "promtail", Name: "encoded_bytes_total", Help: "Number of bytes encoded and ready to send.", - }, []string{"host"}) + }, []string{HostLabel}) sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "sent_bytes_total", Help: "Number of bytes sent.", - }, []string{"host"}) + }, []string{HostLabel}) droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "dropped_bytes_total", Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.", - }, []string{"host"}) + }, []string{HostLabel}) sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "sent_entries_total", Help: "Number of log entries sent to the ingester.", - }, []string{"host"}) + }, []string{HostLabel}) droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "dropped_entries_total", Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.", - }, []string{"host"}) + }, []string{HostLabel}) requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "promtail", Name: "request_duration_seconds", Help: "Duration of send requests.", - }, []string{"status_code", "host"}) + }, []string{"status_code", HostLabel}) + streamLag *metric.Gauges countersWithHost = []*prometheus.CounterVec{ encodedBytes, sentBytes, droppedBytes, sentEntries, droppedEntries, @@ -82,6 +89,16 @@ func init() { prometheus.MustRegister(sentEntries) prometheus.MustRegister(droppedEntries) prometheus.MustRegister(requestDuration) + var err error + streamLag, err = metric.NewGauges("promtail_stream_lag_seconds", + "Difference between current time and last batch timestamp for successful sends", + metric.GaugeConfig{Action: "set"}, + int64(1*time.Minute.Seconds()), // This strips out files which update slowly and reduces noise in this metric. + ) + if err != nil { + panic(err) + } + prometheus.MustRegister(streamLag) } // Client pushes entries to Loki and can be stopped @@ -234,6 +251,26 @@ func (c *client) sendBatch(tenantID string, batch *batch) { if err == nil { sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) + for _, s := range batch.streams { + lbls, err := parser.ParseMetric(s.Labels) + if err != nil { + // is this possible? + level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err) + return + } + var lblSet model.LabelSet + for i := range lbls { + if lbls[i].Name == LatencyLabel { + lblSet = model.LabelSet{ + model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host), + model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value), + } + } + } + if lblSet != nil { + streamLag.With(lblSet).Set(time.Now().Sub(s.Entries[len(s.Entries)-1].Timestamp).Seconds()) + } + } return } @@ -330,3 +367,8 @@ func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error { }} return nil } + +func (c *client) UnregisterLatencyMetric(labels model.LabelSet) { + labels[HostLabel] = model.LabelValue(c.cfg.URL.Host) + streamLag.Delete(labels) +} diff --git a/pkg/promtail/client/config.go b/pkg/promtail/client/config.go index 444490acd1..590753e4c0 100644 --- a/pkg/promtail/client/config.go +++ b/pkg/promtail/client/config.go @@ -34,7 +34,7 @@ type Config struct { func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.Var(&c.URL, prefix+"client.url", "URL of log server") f.DurationVar(&c.BatchWait, prefix+"client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.") - f.IntVar(&c.BatchSize, prefix+"client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ") + f.IntVar(&c.BatchSize, prefix+"client.batch-size-bytes", 1024*1024, "Maximum batch size to accrue before sending. ") // Default backoff schedule: 0.5s, 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s(4.267m) For a total time of 511.5s(8.5m) before logs are lost f.IntVar(&c.BackoffConfig.MaxRetries, prefix+"client.max-retries", 10, "Maximum number of retires when sending batches.") f.DurationVar(&c.BackoffConfig.MinBackoff, prefix+"client.min-backoff", 500*time.Millisecond, "Initial backoff time between retries.") diff --git a/pkg/promtail/targets/file/filetarget.go b/pkg/promtail/targets/file/filetarget.go index 93783ba0a4..620704931f 100644 --- a/pkg/promtail/targets/file/filetarget.go +++ b/pkg/promtail/targets/file/filetarget.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/promtail/api" + "github.com/grafana/loki/pkg/promtail/client" "github.com/grafana/loki/pkg/promtail/positions" "github.com/grafana/loki/pkg/promtail/targets/target" ) @@ -316,6 +317,9 @@ func (t *FileTarget) stopTailingAndRemovePosition(ps []string) { t.positions.Remove(tailer.path) delete(t.tails, p) } + if h, ok := t.handler.(api.InstrumentedEntryHandler); ok { + h.UnregisterLatencyMetric(model.LabelSet{model.LabelName(client.LatencyLabel): model.LabelValue(p)}) + } } }