From 01d63c8ae8bb139d6ed41eee95a2248ed7bc4738 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 15 Mar 2022 06:05:34 -0700 Subject: [PATCH] [promtail] Refactor promtail client metrics so that we can't have duplicate metrics collected for the lag metric. (#5521) * Refactor promtail client metrics so that we can't have duplicate metrics collected for the lag metric. Signed-off-by: Callum Styan * Set up new metrics passing for other client types. Signed-off-by: Callum Styan * Fix lint issues. Signed-off-by: Callum Styan * Update changelog and docs, also rename nested client configs struct to configs. Signed-off-by: Callum Styan Co-authored-by: Owen Diehl --- CHANGELOG.md | 1 + clients/cmd/docker-driver/loki.go | 3 +- clients/cmd/fluent-bit/buffer.go | 4 +- clients/cmd/fluent-bit/client.go | 7 +- clients/cmd/fluent-bit/dque.go | 5 +- clients/cmd/fluent-bit/loki.go | 4 +- clients/cmd/fluent-bit/out_grafana_loki.go | 7 +- clients/cmd/promtail/main.go | 4 +- clients/pkg/promtail/api/types.go | 3 +- clients/pkg/promtail/client/client.go | 96 +++++++++++-------- clients/pkg/promtail/client/client_test.go | 11 ++- clients/pkg/promtail/client/config.go | 19 ++-- clients/pkg/promtail/client/config_test.go | 14 ++- clients/pkg/promtail/client/logger.go | 5 +- clients/pkg/promtail/client/logger_test.go | 4 +- clients/pkg/promtail/client/multi.go | 5 +- clients/pkg/promtail/client/multi_test.go | 13 ++- clients/pkg/promtail/config/config.go | 8 +- clients/pkg/promtail/config/config_test.go | 81 +++++++++------- clients/pkg/promtail/promtail.go | 10 +- clients/pkg/promtail/promtail_test.go | 11 ++- .../pkg/promtail/targets/file/filetarget.go | 3 +- .../targets/lokipush/pushtarget_test.go | 3 +- .../sources/clients/promtail/configuration.md | 25 +++-- 24 files changed, 200 insertions(+), 146 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8198d3e190..b59c04f674 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## Main +* 
[5521](https://github.com/grafana/loki/pull/5521) **cstyan**: Move stream lag configuration to top-level clients config struct and refactor stream lag metric. This resolves a bug with duplicate metric collection when a single Promtail binary is running multiple Promtail clients. * [5568](https://github.com/grafana/loki/pull/5568) **afayngelerindbx**: Fix canary panics due to concurrent execution of `confirmMissing` * [5552](https://github.com/grafana/loki/pull/5552) **jiachengxu**: Loki mixin: add `DiskSpaceUtilizationPanel` * [5541](https://github.com/grafana/loki/pull/5541) **bboreham**: Queries: reject very deeply nested regexps which could crash Loki. diff --git a/clients/cmd/docker-driver/loki.go b/clients/cmd/docker-driver/loki.go index 9659ae32ad..99d6ab8e4e 100644 --- a/clients/cmd/docker-driver/loki.go +++ b/clients/cmd/docker-driver/loki.go @@ -38,7 +38,8 @@ func New(logCtx logger.Info, logger log.Logger) (logger.Logger, error) { if err != nil { return nil, err } - c, err := client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger) + m := client.NewMetrics(prometheus.DefaultRegisterer, nil) + c, err := client.New(m, cfg.clientConfig, nil, logger) if err != nil { return nil, err } diff --git a/clients/cmd/fluent-bit/buffer.go b/clients/cmd/fluent-bit/buffer.go index 5d8889b527..3f11ca4643 100644 --- a/clients/cmd/fluent-bit/buffer.go +++ b/clients/cmd/fluent-bit/buffer.go @@ -21,10 +21,10 @@ var defaultBufferConfig = bufferConfig{ } // NewBuffer makes a new buffered Client. 
-func NewBuffer(cfg *config, logger log.Logger) (client.Client, error) { +func NewBuffer(cfg *config, logger log.Logger, metrics *client.Metrics, streamLagLabels []string) (client.Client, error) { switch cfg.bufferConfig.bufferType { case "dque": - return newDque(cfg, logger) + return newDque(cfg, logger, metrics, streamLagLabels) default: return nil, fmt.Errorf("failed to parse bufferType: %s", cfg.bufferConfig.bufferType) } diff --git a/clients/cmd/fluent-bit/client.go b/clients/cmd/fluent-bit/client.go index 26d0b4bd21..2f28a2bb7f 100644 --- a/clients/cmd/fluent-bit/client.go +++ b/clients/cmd/fluent-bit/client.go @@ -2,15 +2,14 @@ package main import ( "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/clients/pkg/promtail/client" ) // NewClient creates a new client based on the fluentbit configuration. -func NewClient(cfg *config, logger log.Logger) (client.Client, error) { +func NewClient(cfg *config, logger log.Logger, metrics *client.Metrics, streamLagLabels []string) (client.Client, error) { if cfg.bufferConfig.buffer { - return NewBuffer(cfg, logger) + return NewBuffer(cfg, logger, metrics, streamLagLabels) } - return client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger) + return client.New(metrics, cfg.clientConfig, streamLagLabels, logger) } diff --git a/clients/cmd/fluent-bit/dque.go b/clients/cmd/fluent-bit/dque.go index c2a4b0f7f2..00c19ebcd4 100644 --- a/clients/cmd/fluent-bit/dque.go +++ b/clients/cmd/fluent-bit/dque.go @@ -9,7 +9,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/joncrlsn/dque" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/grafana/loki/clients/pkg/promtail/api" @@ -52,7 +51,7 @@ type dqueClient struct { } // New makes a new dque loki client -func newDque(cfg *config, logger log.Logger) (client.Client, error) { +func newDque(cfg *config, logger log.Logger, metrics *client.Metrics, 
streamLagLabels []string) (client.Client, error) { var err error q := &dqueClient{ @@ -73,7 +72,7 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) { _ = q.queue.TurboOn() } - q.loki, err = client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger) + q.loki, err = client.New(metrics, cfg.clientConfig, streamLagLabels, logger) if err != nil { return nil, err } diff --git a/clients/cmd/fluent-bit/loki.go b/clients/cmd/fluent-bit/loki.go index d5f3b79835..1e30713609 100644 --- a/clients/cmd/fluent-bit/loki.go +++ b/clients/cmd/fluent-bit/loki.go @@ -36,8 +36,8 @@ type loki struct { logger log.Logger } -func newPlugin(cfg *config, logger log.Logger) (*loki, error) { - client, err := NewClient(cfg, logger) +func newPlugin(cfg *config, logger log.Logger, metrics *client.Metrics) (*loki, error) { + client, err := NewClient(cfg, logger, metrics, nil) if err != nil { return nil, err } diff --git a/clients/cmd/fluent-bit/out_grafana_loki.go b/clients/cmd/fluent-bit/out_grafana_loki.go index f07d536dfb..adb5e4cc84 100644 --- a/clients/cmd/fluent-bit/out_grafana_loki.go +++ b/clients/cmd/fluent-bit/out_grafana_loki.go @@ -15,6 +15,10 @@ import ( _ "github.com/grafana/loki/pkg/util/build" ) +import ( + "github.com/grafana/loki/clients/pkg/promtail/client" + "github.com/prometheus/client_golang/prometheus" +) var ( // registered loki plugin instances, required for disposal during shutdown @@ -83,7 +87,8 @@ func FLBPluginInit(ctx unsafe.Pointer) int { level.Info(paramLogger).Log("key_file", conf.clientConfig.Client.TLSConfig.KeyFile) level.Info(paramLogger).Log("insecure_skip_verify", conf.clientConfig.Client.TLSConfig.InsecureSkipVerify) - plugin, err := newPlugin(conf, logger) + m := client.NewMetrics(prometheus.DefaultRegisterer, nil) + plugin, err := newPlugin(conf, logger, m) if err != nil { level.Error(logger).Log("newPlugin", err) return output.FLB_ERROR diff --git a/clients/cmd/promtail/main.go b/clients/cmd/promtail/main.go index 
f75c9506f0..c14097dd0f 100644 --- a/clients/cmd/promtail/main.go +++ b/clients/cmd/promtail/main.go @@ -19,6 +19,7 @@ import ( "github.com/grafana/loki/clients/pkg/logentry/stages" "github.com/grafana/loki/clients/pkg/promtail" + "github.com/grafana/loki/clients/pkg/promtail/client" "github.com/grafana/loki/clients/pkg/promtail/config" "github.com/grafana/loki/pkg/util" @@ -110,7 +111,8 @@ func main() { } } - p, err := promtail.New(config.Config, config.dryRun, prometheus.DefaultRegisterer) + clientMetrics := client.NewMetrics(prometheus.DefaultRegisterer, config.Config.ClientConfigs.StreamLagLabels) + p, err := promtail.New(config.Config, clientMetrics, config.dryRun) if err != nil { level.Error(util_log.Logger).Log("msg", "error creating promtail", "error", err) os.Exit(1) diff --git a/clients/pkg/promtail/api/types.go b/clients/pkg/promtail/api/types.go index f1a4aa87b3..2bb2482da4 100644 --- a/clients/pkg/promtail/api/types.go +++ b/clients/pkg/promtail/api/types.go @@ -3,6 +3,7 @@ package api import ( "sync" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/logproto" @@ -16,7 +17,7 @@ type Entry struct { type InstrumentedEntryHandler interface { EntryHandler - UnregisterLatencyMetric(labels model.LabelSet) + UnregisterLatencyMetric(prometheus.Labels) } // EntryHandler is something that can "handle" entries via a channel. 
diff --git a/clients/pkg/promtail/client/client.go b/clients/pkg/promtail/client/client.go index 614a152983..7ace61e1d6 100644 --- a/clients/pkg/promtail/client/client.go +++ b/clients/pkg/promtail/client/client.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "crypto/sha256" "errors" "fmt" "io" @@ -20,7 +21,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" - "github.com/grafana/loki/clients/pkg/logentry/metric" "github.com/grafana/loki/clients/pkg/promtail/api" lokiutil "github.com/grafana/loki/pkg/util" @@ -37,11 +37,12 @@ const ( LatencyLabel = "filename" HostLabel = "host" + ClientLabel = "client" ) var UserAgent = fmt.Sprintf("promtail/%s", build.Version) -type metrics struct { +type Metrics struct { encodedBytes *prometheus.CounterVec sentBytes *prometheus.CounterVec droppedBytes *prometheus.CounterVec @@ -49,12 +50,12 @@ type metrics struct { droppedEntries *prometheus.CounterVec requestDuration *prometheus.HistogramVec batchRetries *prometheus.CounterVec - streamLag *metric.Gauges countersWithHost []*prometheus.CounterVec + streamLag *prometheus.GaugeVec } -func newMetrics(reg prometheus.Registerer) *metrics { - var m metrics +func NewMetrics(reg prometheus.Registerer, streamLagLabels []string) *Metrics { + var m Metrics m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", @@ -92,20 +93,18 @@ func newMetrics(reg prometheus.Registerer) *metrics { Help: "Number of times batches has had to be retried.", }, []string{HostLabel}) - var err error - m.streamLag, err = metric.NewGauges("promtail_stream_lag_seconds", - "Difference between current time and last batch timestamp for successful sends", - metric.GaugeConfig{Action: "set"}, - int64(1*time.Minute.Seconds()), // This strips out files which update slowly and reduces noise in this metric. 
- ) - if err != nil { - panic(err) - } - m.countersWithHost = []*prometheus.CounterVec{ m.encodedBytes, m.sentBytes, m.droppedBytes, m.sentEntries, m.droppedEntries, } + streamLagLabelsMerged := []string{HostLabel, ClientLabel} + streamLagLabelsMerged = append(streamLagLabelsMerged, streamLagLabels...) + m.streamLag = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "promtail", + Name: "stream_lag_seconds", + Help: "Difference between current time and last batch timestamp for successful sends", + }, streamLagLabelsMerged) + if reg != nil { m.encodedBytes = mustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec) m.sentBytes = mustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec) @@ -114,7 +113,7 @@ func newMetrics(reg prometheus.Registerer) *metrics { m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec) m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec) m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec) - m.streamLag = mustRegisterOrGet(reg, m.streamLag).(*metric.Gauges) + m.streamLag = mustRegisterOrGet(reg, m.streamLag).(*prometheus.GaugeVec) } return &m @@ -139,11 +138,13 @@ type Client interface { // Client for pushing logs in snappy-compressed protos over HTTP. type client struct { - metrics *metrics - logger log.Logger - cfg Config - client *http.Client - entries chan api.Entry + name string + metrics *Metrics + streamLagLabels []string + logger log.Logger + cfg Config + client *http.Client + entries chan api.Entry once sync.Once wg sync.WaitGroup @@ -159,11 +160,12 @@ type client struct { type Tripperware func(http.RoundTripper) http.RoundTripper // New makes a new Client. 
-func New(reg prometheus.Registerer, cfg Config, logger log.Logger) (Client, error) { - return newClient(reg, cfg, logger) +func New(metrics *Metrics, cfg Config, streamLagLabels []string, logger log.Logger) (Client, error) { + return newClient(metrics, cfg, streamLagLabels, logger) } -func newClient(reg prometheus.Registerer, cfg Config, logger log.Logger) (*client, error) { +func newClient(metrics *Metrics, cfg Config, streamLagLabels []string, logger log.Logger) (*client, error) { + if cfg.URL.URL == nil { return nil, errors.New("client needs target URL") } @@ -171,15 +173,20 @@ func newClient(reg prometheus.Registerer, cfg Config, logger log.Logger) (*clien ctx, cancel := context.WithCancel(context.Background()) c := &client{ - logger: log.With(logger, "component", "client", "host", cfg.URL.Host), - cfg: cfg, - entries: make(chan api.Entry), - metrics: newMetrics(reg), + logger: log.With(logger, "component", "client", "host", cfg.URL.Host), + cfg: cfg, + entries: make(chan api.Entry), + metrics: metrics, + streamLagLabels: streamLagLabels, + name: asSha256(cfg), externalLabels: cfg.ExternalLabels.LabelSet, ctx: ctx, cancel: cancel, } + if cfg.Name != "" { + c.name = cfg.Name + } err := cfg.Client.Validate() if err != nil { @@ -205,8 +212,8 @@ func newClient(reg prometheus.Registerer, cfg Config, logger log.Logger) (*clien } // NewWithTripperware creates a new Loki client with a custom tripperware. 
-func NewWithTripperware(reg prometheus.Registerer, cfg Config, logger log.Logger, tp Tripperware) (Client, error) { - c, err := newClient(reg, cfg, logger) +func NewWithTripperware(metrics *Metrics, cfg Config, streamLagLabels []string, logger log.Logger, tp Tripperware) (Client, error) { + c, err := newClient(metrics, cfg, streamLagLabels, logger) if err != nil { return nil, err } @@ -290,6 +297,14 @@ func (c *client) Chan() chan<- api.Entry { return c.entries } +func asSha256(o interface{}) string { + h := sha256.New() + h.Write([]byte(fmt.Sprintf("%v", o))) + + temp := fmt.Sprintf("%x", h.Sum(nil)) + return temp[:6] +} + func (c *client) sendBatch(tenantID string, batch *batch) { buf, entriesCount, err := batch.encode() if err != nil { @@ -318,23 +333,20 @@ func (c *client) sendBatch(tenantID string, batch *batch) { level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err) return } - var lblSet model.LabelSet + lblSet := make(prometheus.Labels) for i := range lbls { - for _, lbl := range c.cfg.StreamLagLabels { + for _, lbl := range c.streamLagLabels { if lbls[i].Name == lbl { - if lblSet == nil { - lblSet = model.LabelSet{} - } - - lblSet = lblSet.Merge(model.LabelSet{ - model.LabelName(lbl): model.LabelValue(lbls[i].Value), - }) + lblSet[lbl] = lbls[i].Value } } } if lblSet != nil { // always set host - lblSet = lblSet.Merge(model.LabelSet{model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host)}) + lblSet[HostLabel] = c.cfg.URL.Host + // also set client name since if we have multiple promtail clients configured we will run into a + // duplicate metric collected with same labels error when trying to hit the /metrics endpoint + lblSet[ClientLabel] = c.name c.metrics.streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds()) } } @@ -434,7 +446,7 @@ func (c *client) processEntry(e api.Entry) (api.Entry, string) { return e, tenantID } -func (c *client) 
UnregisterLatencyMetric(labels model.LabelSet) { - labels[HostLabel] = model.LabelValue(c.cfg.URL.Host) +func (c *client) UnregisterLatencyMetric(labels prometheus.Labels) { + labels[HostLabel] = c.cfg.URL.Host c.metrics.streamLag.Delete(labels) } diff --git a/clients/pkg/promtail/client/client_test.go b/clients/pkg/promtail/client/client_test.go index aab6a536bc..0a80a76f8e 100644 --- a/clients/pkg/promtail/client/client_test.go +++ b/clients/pkg/promtail/client/client_test.go @@ -266,7 +266,8 @@ func TestClient_Handle(t *testing.T) { TenantID: testData.clientTenantID, } - c, err := New(reg, cfg, log.NewNopLogger()) + m := NewMetrics(reg, nil) + c, err := New(m, cfg, nil, log.NewNopLogger()) require.NoError(t, err) // Send all the input log entries @@ -397,8 +398,8 @@ func TestClient_StopNow(t *testing.T) { Timeout: 1 * time.Second, TenantID: c.clientTenantID, } - - cl, err := New(reg, cfg, log.NewNopLogger()) + m := NewMetrics(reg, nil) + cl, err := New(m, cfg, nil, log.NewNopLogger()) require.NoError(t, err) // Send all the input log entries @@ -472,9 +473,9 @@ func Test_Tripperware(t *testing.T) { url, err := url.Parse("http://foo.com") require.NoError(t, err) var called bool - c, err := NewWithTripperware(nil, Config{ + c, err := NewWithTripperware(metrics, Config{ URL: flagext.URLValue{URL: url}, - }, log.NewNopLogger(), func(rt http.RoundTripper) http.RoundTripper { + }, nil, log.NewNopLogger(), func(rt http.RoundTripper) http.RoundTripper { return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { require.Equal(t, r.URL.String(), "http://foo.com") called = true diff --git a/clients/pkg/promtail/client/config.go b/clients/pkg/promtail/client/config.go index bd38aa71cc..303914c4a7 100644 --- a/clients/pkg/promtail/client/config.go +++ b/clients/pkg/promtail/client/config.go @@ -6,6 +6,7 @@ import ( "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/flagext" + dskit_flagext "github.com/grafana/dskit/flagext" 
"github.com/prometheus/common/config" lokiflag "github.com/grafana/loki/pkg/util/flagext" @@ -21,8 +22,14 @@ const ( Timeout = 10 * time.Second ) +type Configs struct { + StreamLagLabels dskit_flagext.StringSliceCSV `yaml:"stream_lag_labels,omitempty"` + Configs []Config `yaml:"configs"` +} + // Config describes configuration for a HTTP pusher client. type Config struct { + Name string `yaml:"name,omitempty"` URL flagext.URLValue BatchWait time.Duration BatchSize int @@ -37,8 +44,6 @@ type Config struct { // The tenant ID to use when pushing logs to Loki (empty string means // single tenant mode) TenantID string `yaml:"tenant_id"` - - StreamLagLabels flagext.StringSliceCSV `yaml:"stream_lag_labels"` } // RegisterFlags with prefix registers flags where every name is prefixed by @@ -55,9 +60,6 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.Var(&c.ExternalLabels, prefix+"client.external-labels", "list of external labels to add to each log (e.g: --client.external-labels=lb1=v1,lb2=v2)") f.StringVar(&c.TenantID, prefix+"client.tenant-id", "", "Tenant ID to use when pushing logs to Loki.") - - c.StreamLagLabels = []string{"filename"} - f.Var(&c.StreamLagLabels, prefix+"client.stream-lag-labels", "Comma-separated list of labels to use when calculating stream lag") } // RegisterFlags registers flags. 
@@ -80,10 +82,9 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { MaxRetries: MaxRetries, MinBackoff: MinBackoff, }, - BatchSize: BatchSize, - BatchWait: BatchWait, - Timeout: Timeout, - StreamLagLabels: []string{"filename"}, + BatchSize: BatchSize, + BatchWait: BatchWait, + Timeout: Timeout, } } diff --git a/clients/pkg/promtail/client/config_test.go b/clients/pkg/promtail/client/config_test.go index b237fdd9c3..f5ee8aad3c 100644 --- a/clients/pkg/promtail/client/config_test.go +++ b/clients/pkg/promtail/client/config_test.go @@ -48,10 +48,9 @@ func Test_Config(t *testing.T) { MaxRetries: MaxRetries, MinBackoff: MinBackoff, }, - BatchSize: BatchSize, - BatchWait: BatchWait, - Timeout: Timeout, - StreamLagLabels: []string{"filename"}, + BatchSize: BatchSize, + BatchWait: BatchWait, + Timeout: Timeout, }, }, { @@ -65,10 +64,9 @@ func Test_Config(t *testing.T) { MaxRetries: 20, MinBackoff: 5 * time.Second, }, - BatchSize: 100 * 2048, - BatchWait: 5 * time.Second, - Timeout: 5 * time.Second, - StreamLagLabels: []string{"filename"}, + BatchSize: 100 * 2048, + BatchWait: 5 * time.Second, + Timeout: 5 * time.Second, }, }, } diff --git a/clients/pkg/promtail/client/logger.go b/clients/pkg/promtail/client/logger.go index 6b723dc354..d8d2a7d277 100644 --- a/clients/pkg/promtail/client/logger.go +++ b/clients/pkg/promtail/client/logger.go @@ -9,7 +9,6 @@ import ( "github.com/fatih/color" "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" "gopkg.in/yaml.v2" "github.com/grafana/loki/clients/pkg/promtail/api" @@ -36,9 +35,9 @@ type logger struct { } // NewLogger creates a new client logger that logs entries instead of sending them. -func NewLogger(reg prometheus.Registerer, log log.Logger, cfgs ...Config) (Client, error) { +func NewLogger(metrics *Metrics, streamLogLabels []string, log log.Logger, cfgs ...Config) (Client, error) { // make sure the clients config is valid - c, err := NewMulti(reg, log, cfgs...) 
+ c, err := NewMulti(metrics, streamLogLabels, log, cfgs...) if err != nil { return nil, err } diff --git a/clients/pkg/promtail/client/logger_test.go b/clients/pkg/promtail/client/logger_test.go index 469c04b580..122d2f80ef 100644 --- a/clients/pkg/promtail/client/logger_test.go +++ b/clients/pkg/promtail/client/logger_test.go @@ -16,10 +16,10 @@ import ( ) func TestNewLogger(t *testing.T) { - _, err := NewLogger(nil, util_log.Logger, []Config{}...) + _, err := NewLogger(nilMetrics, nil, util_log.Logger, []Config{}...) require.Error(t, err) - l, err := NewLogger(nil, util_log.Logger, []Config{{URL: cortexflag.URLValue{URL: &url.URL{Host: "string"}}}}...) + l, err := NewLogger(nilMetrics, nil, util_log.Logger, []Config{{URL: cortexflag.URLValue{URL: &url.URL{Host: "string"}}}}...) require.NoError(t, err) l.Chan() <- api.Entry{Labels: model.LabelSet{"foo": "bar"}, Entry: logproto.Entry{Timestamp: time.Now(), Line: "entry"}} l.Stop() diff --git a/clients/pkg/promtail/client/multi.go b/clients/pkg/promtail/client/multi.go index 7c66f627b1..7e6754a44c 100644 --- a/clients/pkg/promtail/client/multi.go +++ b/clients/pkg/promtail/client/multi.go @@ -5,7 +5,6 @@ import ( "sync" "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/clients/pkg/promtail/api" ) @@ -20,14 +19,14 @@ type MultiClient struct { } // NewMulti creates a new client -func NewMulti(reg prometheus.Registerer, logger log.Logger, cfgs ...Config) (Client, error) { +func NewMulti(metrics *Metrics, streamLagLabels []string, logger log.Logger, cfgs ...Config) (Client, error) { if len(cfgs) == 0 { return nil, errors.New("at least one client config should be provided") } clients := make([]Client, 0, len(cfgs)) for _, cfg := range cfgs { - client, err := New(reg, cfg, logger) + client, err := New(metrics, cfg, streamLagLabels, logger) if err != nil { return nil, err } diff --git a/clients/pkg/promtail/client/multi_test.go b/clients/pkg/promtail/client/multi_test.go 
index 7312831885..63dd6fe938 100644 --- a/clients/pkg/promtail/client/multi_test.go +++ b/clients/pkg/promtail/client/multi_test.go @@ -21,8 +21,13 @@ import ( util_log "github.com/grafana/loki/pkg/util/log" ) +var ( + nilMetrics = NewMetrics(nil, nil) + metrics = NewMetrics(prometheus.DefaultRegisterer, nil) +) + func TestNewMulti(t *testing.T) { - _, err := NewMulti(nil, util_log.Logger, []Config{}...) + _, err := NewMulti(nilMetrics, nil, util_log.Logger, []Config{}...) if err == nil { t.Fatal("expected err but got nil") } @@ -41,7 +46,7 @@ func TestNewMulti(t *testing.T) { ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"hi": "there"}}, } - clients, err := NewMulti(prometheus.DefaultRegisterer, util_log.Logger, cc1, cc2) + clients, err := NewMulti(metrics, nil, util_log.Logger, cc1, cc2) if err != nil { t.Fatalf("expected err: nil got:%v", err) } @@ -104,9 +109,9 @@ func TestMultiClient_Handle(t *testing.T) { func TestMultiClient_Handle_Race(t *testing.T) { u := flagext.URLValue{} require.NoError(t, u.Set("http://localhost")) - c1, err := New(nil, Config{URL: u, BackoffConfig: backoff.Config{MaxRetries: 1}, Timeout: time.Microsecond}, log.NewNopLogger()) + c1, err := New(nilMetrics, Config{URL: u, BackoffConfig: backoff.Config{MaxRetries: 1}, Timeout: time.Microsecond}, nil, log.NewNopLogger()) require.NoError(t, err) - c2, err := New(nil, Config{URL: u, BackoffConfig: backoff.Config{MaxRetries: 1}, Timeout: time.Microsecond}, log.NewNopLogger()) + c2, err := New(nilMetrics, Config{URL: u, BackoffConfig: backoff.Config{MaxRetries: 1}, Timeout: time.Microsecond}, nil, log.NewNopLogger()) require.NoError(t, err) clients := []Client{c1, c2} m := &MultiClient{ diff --git a/clients/pkg/promtail/config/config.go b/clients/pkg/promtail/config/config.go index 543185fa68..ac5e5828b2 100644 --- a/clients/pkg/promtail/config/config.go +++ b/clients/pkg/promtail/config/config.go @@ -21,7 +21,7 @@ type Config struct { ServerConfig server.Config 
`yaml:"server,omitempty"` // deprecated use ClientConfigs instead ClientConfig client.Config `yaml:"client,omitempty"` - ClientConfigs []client.Config `yaml:"clients,omitempty"` + ClientConfigs client.Configs `yaml:"clients,omitempty"` PositionsConfig positions.Config `yaml:"positions,omitempty"` ScrapeConfig []scrapeconfig.Config `yaml:"scrape_configs,omitempty"` TargetConfig file.Config `yaml:"target_config,omitempty"` @@ -54,7 +54,7 @@ func (c Config) String() string { func (c *Config) Setup() { if c.ClientConfig.URL.URL != nil { // if a single client config is used we add it to the multiple client config for backward compatibility - c.ClientConfigs = append(c.ClientConfigs, c.ClientConfig) + c.ClientConfigs.Configs = append(c.ClientConfigs.Configs, c.ClientConfig) } // This is a bit crude but if the Loki Push API target is specified, @@ -73,8 +73,8 @@ func (c *Config) Setup() { // not typically the order of precedence, the assumption here is someone providing a specific config in // yaml is doing so explicitly to make a key specific to a client. 
if len(c.ClientConfig.ExternalLabels.LabelSet) > 0 { - for i := range c.ClientConfigs { - c.ClientConfigs[i].ExternalLabels = flagext.LabelSet{LabelSet: c.ClientConfig.ExternalLabels.LabelSet.Merge(c.ClientConfigs[i].ExternalLabels.LabelSet)} + for i := range c.ClientConfigs.Configs { + c.ClientConfigs.Configs[i].ExternalLabels = flagext.LabelSet{LabelSet: c.ClientConfig.ExternalLabels.LabelSet.Merge(c.ClientConfigs.Configs[i].ExternalLabels.LabelSet)} } } } diff --git a/clients/pkg/promtail/config/config_test.go b/clients/pkg/promtail/config/config_test.go index 6c02f38747..26103931d1 100644 --- a/clients/pkg/promtail/config/config_test.go +++ b/clients/pkg/promtail/config/config_test.go @@ -17,12 +17,13 @@ import ( const testFile = ` clients: - - external_labels: - cluster: dev1 - url: https://1:shh@example.com/loki/api/v1/push - - external_labels: - cluster: prod1 - url: https://1:shh@example.com/loki/api/v1/push + clients: + - external_labels: + cluster: dev1 + url: https://1:shh@example.com/loki/api/v1/push + - external_labels: + cluster: prod1 + url: https://1:shh@example.com/loki/api/v1/push scrape_configs: - job_name: kubernetes-pods-name kubernetes_sd_configs: @@ -63,12 +64,15 @@ func TestConfig_Setup(t *testing.T) { ClientConfig: client.Config{ ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"foo": "bar"}}, }, - ClientConfigs: []client.Config{ - { - ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client1": "1"}}, - }, - { - ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client2": "2"}}, + ClientConfigs: client.Configs{ + StreamLagLabels: []string{}, + Configs: []client.Config{ + { + ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client1": "1"}}, + }, + { + ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client2": "2"}}, + }, }, }, }, @@ -76,12 +80,15 @@ func TestConfig_Setup(t *testing.T) { ClientConfig: client.Config{ ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"foo": "bar"}}, }, - 
ClientConfigs: []client.Config{ - { - ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client1": "1", "foo": "bar"}}, - }, - { - ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client2": "2", "foo": "bar"}}, + ClientConfigs: client.Configs{ + StreamLagLabels: []string{}, + Configs: []client.Config{ + { + ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client1": "1", "foo": "bar"}}, + }, + { + ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client2": "2", "foo": "bar"}}, + }, }, }, }, @@ -92,12 +99,15 @@ func TestConfig_Setup(t *testing.T) { ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"foo": "bar"}}, URL: dskitflagext.URLValue{URL: mustURL("http://foo")}, }, - ClientConfigs: []client.Config{ - { - ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client1": "1"}}, - }, - { - ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client2": "2"}}, + ClientConfigs: client.Configs{ + StreamLagLabels: []string{}, + Configs: []client.Config{ + { + ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client1": "1"}}, + }, + { + ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client2": "2"}}, + }, }, }, }, @@ -106,16 +116,19 @@ func TestConfig_Setup(t *testing.T) { ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"foo": "bar"}}, URL: dskitflagext.URLValue{URL: mustURL("http://foo")}, }, - ClientConfigs: []client.Config{ - { - ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client1": "1", "foo": "bar"}}, - }, - { - ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client2": "2", "foo": "bar"}}, - }, - { - ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"foo": "bar"}}, - URL: dskitflagext.URLValue{URL: mustURL("http://foo")}, + ClientConfigs: client.Configs{ + StreamLagLabels: []string{}, + Configs: []client.Config{ + { + ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"client1": "1", "foo": "bar"}}, + }, + { + ExternalLabels: 
flagext.LabelSet{LabelSet: model.LabelSet{"client2": "2", "foo": "bar"}}, + }, + { + ExternalLabels: flagext.LabelSet{LabelSet: model.LabelSet{"foo": "bar"}}, + URL: dskitflagext.URLValue{URL: mustURL("http://foo")}, + }, }, }, }, diff --git a/clients/pkg/promtail/promtail.go b/clients/pkg/promtail/promtail.go index 71ba270b9f..f2181ccafa 100644 --- a/clients/pkg/promtail/promtail.go +++ b/clients/pkg/promtail/promtail.go @@ -46,7 +46,7 @@ type Promtail struct { } // New makes a new Promtail. -func New(cfg config.Config, dryRun bool, reg prometheus.Registerer, opts ...Option) (*Promtail, error) { +func New(cfg config.Config, metrics *client.Metrics, dryRun bool, opts ...Option) (*Promtail, error) { // Initialize promtail with some defaults and allow the options to override // them. promtail := &Promtail{ @@ -54,6 +54,10 @@ func New(cfg config.Config, dryRun bool, reg prometheus.Registerer, opts ...Opti reg: prometheus.DefaultRegisterer, } for _, o := range opts { + // Skip nil options: callers that pass an explicit nil for opts, e.g. New(cfg, metrics, dryRun, nil), would otherwise trigger a nil function call panic here. + if o == nil { + continue + } o(promtail) } @@ -64,13 +68,13 @@ func New(cfg config.Config, dryRun bool, reg prometheus.Registerer, opts ...Opti } var err error if dryRun { - promtail.client, err = client.NewLogger(prometheus.DefaultRegisterer, promtail.logger, cfg.ClientConfigs...) + promtail.client, err = client.NewLogger(metrics, cfg.ClientConfigs.StreamLagLabels, promtail.logger, cfg.ClientConfigs.Configs...) if err != nil { return nil, err } cfg.PositionsConfig.ReadOnly = true } else { - promtail.client, err = client.NewMulti(prometheus.DefaultRegisterer, promtail.logger, cfg.ClientConfigs...) + promtail.client, err = client.NewMulti(metrics, cfg.ClientConfigs.StreamLagLabels, promtail.logger, cfg.ClientConfigs.Configs...) 
if err != nil { return nil, err } diff --git a/clients/pkg/promtail/promtail_test.go b/clients/pkg/promtail/promtail_test.go index 109c0d6721..2f5eb746db 100644 --- a/clients/pkg/promtail/promtail_test.go +++ b/clients/pkg/promtail/promtail_test.go @@ -45,6 +45,8 @@ import ( const httpTestPort = 9080 +var clientMetrics = client.NewMetrics(prometheus.DefaultRegisterer, nil) + func TestPromtail(t *testing.T) { // Setup. w := log.NewSyncWriter(os.Stderr) @@ -101,9 +103,8 @@ func TestPromtail(t *testing.T) { defer func() { _ = server.Shutdown(context.Background()) }() - // Run. - p, err := New(buildTestConfig(t, positionsFileName, testDir), false, nil) + p, err := New(buildTestConfig(t, positionsFileName, testDir), clientMetrics, false, nil) if err != nil { t.Error("error creating promtail", err) return @@ -647,7 +648,7 @@ func Test_DryRun(t *testing.T) { require.NoError(t, err) defer os.Remove(f.Name()) - _, err = New(config.Config{}, true, nil) + _, err = New(config.Config{}, clientMetrics, true, nil) require.Error(t, err) // Set the minimum config needed to start a server. 
We need to do this since we @@ -669,7 +670,7 @@ func Test_DryRun(t *testing.T) { PositionsFile: f.Name(), SyncPeriod: time.Second, }, - }, true, nil) + }, clientMetrics, true, nil) require.NoError(t, err) prometheus.DefaultRegisterer = prometheus.NewRegistry() @@ -681,7 +682,7 @@ func Test_DryRun(t *testing.T) { PositionsFile: f.Name(), SyncPeriod: time.Second, }, - }, false, nil) + }, clientMetrics, false, nil) require.NoError(t, err) require.IsType(t, &client.MultiClient{}, p.client) } diff --git a/clients/pkg/promtail/targets/file/filetarget.go b/clients/pkg/promtail/targets/file/filetarget.go index 9096b50b22..8c69f081dd 100644 --- a/clients/pkg/promtail/targets/file/filetarget.go +++ b/clients/pkg/promtail/targets/file/filetarget.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" fsnotify "gopkg.in/fsnotify.v1" @@ -309,7 +310,7 @@ func (t *FileTarget) stopTailingAndRemovePosition(ps []string) { delete(t.tails, p) } if h, ok := t.handler.(api.InstrumentedEntryHandler); ok { - h.UnregisterLatencyMetric(model.LabelSet{model.LabelName(client.LatencyLabel): model.LabelValue(p)}) + h.UnregisterLatencyMetric(prometheus.Labels{client.LatencyLabel: p}) } } } diff --git a/clients/pkg/promtail/targets/lokipush/pushtarget_test.go b/clients/pkg/promtail/targets/lokipush/pushtarget_test.go index 6181b5dd8a..76bf7779b9 100644 --- a/clients/pkg/promtail/targets/lokipush/pushtarget_test.go +++ b/clients/pkg/promtail/targets/lokipush/pushtarget_test.go @@ -84,7 +84,8 @@ func TestLokiPushTarget(t *testing.T) { BatchWait: 1 * time.Second, BatchSize: 100 * 1024, } - pc, err := client.New(prometheus.DefaultRegisterer, ccfg, logger) + m := client.NewMetrics(prometheus.DefaultRegisterer, nil) + pc, err := client.New(m, ccfg, nil, logger) require.NoError(t, err) defer pc.Stop() diff --git a/docs/sources/clients/promtail/configuration.md 
b/docs/sources/clients/promtail/configuration.md index 9a6a7bdfd3..4e5dc9dbd7 100644 --- a/docs/sources/clients/promtail/configuration.md +++ b/docs/sources/clients/promtail/configuration.md @@ -156,9 +156,26 @@ The `server` block configures Promtail's behavior as an HTTP server: ## clients -The `clients` block configures how Promtail connects to an instance of +The `clients` block configures how Promtail connects to instances of Loki: +```yaml +# A comma-separated list of labels to include in the stream lag metric `promtail_stream_lag_seconds`. +# The default value is "filename". A "host" label is always included. +# The stream lag metric indicates which streams are falling behind on writes to Loki; +# be mindful about using too many labels, as it can increase cardinality. +[stream_lag_labels: <string> | default = "filename"] + +configs: + - [<client_config>] +``` + +### client + +The `client` block configures how an individual Promtail client connects +to instances of Loki: + + ```yaml # The URL where Loki is listening, denoted in Loki as http_listen_address and # http_listen_port. If Loki is running in microservices mode, this is the HTTP @@ -273,12 +290,6 @@ external_labels: # Maximum time to wait for a server to respond to a request [timeout: <duration> | default = 10s] - -# A comma-separated list of labels to include in the stream lag metric `promtail_stream_lag_seconds`. -# The default value is "filename". A "host" label is always included. -# The stream lag metric indicates which streams are falling behind on writes to Loki; -# be mindful about using too many labels, as it can increase cardinality. -[stream_lag_labels: <string> | default = "filename"] ``` ## positions