Send logs to multiple Loki instances (#536)

* Adds the ability to provide multiple Loki URLs

For backward compatibility, the single `client:` section and its command-line flags still work; a usage sketch of the new multi-client follows this list.

* Add tests for the multi client

* Update the ksonnet module to support multiple clients

* Fix comment

* Fix lint issues
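
A minimal sketch of how the new multi-client can be wired up programmatically, based on the `NewMulti` constructor and `Config` struct introduced in this change; the instance URLs and batch settings below are placeholders, not part of the diff:

```go
package main

import (
	"log"
	"net/url"
	"time"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/flagext"

	"github.com/grafana/loki/pkg/promtail/client"
)

func main() {
	// One Config per Loki instance; the URLs here are placeholders.
	host1, _ := url.Parse("http://loki-a:3100/api/prom/push")
	host2, _ := url.Parse("http://loki-b:3100/api/prom/push")

	cfgs := []client.Config{
		{URL: flagext.URLValue{URL: host1}, BatchWait: time.Second, BatchSize: 100 * 1024},
		{URL: flagext.URLValue{URL: host2}, BatchWait: time.Second, BatchSize: 100 * 1024},
	}

	// NewMulti wraps one client per config and fans every entry out to all of them.
	c, err := client.NewMulti(util.Logger, cfgs...)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Stop()
}
```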
pull/554/head
Cyril Tovena 6 years ago committed by GitHub
parent fa4f936fcb
commit 53075db577
  1. cmd/promtail/promtail-docker-config.yaml (4 lines changed)
  2. cmd/promtail/promtail-local-config.yaml (4 lines changed)
  3. pkg/promtail/client/client.go (67 lines changed)
  4. pkg/promtail/client/config.go (57 lines changed)
  5. pkg/promtail/client/fake/client.go (24 lines changed)
  6. pkg/promtail/client/multi.go (43 lines changed)
  7. pkg/promtail/client/multi_test.go (103 lines changed)
  8. pkg/promtail/config/config.go (4 lines changed)
  9. pkg/promtail/promtail.go (9 lines changed)
  10. pkg/util/errors.go (48 lines changed)
  11. production/ksonnet/README.md (16 lines changed)
  12. production/ksonnet/promtail/config.libsonnet (18 lines changed)
  13. production/ksonnet/promtail/promtail.libsonnet (13 lines changed)

@@ -5,8 +5,8 @@ server:
positions:
filename: /tmp/positions.yaml
client:
url: http://loki:3100/api/prom/push
clients:
- url: http://loki:3100/api/prom/push
scrape_configs:
- job_name: system

@@ -5,8 +5,8 @@ server:
positions:
filename: /tmp/positions.yaml
client:
url: http://localhost:3100/api/prom/push
clients:
- url: http://localhost:3100/api/prom/push
scrape_configs:
- job_name: system

@@ -4,7 +4,6 @@ import (
"bufio"
"bytes"
"context"
"flag"
"fmt"
"io"
"net/http"
@@ -12,8 +11,9 @@ import (
"sync"
"time"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
@@ -29,21 +29,21 @@ const contentType = "application/x-protobuf"
const maxErrMsgLen = 1024
var (
encodedBytes = prometheus.NewCounter(prometheus.CounterOpts{
encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "encoded_bytes_total",
Help: "Number of bytes encoded and ready to send.",
})
sentBytes = prometheus.NewCounter(prometheus.CounterOpts{
}, []string{"host"})
sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
})
}, []string{"host"})
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Help: "Duration of send requests.",
}, []string{"status_code"})
}, []string{"status_code", "host"})
)
func init() {
@@ -52,32 +52,15 @@ func init() {
prometheus.MustRegister(requestDuration)
}
// Config describes configuration for an HTTP pusher client.
type Config struct {
URL flagext.URLValue
BatchWait time.Duration
BatchSize int
BackoffConfig util.BackoffConfig `yaml:"backoff_config"`
// The labels to add to any time series or alerts when communicating with Loki.
ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"`
Timeout time.Duration `yaml:"timeout"`
}
// RegisterFlags registers flags.
func (c *Config) RegisterFlags(flags *flag.FlagSet) {
flags.Var(&c.URL, "client.url", "URL of log server")
flags.DurationVar(&c.BatchWait, "client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.")
flags.IntVar(&c.BatchSize, "client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending.")
flags.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retries when sending batches.")
flags.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.")
flags.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.")
flags.DurationVar(&c.Timeout, "client.timeout", 10*time.Second, "Maximum time to wait for the server to respond to a request.")
// Client pushes entries to Loki and can be stopped
type Client interface {
api.EntryHandler
// Stop goroutine sending batch of entries.
Stop()
}
// Client for pushing logs in snappy-compressed protos over HTTP.
type Client struct {
type client struct {
logger log.Logger
cfg Config
quit chan struct{}
@@ -93,9 +76,9 @@ type entry struct {
}
// New makes a new Client.
func New(cfg Config, logger log.Logger) (*Client, error) {
c := &Client{
logger: logger,
func New(cfg Config, logger log.Logger) Client {
c := &client{
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
cfg: cfg,
quit: make(chan struct{}),
entries: make(chan entry),
@@ -104,10 +87,10 @@ func New(cfg Config, logger log.Logger) (*Client, error) {
}
c.wg.Add(1)
go c.run()
return c, nil
return c
}
func (c *Client) run() {
func (c *client) run() {
batch := map[model.Fingerprint]*logproto.Stream{}
batchSize := 0
maxWait := time.NewTimer(c.cfg.BatchWait)
@@ -151,14 +134,14 @@ func (c *Client) run() {
}
}
func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
buf, err := encodeBatch(batch)
if err != nil {
level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
return
}
bufBytes := float64(len(buf))
encodedBytes.Add(bufBytes)
encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
ctx := context.Background()
backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)
@@ -166,10 +149,10 @@ func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
for backoff.Ongoing() {
start := time.Now()
status, err = c.send(ctx, buf)
requestDuration.WithLabelValues(strconv.Itoa(status)).Observe(time.Since(start).Seconds())
requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())
if err == nil {
sentBytes.Add(bufBytes)
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
return
}
@@ -202,7 +185,7 @@ func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) {
return buf, nil
}
func (c *Client) send(ctx context.Context, buf []byte) (int, error) {
func (c *client) send(ctx context.Context, buf []byte) (int, error) {
ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
defer cancel()
req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
@@ -230,13 +213,13 @@ func (c *Client) send(ctx context.Context, buf []byte) (int, error) {
}
// Stop the client.
func (c *Client) Stop() {
func (c *client) Stop() {
close(c.quit)
c.wg.Wait()
}
// Handle implements EntryHandler; adds a new line to the next batch; send is async.
func (c *Client) Handle(ls model.LabelSet, t time.Time, s string) error {
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
if len(c.externalLabels) > 0 {
ls = c.externalLabels.Merge(ls)
}
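
Note that the hunks above also switch `encodedBytes` and `sentBytes` from plain counters to `CounterVec`s keyed by `host`, so each Loki instance gets its own series. A small standalone sketch of that labelling pattern (metric name reused for illustration only, outside promtail):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// sentBytes mirrors the pattern used in the diff: a CounterVec keyed by the
// target host, giving one series per Loki instance.
var sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "promtail",
	Name:      "sent_bytes_total",
	Help:      "Number of bytes sent.",
}, []string{"host"})

func main() {
	prometheus.MustRegister(sentBytes)

	// Each client records against its own host label.
	sentBytes.WithLabelValues("loki-a:3100").Add(2048)
	sentBytes.WithLabelValues("loki-b:3100").Add(512)

	fmt.Println("recorded per-host sent bytes")
}
```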

@@ -0,0 +1,57 @@
package client
import (
"flag"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/prometheus/common/model"
)
// Config describes configuration for an HTTP pusher client.
type Config struct {
URL flagext.URLValue
BatchWait time.Duration
BatchSize int
BackoffConfig util.BackoffConfig `yaml:"backoff_config"`
// The labels to add to any time series or alerts when communicating with Loki.
ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"`
Timeout time.Duration `yaml:"timeout"`
}
// RegisterFlags registers flags.
func (c *Config) RegisterFlags(flags *flag.FlagSet) {
flags.Var(&c.URL, "client.url", "URL of log server")
flags.DurationVar(&c.BatchWait, "client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.")
flags.IntVar(&c.BatchSize, "client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending.")
flags.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retries when sending batches.")
flags.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.")
flags.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.")
flags.DurationVar(&c.Timeout, "client.timeout", 10*time.Second, "Maximum time to wait for the server to respond to a request.")
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
type raw Config
// force sane defaults.
cfg := raw{
BackoffConfig: util.BackoffConfig{
MaxBackoff: 5 * time.Second,
MaxRetries: 5,
MinBackoff: 100 * time.Millisecond,
},
BatchSize: 100 * 1024,
BatchWait: 1 * time.Second,
Timeout: 10 * time.Second,
}
if err := unmarshal(&cfg); err != nil {
return err
}
*c = Config(cfg)
return nil
}
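
A hedged sketch of what the custom `UnmarshalYAML` above buys: a `clients:` entry that only sets `url` still gets the documented defaults (100 KB batches, 1 s batch wait, 10 s timeout, 5 retries). Using `gopkg.in/yaml.v2` is an assumption of this sketch, not something the diff prescribes:

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/promtail/client"
	yaml "gopkg.in/yaml.v2" // assumed YAML package for this sketch
)

func main() {
	raw := []byte(`
clients:
  - url: http://loki:3100/api/prom/push
`)

	var cfg struct {
		Clients []client.Config `yaml:"clients"`
	}
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}

	// Defaults filled in by Config.UnmarshalYAML even though the YAML only set the URL.
	fmt.Printf("batch size: %d, batch wait: %s, timeout: %s\n",
		cfg.Clients[0].BatchSize, cfg.Clients[0].BatchWait, cfg.Clients[0].Timeout)
}
```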

@@ -0,0 +1,24 @@
package fake
import (
"time"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/common/model"
)
// Client is a fake client used for testing.
type Client struct {
OnHandleEntry api.EntryHandlerFunc
OnStop func()
}
// Stop implements client.Client
func (c *Client) Stop() {
c.OnStop()
}
// Handle implements client.Client
func (c *Client) Handle(labels model.LabelSet, time time.Time, entry string) error {
return c.OnHandleEntry.Handle(labels, time, entry)
}

@@ -0,0 +1,43 @@
package client
import (
"errors"
"time"
"github.com/go-kit/kit/log"
"github.com/grafana/loki/pkg/util"
"github.com/prometheus/common/model"
)
// MultiClient is a client that pushes entries to one or more Loki instances.
type MultiClient []Client
// NewMulti creates a new multi client from the given configs.
func NewMulti(logger log.Logger, cfgs ...Config) (Client, error) {
if len(cfgs) == 0 {
return nil, errors.New("at least one client config should be provided")
}
var clients []Client
for _, cfg := range cfgs {
clients = append(clients, New(cfg, logger))
}
return MultiClient(clients), nil
}
// Handle implements api.EntryHandler
func (m MultiClient) Handle(labels model.LabelSet, time time.Time, entry string) error {
var result util.MultiError
for _, client := range m {
if err := client.Handle(labels, time, entry); err != nil {
result.Add(err)
}
}
return result.Err()
}
// Stop implements Client
func (m MultiClient) Stop() {
for _, c := range m {
c.Stop()
}
}
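
A minimal usage sketch of the fan-out behaviour, combining `MultiClient` with the `fake.Client` added in this change (the unit tests below exercise the same thing); labels and messages are illustrative:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"

	"github.com/grafana/loki/pkg/promtail/api"
	"github.com/grafana/loki/pkg/promtail/client"
	"github.com/grafana/loki/pkg/promtail/client/fake"
)

func main() {
	// A fake client that just counts the entries it receives.
	var received int
	count := api.EntryHandlerFunc(func(labels model.LabelSet, t time.Time, entry string) error {
		received++
		return nil
	})
	fc := &fake.Client{OnHandleEntry: count, OnStop: func() {}}

	// Two clients wrapped by a MultiClient.
	m := client.MultiClient([]client.Client{fc, fc})

	// Handle fans the entry out to every wrapped client and aggregates any errors.
	_ = m.Handle(model.LabelSet{"job": "demo"}, time.Now(), "hello loki")
	fmt.Println("entries delivered:", received) // 2

	m.Stop()
}
```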

@@ -0,0 +1,103 @@
package client
import (
"errors"
"net/url"
"reflect"
"testing"
"time"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/promtail/client/fake"
"github.com/prometheus/common/model"
)
func TestNewMulti(t *testing.T) {
_, err := NewMulti(util.Logger, []Config{}...)
if err == nil {
t.Fatal("expected err but got nil")
}
host1, _ := url.Parse("http://localhost:3100")
host2, _ := url.Parse("https://grafana.com")
expectedCfg1 := Config{BatchSize: 20, URL: flagext.URLValue{URL: host1}}
expectedCfg2 := Config{BatchSize: 10, URL: flagext.URLValue{URL: host2}}
clients, err := NewMulti(util.Logger, expectedCfg1, expectedCfg2)
if err != nil {
t.Fatalf("expected err: nil got:%v", err)
}
multi := clients.(MultiClient)
if len(multi) != 2 {
t.Fatalf("expected client: 2 got:%d", len(multi))
}
cfg1 := clients.(MultiClient)[0].(*client).cfg
if !reflect.DeepEqual(cfg1, expectedCfg1) {
t.Fatalf("expected cfg: %v got:%v", expectedCfg1, cfg1)
}
cfg2 := clients.(MultiClient)[1].(*client).cfg
if !reflect.DeepEqual(cfg2, expectedCfg2) {
t.Fatalf("expected cfg: %v got:%v", expectedCfg2, cfg2)
}
}
func TestMultiClient_Stop(t *testing.T) {
var stopped int
stopping := func() {
stopped++
}
fc := &fake.Client{OnStop: stopping}
clients := []Client{fc, fc, fc, fc}
m := MultiClient(clients)
m.Stop()
if stopped != len(clients) {
t.Fatal("missing stop call")
}
}
func TestMultiClient_Handle(t *testing.T) {
var called int
errorFn := api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { called++; return errors.New("") })
okFn := api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { called++; return nil })
errfc := &fake.Client{OnHandleEntry: errorFn}
okfc := &fake.Client{OnHandleEntry: okFn}
t.Run("some error", func(t *testing.T) {
clients := []Client{okfc, errfc, okfc, errfc, errfc, okfc}
m := MultiClient(clients)
if err := m.Handle(nil, time.Now(), ""); err == nil {
t.Fatal("expected err got nil")
}
if called != len(clients) {
t.Fatal("missing handle call")
}
})
t.Run("no error", func(t *testing.T) {
called = 0
clients := []Client{okfc, okfc, okfc, okfc, okfc, okfc}
m := MultiClient(clients)
if err := m.Handle(nil, time.Now(), ""); err != nil {
t.Fatal("expected err to be nil")
}
if called != len(clients) {
t.Fatal("missing handle call")
}
})
}

@@ -12,8 +12,10 @@ import (
// Config for promtail, describing what files to watch.
type Config struct {
ServerConfig server.Config `yaml:"server,omitempty"`
ServerConfig server.Config `yaml:"server,omitempty"`
// Deprecated: use ClientConfigs instead.
ClientConfig client.Config `yaml:"client,omitempty"`
ClientConfigs []client.Config `yaml:"clients,omitempty"`
PositionsConfig positions.Config `yaml:"positions,omitempty"`
ScrapeConfig []scrape.Config `yaml:"scrape_configs,omitempty"`
TargetConfig targets.Config `yaml:"target_config,omitempty"`

@@ -12,7 +12,7 @@ import (
// Promtail is the root struct for Promtail...
type Promtail struct {
client *client.Client
client client.Client
positions *positions.Positions
targetManagers *targets.TargetManagers
server *server.Server
@@ -25,7 +25,12 @@ func New(cfg config.Config) (*Promtail, error) {
return nil, err
}
client, err := client.New(cfg.ClientConfig, util.Logger)
if cfg.ClientConfig.URL.URL != nil {
// If a single client config is provided, append it to the list of client configs for backward compatibility.
cfg.ClientConfigs = append(cfg.ClientConfigs, cfg.ClientConfig)
}
client, err := client.NewMulti(util.Logger, cfg.ClientConfigs...)
if err != nil {
return nil, err
}

@@ -0,0 +1,48 @@
package util
import (
"bytes"
"fmt"
)
// The MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError []error
// Error returns a concatenated string of the contained errors.
func (es MultiError) Error() string {
var buf bytes.Buffer
if len(es) > 1 {
_, _ = fmt.Fprintf(&buf, "%d errors: ", len(es))
}
for i, err := range es {
if i != 0 {
buf.WriteString("; ")
}
buf.WriteString(err.Error())
}
return buf.String()
}
// Add adds the error to the error list if it is not nil.
func (es *MultiError) Add(err error) {
if err == nil {
return
}
if merr, ok := err.(MultiError); ok {
*es = append(*es, merr...)
} else {
*es = append(*es, err)
}
}
// Err returns the error list as an error or nil if it is empty.
func (es MultiError) Err() error {
if len(es) == 0 {
return nil
}
return es
}
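
A short sketch of the `MultiError` semantics that `MultiClient.Handle` relies on: `Err()` stays nil while nothing has failed, nil errors are ignored by `Add`, and multiple errors are concatenated behind a count prefix. The error messages are made up for illustration:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/grafana/loki/pkg/util"
)

func main() {
	var errs util.MultiError

	// Nothing added yet: Err() is nil, so callers see success.
	fmt.Println(errs.Err()) // <nil>

	errs.Add(nil) // nil errors are ignored
	errs.Add(errors.New("loki-a: connection refused"))
	errs.Add(errors.New("loki-b: request timed out"))

	// With more than one error, the message is prefixed with the count.
	fmt.Println(errs.Err()) // 2 errors: loki-a: connection refused; loki-b: request timed out
}
```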

@@ -40,15 +40,21 @@ promtail + {
_config+:: {
namespace: 'loki',
promtail_config: {
scheme: 'https',
hostname: 'logs-us-west1.grafana.net',
username: 'user-id',
password: 'password',
promtail_config+: {
clients: [
{
scheme:: 'https',
hostname:: 'logs-us-west1.grafana.net',
username:: 'user-id',
password:: 'password',
external_labels: {},
}
],
container_root_path: '/var/lib/docker',
},
},
}
```
Note that `container_root_path` is the data root of your Docker daemon; use `docker info | grep "Root Dir"` to find it.

@@ -6,19 +6,15 @@
_config+:: {
prometheus_insecure_skip_verify: false,
promtail_config: {
username: '',
password: '',
scheme: 'https',
hostname: 'logs-us-west1.grafana.net',
clients:[{
username:: '',
password:: '',
scheme:: 'https',
hostname:: 'logs-us-west1.grafana.net',
external_labels: {},
}],
container_root_path: '/var/lib/docker',
external_labels: {},
entry_parser: 'docker',
},
service_url:
if std.objectHas(self.promtail_config, 'username') then
'%(scheme)s://%(username)s:%(password)s@%(hostname)s/api/prom/push' % self.promtail_config
else
'%(scheme)s://%(hostname)s/api/prom/push' % self.promtail_config,
},
}

@@ -17,9 +17,17 @@ k + config + scrape_config {
]),
promtail_config+:: {
client: {
external_labels: $._config.promtail_config.external_labels,
local service_url(client) =
if std.objectHasAll(client, 'username') then
'%(scheme)s://%(username)s:%(password)s@%(hostname)s/api/prom/push' % client
else
'%(scheme)s://%(hostname)s/api/prom/push' % client,
local client_config(client) = client + {
url: service_url(client),
},
clients: std.map(client_config,$._config.promtail_config.clients)
},
local configMap = $.core.v1.configMap,
@@ -31,7 +39,6 @@ k + config + scrape_config {
}),
promtail_args:: {
'client.url': $._config.service_url,
'config.file': '/etc/promtail/promtail.yml',
},
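
For reference, a hedged Go sketch of the URL rule the jsonnet `service_url` helper above encodes: basic-auth credentials are included only when a username is present (the jsonnet checks field presence; the sketch treats an empty username as unset):

```go
package main

import (
	"fmt"
	"net/url"
)

// pushURL mirrors the service_url rule in the jsonnet above: add basic-auth
// credentials only when a username is set.
func pushURL(scheme, hostname, username, password string) string {
	u := url.URL{Scheme: scheme, Host: hostname, Path: "/api/prom/push"}
	if username != "" {
		u.User = url.UserPassword(username, password)
	}
	return u.String()
}

func main() {
	fmt.Println(pushURL("https", "logs-us-west1.grafana.net", "user-id", "password"))
	fmt.Println(pushURL("https", "logs-us-west1.grafana.net", "", ""))
}
```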
