diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f0b3c66b4..105851a23f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,11 @@ ## Main +* [4911](https://github.com/grafana/loki/pull/4911) **jeschkies**: Support Docker service discovery in Promtail. +* [5107](https://github.com/grafana/loki/pull/5107) **chaudum** Fix bug in fluentd plugin that caused log lines containing non UTF-8 characters to be dropped. * [5187](https://github.com/grafana/loki/pull/5187) **aknuds1** Rename metric `cortex_experimental_features_in_use_total` to `loki_experimental_features_in_use_total` and metric `log_messages_total` to `loki_log_messages_total`. * [5170](https://github.com/grafana/loki/pull/5170) **chaudum** Fix deadlock in Promtail caused when targets got removed from a target group by the discovery manager. * [5163](https://github.com/grafana/loki/pull/5163) **chaudum** Fix regression in fluentd plugin introduced with #5107 that caused `NoMethodError` when parsing non-string values of log lines. * [5144](https://github.com/grafana/loki/pull/5144) **dannykopping** Ruler: fix remote write basic auth credentials. -* [5107](https://github.com/grafana/loki/pull/5107) **chaudum** Fix bug in fluentd plugin that caused log lines containing non UTF-8 characters to be dropped. * [5091](https://github.com/grafana/loki/pull/5091) **owen-d**: Changes `ingester.concurrent-flushes` default to 32 * [4879](https://github.com/grafana/loki/pull/4879) **cyriltovena**: LogQL: add __line__ function to | line_format template. * [5081](https://github.com/grafana/loki/pull/5081) **SasSwart**: Add the option to configure memory ballast for Loki diff --git a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go index 44b24d436b..738723ec5c 100644 --- a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go +++ b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go @@ -33,18 +33,20 @@ import ( // Config describes a job to scrape. type Config struct { - JobName string `yaml:"job_name,omitempty"` - PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"` - JournalConfig *JournalTargetConfig `yaml:"journal,omitempty"` - SyslogConfig *SyslogTargetConfig `yaml:"syslog,omitempty"` - GcplogConfig *GcplogTargetConfig `yaml:"gcplog,omitempty"` - PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"` - WindowsConfig *WindowsEventsTargetConfig `yaml:"windows_events,omitempty"` - KafkaConfig *KafkaTargetConfig `yaml:"kafka,omitempty"` - GelfConfig *GelfTargetConfig `yaml:"gelf,omitempty"` - CloudflareConfig *CloudflareConfig `yaml:"cloudflare,omitempty"` - RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"` - ServiceDiscoveryConfig ServiceDiscoveryConfig `yaml:",inline"` + JobName string `yaml:"job_name,omitempty"` + PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"` + JournalConfig *JournalTargetConfig `yaml:"journal,omitempty"` + SyslogConfig *SyslogTargetConfig `yaml:"syslog,omitempty"` + GcplogConfig *GcplogTargetConfig `yaml:"gcplog,omitempty"` + PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"` + WindowsConfig *WindowsEventsTargetConfig `yaml:"windows_events,omitempty"` + KafkaConfig *KafkaTargetConfig `yaml:"kafka,omitempty"` + GelfConfig *GelfTargetConfig `yaml:"gelf,omitempty"` + CloudflareConfig *CloudflareConfig `yaml:"cloudflare,omitempty"` + RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"` + // List of Docker service discovery configurations. 
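+	// The schema is inherited from Prometheus' Docker service discovery
+	// (discovery/moby); see the docker_sd_config section in the Promtail
+	// configuration documentation.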
+ DockerSDConfigs []*moby.DockerSDConfig `yaml:"docker_sd_configs,omitempty"` + ServiceDiscoveryConfig ServiceDiscoveryConfig `yaml:",inline"` } type ServiceDiscoveryConfig struct { diff --git a/clients/pkg/promtail/targets/docker/metrics.go b/clients/pkg/promtail/targets/docker/metrics.go new file mode 100644 index 0000000000..9d57493e5e --- /dev/null +++ b/clients/pkg/promtail/targets/docker/metrics.go @@ -0,0 +1,38 @@ +package docker + +import "github.com/prometheus/client_golang/prometheus" + +// Metrics holds a set of Docker target metrics. +type Metrics struct { + reg prometheus.Registerer + + dockerEntries prometheus.Counter + dockerErrors prometheus.Counter +} + +// NewMetrics creates a new set of Docker target metrics. If reg is non-nil, the +// metrics will be registered. +func NewMetrics(reg prometheus.Registerer) *Metrics { + var m Metrics + m.reg = reg + + m.dockerEntries = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "promtail", + Name: "docker_target_entries_total", + Help: "Total number of successful entries sent to the Docker target", + }) + m.dockerErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "promtail", + Name: "docker_target_parsing_errors_total", + Help: "Total number of parsing errors while receiving Docker messages", + }) + + if reg != nil { + reg.MustRegister( + m.dockerEntries, + m.dockerErrors, + ) + } + + return &m +} diff --git a/clients/pkg/promtail/targets/docker/target.go b/clients/pkg/promtail/targets/docker/target.go new file mode 100644 index 0000000000..a9f41a92b0 --- /dev/null +++ b/clients/pkg/promtail/targets/docker/target.go @@ -0,0 +1,231 @@ +package docker + +import ( + "bufio" + "context" + "fmt" + "io" + "strconv" + "strings" + "sync" + "time" + + docker_types "github.com/docker/docker/api/types" + "github.com/docker/docker/client" + "github.com/docker/docker/pkg/stdcopy" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" + "go.uber.org/atomic" + + "github.com/grafana/loki/clients/pkg/promtail/api" + "github.com/grafana/loki/clients/pkg/promtail/positions" + "github.com/grafana/loki/clients/pkg/promtail/targets/target" + + "github.com/grafana/loki/pkg/logproto" +) + +type Target struct { + logger log.Logger + handler api.EntryHandler + since int64 + positions positions.Positions + containerName string + labels model.LabelSet + relabelConfig []*relabel.Config + metrics *Metrics + + cancel context.CancelFunc + client client.APIClient + wg sync.WaitGroup + running *atomic.Bool + err error +} + +func NewTarget( + metrics *Metrics, + logger log.Logger, + handler api.EntryHandler, + position positions.Positions, + containerName string, + labels model.LabelSet, + relabelConfig []*relabel.Config, + client client.APIClient, +) (*Target, error) { + + pos, err := position.Get(positions.CursorKey(containerName)) + if err != nil { + return nil, err + } + var since int64 + if pos != 0 { + since = pos + } + + ctx, cancel := context.WithCancel(context.Background()) + t := &Target{ + logger: logger, + handler: handler, + since: since, + positions: position, + containerName: containerName, + labels: labels, + relabelConfig: relabelConfig, + metrics: metrics, + + cancel: cancel, + client: client, + running: atomic.NewBool(false), + } + go t.processLoop(ctx) + return t, nil +} + +func (t *Target) processLoop(ctx context.Context) { + t.wg.Add(1) + defer t.wg.Done() + 
// Mark the target as running; Ready() reports this flag until the context is canceled.
+	t.running.Store(true)
+
+	opts := docker_types.ContainerLogsOptions{
+		ShowStdout: true,
+		ShowStderr: true,
+		Follow:     true,
+		Timestamps: true,
+		Since:      strconv.FormatInt(t.since, 10),
+	}
+
+	logs, err := t.client.ContainerLogs(ctx, t.containerName, opts)
+	if err != nil {
+		level.Error(t.logger).Log("msg", "could not fetch logs for container", "container", t.containerName, "err", err)
+		t.err = err
+		return
+	}
+
+	// Start transferring: demultiplex the raw log stream into stdout and stderr pipes.
+	rstdout, wstdout := io.Pipe()
+	rstderr, wstderr := io.Pipe()
+	t.wg.Add(1)
+	go func() {
+		defer func() {
+			t.wg.Done()
+			wstdout.Close()
+			wstderr.Close()
+			t.Stop()
+		}()
+
+		written, err := stdcopy.StdCopy(wstdout, wstderr, logs)
+		if err != nil {
+			level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerName, "err", err)
+		} else {
+			level.Info(t.logger).Log("msg", "finished transferring logs", "written", written, "container", t.containerName)
+		}
+	}()
+
+	// Start processing
+	t.wg.Add(2)
+	go t.process(rstdout, "stdout")
+	go t.process(rstderr, "stderr")
+
+	// Wait until done
+	<-ctx.Done()
+	t.running.Store(false)
+	logs.Close()
+	level.Debug(t.logger).Log("msg", "done processing Docker logs", "container", t.containerName)
+}
+
+// extractTs tries to read the timestamp from the beginning of the log line.
+// It is expected to follow the format 2006-01-02T15:04:05.999999999Z07:00.
+func extractTs(line string) (time.Time, string, error) {
+	pair := strings.SplitN(line, " ", 2)
+	if len(pair) != 2 {
+		return time.Now(), line, fmt.Errorf("could not find timestamp in '%s'", line)
+	}
+	ts, err := time.Parse("2006-01-02T15:04:05.999999999Z07:00", pair[0])
+	if err != nil {
+		return time.Now(), line, fmt.Errorf("could not parse timestamp from '%s': %w", pair[0], err)
+	}
+	return ts, pair[1], nil
+}
+
+func (t *Target) process(r io.Reader, logStream string) {
+	defer t.wg.Done()
+
+	scanner := bufio.NewScanner(r)
+	for scanner.Scan() {
+		line := scanner.Text()
+		ts, line, err := extractTs(line)
+		if err != nil {
+			level.Error(t.logger).Log("msg", "could not extract timestamp, skipping line", "err", err)
+			t.metrics.dockerErrors.Inc()
+			continue
+		}
+
+		// Add all labels from the config, relabel and filter them.
+		lb := labels.NewBuilder(nil)
+		for k, v := range t.labels {
+			lb.Set(string(k), string(v))
+		}
+		lb.Set(dockerLabelLogStream, logStream)
+		processed := relabel.Process(lb.Labels(), t.relabelConfig...)
+
+		// Drop internal labels (double-underscore prefix) before shipping the entry.
+		filtered := make(model.LabelSet)
+		for _, lbl := range processed {
+			if strings.HasPrefix(lbl.Name, "__") {
+				continue
+			}
+			filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
+		}
+
+		t.handler.Chan() <- api.Entry{
+			Labels: filtered,
+			Entry: logproto.Entry{
+				Timestamp: ts,
+				Line:      line,
+			},
+		}
+		t.metrics.dockerEntries.Inc()
+		t.positions.Put(positions.CursorKey(t.containerName), ts.Unix())
+	}
+
+	if err := scanner.Err(); err != nil {
+		level.Warn(t.logger).Log("msg", "finished scanning log lines with an error", "err", err)
+	}
+}
+
+func (t *Target) Stop() {
+	t.cancel()
+	t.wg.Wait()
+	level.Debug(t.logger).Log("msg", "stopped Docker target", "container", t.containerName)
+}
+
+func (t *Target) Type() target.TargetType {
+	return target.DockerTargetType
+}
+
+func (t *Target) Ready() bool {
+	return t.running.Load()
+}
+
+func (t *Target) DiscoveredLabels() model.LabelSet {
+	return t.labels
+}
+
+func (t *Target) Labels() model.LabelSet {
+	return t.labels
+}
+
+// Details returns target-specific details.
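+// It reports the container ID, the last error (if any), the saved read
+// position, and whether the target is currently running.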
+func (t *Target) Details() interface{} {
+	var errMsg string
+	if t.err != nil {
+		errMsg = t.err.Error()
+	}
+	return map[string]string{
+		"id":       t.containerName,
+		"error":    errMsg,
+		"position": t.positions.GetString(positions.CursorKey(t.containerName)),
+		"running":  strconv.FormatBool(t.running.Load()),
+	}
+}
diff --git a/clients/pkg/promtail/targets/docker/target_group.go b/clients/pkg/promtail/targets/docker/target_group.go
new file mode 100644
index 0000000000..c9f45e95d4
--- /dev/null
+++ b/clients/pkg/promtail/targets/docker/target_group.go
@@ -0,0 +1,144 @@
+package docker
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/docker/docker/client"
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/grafana/loki/clients/pkg/promtail/api"
+	"github.com/grafana/loki/clients/pkg/promtail/positions"
+	"github.com/grafana/loki/clients/pkg/promtail/targets/target"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/discovery/targetgroup"
+	"github.com/prometheus/prometheus/model/relabel"
+)
+
+const DockerSource = "Docker"
+
+// targetGroup manages all container targets of one Docker daemon.
+type targetGroup struct {
+	metrics       *Metrics
+	logger        log.Logger
+	positions     positions.Positions
+	entryHandler  api.EntryHandler
+	defaultLabels model.LabelSet
+	relabelConfig []*relabel.Config
+	host          string
+	client        client.APIClient
+
+	mtx     sync.Mutex
+	targets map[string]*Target
+}
+
+func (tg *targetGroup) sync(groups []*targetgroup.Group) {
+	tg.mtx.Lock()
+	defer tg.mtx.Unlock()
+
+	for _, group := range groups {
+		if group.Source != DockerSource {
+			continue
+		}
+
+		for _, t := range group.Targets {
+			containerID, ok := t[dockerLabelContainerID]
+			if !ok {
+				level.Debug(tg.logger).Log("msg", "Docker target did not include container ID")
+				continue
+			}
+
+			err := tg.addTarget(string(containerID), t)
+			if err != nil {
+				level.Error(tg.logger).Log("msg", "could not add target", "containerID", containerID, "err", err)
+			}
+		}
+	}
+}
+
+// addTarget checks whether the container with the given ID is already known. If not, it is added to this group.
+func (tg *targetGroup) addTarget(id string, discoveredLabels model.LabelSet) error {
+	if tg.client == nil {
+		var err error
+		opts := []client.Opt{
+			client.WithHost(tg.host),
+			client.WithAPIVersionNegotiation(),
+		}
+		tg.client, err = client.NewClientWithOpts(opts...)
+		if err != nil {
+			level.Error(tg.logger).Log("msg", "could not create new Docker client", "err", err)
+			return err
+		}
+	}
+
+	_, ok := tg.targets[id]
+	if ok {
+		level.Debug(tg.logger).Log("msg", "ignoring container that is already being scraped", "container", id)
+		return nil
+	}
+
+	t, err := NewTarget(
+		tg.metrics,
+		log.With(tg.logger, "target", fmt.Sprintf("docker/%s", id)),
+		tg.entryHandler,
+		tg.positions,
+		id,
+		discoveredLabels.Merge(tg.defaultLabels),
+		tg.relabelConfig,
+		tg.client,
+	)
+	if err != nil {
+		return err
+	}
+	tg.targets[id] = t
+	level.Info(tg.logger).Log("msg", "added Docker target", "containerID", id)
+	return nil
+}
+
+// Ready reports whether the target group is ready. A group is considered
+// ready as soon as it has been created, even before any of its targets is
+// running; per-target state is exposed through ActiveTargets.
+func (tg *targetGroup) Ready() bool {
+	tg.mtx.Lock()
+	defer tg.mtx.Unlock()
+
+	for _, t := range tg.targets {
+		if t.Ready() {
+			return true
+		}
+	}
+
+	return true
+}
+
+// Stop stops all targets and the wrapped entry handler.
+func (tg *targetGroup) Stop() {
+	tg.mtx.Lock()
+	defer tg.mtx.Unlock()
+
+	for _, t := range tg.targets {
+		t.Stop()
+	}
+	tg.entryHandler.Stop()
+}
+
+// ActiveTargets returns all targets that are ready.
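+// Targets that are not running are excluded.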
+func (tg *targetGroup) ActiveTargets() []target.Target {
+	tg.mtx.Lock()
+	defer tg.mtx.Unlock()
+
+	result := make([]target.Target, 0, len(tg.targets))
+	for _, t := range tg.targets {
+		if t.Ready() {
+			result = append(result, t)
+		}
+	}
+	return result
+}
+
+// AllTargets returns all targets of this group.
+func (tg *targetGroup) AllTargets() []target.Target {
+	result := make([]target.Target, 0, len(tg.targets))
+	for _, t := range tg.targets {
+		result = append(result, t)
+	}
+	return result
+}
diff --git a/clients/pkg/promtail/targets/docker/target_test.go b/clients/pkg/promtail/targets/docker/target_test.go
new file mode 100644
index 0000000000..91e45832ac
--- /dev/null
+++ b/clients/pkg/promtail/targets/docker/target_test.go
@@ -0,0 +1,78 @@
+package docker
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/docker/docker/client"
+	"github.com/go-kit/log"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/relabel"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/clients/pkg/promtail/client/fake"
+	"github.com/grafana/loki/clients/pkg/promtail/positions"
+)
+
+func Test_DockerTarget(t *testing.T) {
+	// Serve a canned Docker log response for any request.
+	h := func(w http.ResponseWriter, r *http.Request) {
+		dat, err := os.ReadFile("testdata/flog.log")
+		require.NoError(t, err)
+		_, err = w.Write(dat)
+		require.NoError(t, err)
+	}
+
+	ts := httptest.NewServer(http.HandlerFunc(h))
+	defer ts.Close()
+
+	w := log.NewSyncWriter(os.Stderr)
+	logger := log.NewLogfmtLogger(w)
+	entryHandler := fake.New(func() {})
+	client, err := client.NewClientWithOpts(client.WithHost(ts.URL))
+	require.NoError(t, err)
+
+	ps, err := positions.New(logger, positions.Config{
+		SyncPeriod:    10 * time.Second,
+		PositionsFile: t.TempDir() + "/positions.yml",
+	})
+	require.NoError(t, err)
+
+	_, err = NewTarget(
+		NewMetrics(prometheus.NewRegistry()),
+		logger,
+		entryHandler,
+		ps,
+		"flog",
+		model.LabelSet{"job": "docker"},
+		[]*relabel.Config{},
+		client,
+	)
+	require.NoError(t, err)
+
+	require.Eventually(t, func() bool {
+		return len(entryHandler.Received()) >= 5
+	}, 5*time.Second, 100*time.Millisecond)
+
+	received := entryHandler.Received()
+	sort.Slice(received, func(i, j int) bool {
+		return received[i].Timestamp.Before(received[j].Timestamp)
+	})
+
+	expectedLines := []string{
+		"5.3.69.55 - - [09/Dec/2021:09:15:02 +0000] \"HEAD /brand/users/clicks-and-mortar/front-end HTTP/2.0\" 503 27087",
+		"101.54.183.185 - - [09/Dec/2021:09:15:03 +0000] \"POST /next-generation HTTP/1.0\" 416 11468",
+		"69.27.137.160 - runolfsdottir2670 [09/Dec/2021:09:15:03 +0000] \"HEAD /content/visionary/engineer/cultivate HTTP/1.1\" 302 2975",
+		"28.104.242.74 - - [09/Dec/2021:09:15:03 +0000] \"PATCH /value-added/cultivate/systems HTTP/2.0\" 405 11843",
+		"150.187.51.54 - satterfield1852 [09/Dec/2021:09:15:03 +0000] \"GET /incentivize/deliver/innovative/cross-platform HTTP/1.1\" 301 13032",
+	}
+	actualLines := make([]string, 0, 5)
+	for _, entry := range received[:5] {
+		actualLines = append(actualLines, entry.Line)
+	}
+	require.ElementsMatch(t, actualLines, expectedLines)
+}
diff --git a/clients/pkg/promtail/targets/docker/targetmanager.go b/clients/pkg/promtail/targets/docker/targetmanager.go
new file mode 100644
index 0000000000..3fb4c32a03
--- /dev/null
+++ b/clients/pkg/promtail/targets/docker/targetmanager.go
@@ -0,0 +1,141 @@
+package docker
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/grafana/loki/clients/pkg/logentry/stages"
+	"github.com/grafana/loki/clients/pkg/promtail/api"
+	"github.com/grafana/loki/clients/pkg/promtail/positions"
+	"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
+	"github.com/grafana/loki/clients/pkg/promtail/targets/target"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/discovery"
+
+	"github.com/grafana/loki/pkg/util"
+)
+
+const (
+	// See github.com/prometheus/prometheus/discovery/moby
+	dockerLabel                = model.MetaLabelPrefix + "docker_"
+	dockerLabelContainerPrefix = dockerLabel + "container_"
+	dockerLabelContainerID     = dockerLabelContainerPrefix + "id"
+	dockerLabelLogStream       = dockerLabelContainerPrefix + "log_stream"
+)
+
+type TargetManager struct {
+	metrics    *Metrics
+	logger     log.Logger
+	positions  positions.Positions
+	cancel     context.CancelFunc
+	manager    *discovery.Manager
+	pushClient api.EntryHandler
+	groups     map[string]*targetGroup
+}
+
+func NewTargetManager(
+	metrics *Metrics,
+	logger log.Logger,
+	positions positions.Positions,
+	pushClient api.EntryHandler,
+	scrapeConfigs []scrapeconfig.Config,
+) (*TargetManager, error) {
+	ctx, cancel := context.WithCancel(context.Background())
+	tm := &TargetManager{
+		metrics:    metrics,
+		logger:     logger,
+		cancel:     cancel,
+		positions:  positions,
+		manager:    discovery.NewManager(ctx, log.With(logger, "component", "docker_discovery")),
+		pushClient: pushClient,
+		groups:     make(map[string]*targetGroup),
+	}
+	configs := map[string]discovery.Configs{}
+	for _, cfg := range scrapeConfigs {
+		if cfg.DockerSDConfigs != nil {
+			pipeline, err := stages.NewPipeline(
+				log.With(logger, "component", "docker_pipeline"),
+				cfg.PipelineStages,
+				&cfg.JobName,
+				metrics.reg,
+			)
+			if err != nil {
+				return nil, err
+			}
+
+			// All SD configs of one job that point at the same daemon share a target group.
+			for _, sdConfig := range cfg.DockerSDConfigs {
+				syncerKey := fmt.Sprintf("%s/%s:%d", cfg.JobName, sdConfig.Host, sdConfig.Port)
+				_, ok := tm.groups[syncerKey]
+				if !ok {
+					tm.groups[syncerKey] = &targetGroup{
+						metrics:       metrics,
+						logger:        logger,
+						positions:     positions,
+						targets:       make(map[string]*Target),
+						entryHandler:  pipeline.Wrap(pushClient),
+						defaultLabels: model.LabelSet{},
+						relabelConfig: cfg.RelabelConfigs,
+						host:          sdConfig.Host,
+					}
+				}
+				configs[syncerKey] = append(configs[syncerKey], sdConfig)
+			}
+		} else {
+			level.Debug(tm.logger).Log("msg", "skipping job with empty Docker service discovery configs", "job", cfg.JobName)
+		}
+	}
+
+	go tm.run()
+	go util.LogError("running target manager", tm.manager.Run)
+
+	return tm, tm.manager.ApplyConfig(configs)
+}
+
+// run listens on the service discovery and adds new targets.
+func (tm *TargetManager) run() {
+	for targetGroups := range tm.manager.SyncCh() {
+		for jobName, groups := range targetGroups {
+			tg, ok := tm.groups[jobName]
+			if !ok {
+				level.Debug(tm.logger).Log("msg", "unknown target for job", "job", jobName)
+				continue
+			}
+			tg.sync(groups)
+		}
+	}
+}
+
+// Ready returns true if at least one Docker target is active.
+func (tm *TargetManager) Ready() bool { + for _, s := range tm.groups { + if s.Ready() { + return true + } + } + return false +} + +func (tm *TargetManager) Stop() { + tm.cancel() + for _, s := range tm.groups { + s.Stop() + } +} + +func (tm *TargetManager) ActiveTargets() map[string][]target.Target { + result := make(map[string][]target.Target, len(tm.groups)) + for k, s := range tm.groups { + result[k] = s.ActiveTargets() + } + return result +} + +func (tm *TargetManager) AllTargets() map[string][]target.Target { + result := make(map[string][]target.Target, len(tm.groups)) + for k, s := range tm.groups { + result[k] = s.AllTargets() + } + return result +} diff --git a/clients/pkg/promtail/targets/docker/targetmanager_test.go b/clients/pkg/promtail/targets/docker/targetmanager_test.go new file mode 100644 index 0000000000..f60bf5e3ea --- /dev/null +++ b/clients/pkg/promtail/targets/docker/targetmanager_test.go @@ -0,0 +1,112 @@ +package docker + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "sort" + "strings" + "testing" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/network" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/moby" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/clients/pkg/promtail/client/fake" + "github.com/grafana/loki/clients/pkg/promtail/positions" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" +) + +func Test_TargetManager(t *testing.T) { + h := func(w http.ResponseWriter, r *http.Request) { + switch path := r.URL.Path; { + case path == "/_ping": + _, err := w.Write([]byte("OK")) + require.NoError(t, err) + case strings.HasSuffix(path, "/containers/json"): + // Serve container list + w.Header().Set("Content-Type", "application/json") + containerResponse := []types.Container{{ + ID: "1234", + Names: []string{"flog"}, + NetworkSettings: &types.SummaryNetworkSettings{ + Networks: map[string]*network.EndpointSettings{ + "foo": { + NetworkID: "my_network", + IPAddress: "127.0.0.1", + }, + }, + }, + }} + err := json.NewEncoder(w).Encode(containerResponse) + require.NoError(t, err) + case strings.HasSuffix(path, "/networks"): + // Serve networks + w.Header().Set("Content-Type", "application/json") + err := json.NewEncoder(w).Encode([]types.NetworkResource{}) + require.NoError(t, err) + default: + // Serve container logs + dat, err := os.ReadFile("testdata/flog.log") + require.NoError(t, err) + _, err = w.Write(dat) + require.NoError(t, err) + } + } + dockerDaemonMock := httptest.NewServer(http.HandlerFunc(h)) + defer dockerDaemonMock.Close() + + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + entryHandler := fake.New(func() {}) + cfgs := []scrapeconfig.Config{{ + DockerSDConfigs: []*moby.DockerSDConfig{{ + Host: dockerDaemonMock.URL, + RefreshInterval: model.Duration(100 * time.Millisecond), + }}, + }} + + ps, err := positions.New(logger, positions.Config{ + SyncPeriod: 10 * time.Second, + PositionsFile: t.TempDir() + "/positions.yml", + }) + require.NoError(t, err) + + ta, err := NewTargetManager( + NewMetrics(prometheus.NewRegistry()), + logger, + ps, + entryHandler, + cfgs, + ) + require.NoError(t, err) + require.True(t, ta.Ready()) + + require.Eventually(t, func() bool { + return len(entryHandler.Received()) >= 5 + }, 20*time.Second, 100*time.Millisecond) + + received := entryHandler.Received() + sort.Slice(received, func(i, j int) 
bool { + return received[i].Timestamp.Before(received[j].Timestamp) + }) + + expectedLines := []string{ + "5.3.69.55 - - [09/Dec/2021:09:15:02 +0000] \"HEAD /brand/users/clicks-and-mortar/front-end HTTP/2.0\" 503 27087", + "101.54.183.185 - - [09/Dec/2021:09:15:03 +0000] \"POST /next-generation HTTP/1.0\" 416 11468", + "69.27.137.160 - runolfsdottir2670 [09/Dec/2021:09:15:03 +0000] \"HEAD /content/visionary/engineer/cultivate HTTP/1.1\" 302 2975", + "28.104.242.74 - - [09/Dec/2021:09:15:03 +0000] \"PATCH /value-added/cultivate/systems HTTP/2.0\" 405 11843", + "150.187.51.54 - satterfield1852 [09/Dec/2021:09:15:03 +0000] \"GET /incentivize/deliver/innovative/cross-platform HTTP/1.1\" 301 13032", + } + actualLines := make([]string, 0, 5) + for _, entry := range received[:5] { + actualLines = append(actualLines, entry.Line) + } + require.ElementsMatch(t, actualLines, expectedLines) +} diff --git a/clients/pkg/promtail/targets/docker/testdata/flog.log b/clients/pkg/promtail/targets/docker/testdata/flog.log new file mode 100644 index 0000000000..a3370e02d8 Binary files /dev/null and b/clients/pkg/promtail/targets/docker/testdata/flog.log differ diff --git a/clients/pkg/promtail/targets/manager.go b/clients/pkg/promtail/targets/manager.go index 97842eee5c..19c703d1b0 100644 --- a/clients/pkg/promtail/targets/manager.go +++ b/clients/pkg/promtail/targets/manager.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/positions" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" "github.com/grafana/loki/clients/pkg/promtail/targets/cloudflare" + "github.com/grafana/loki/clients/pkg/promtail/targets/docker" "github.com/grafana/loki/clients/pkg/promtail/targets/file" "github.com/grafana/loki/clients/pkg/promtail/targets/gcplog" "github.com/grafana/loki/clients/pkg/promtail/targets/gelf" @@ -34,6 +35,8 @@ const ( KafkaConfigs = "kafkaConfigs" GelfConfigs = "gelfConfigs" CloudflareConfigs = "cloudflareConfigs" + DockerConfigs = "dockerConfigs" + DockerSDConfigs = "dockerSDConfigs" ) type targetManager interface { @@ -91,6 +94,8 @@ func NewTargetManagers( targetScrapeConfigs[GelfConfigs] = append(targetScrapeConfigs[GelfConfigs], cfg) case cfg.CloudflareConfig != nil: targetScrapeConfigs[CloudflareConfigs] = append(targetScrapeConfigs[CloudflareConfigs], cfg) + case cfg.DockerSDConfigs != nil: + targetScrapeConfigs[DockerSDConfigs] = append(targetScrapeConfigs[DockerSDConfigs], cfg) default: return nil, fmt.Errorf("no valid target scrape config defined for %q", cfg.JobName) } @@ -116,6 +121,7 @@ func NewTargetManagers( gcplogMetrics *gcplog.Metrics gelfMetrics *gelf.Metrics cloudflareMetrics *cloudflare.Metrics + dockerMetrics *docker.Metrics ) if len(targetScrapeConfigs[FileScrapeConfigs]) > 0 { fileMetrics = file.NewMetrics(reg) @@ -132,6 +138,9 @@ func NewTargetManagers( if len(targetScrapeConfigs[CloudflareConfigs]) > 0 { cloudflareMetrics = cloudflare.NewMetrics(reg) } + if len(targetScrapeConfigs[DockerConfigs]) > 0 || len(targetScrapeConfigs[DockerSDConfigs]) > 0 { + dockerMetrics = docker.NewMetrics(reg) + } for target, scrapeConfigs := range targetScrapeConfigs { switch target { @@ -229,6 +238,26 @@ func NewTargetManagers( return nil, errors.Wrap(err, "failed to make cloudflare target manager") } targetManagers = append(targetManagers, cfTargetManager) + case DockerConfigs: + pos, err := getPositionFile() + if err != nil { + return nil, err + } + cfTargetManager, err := docker.NewTargetManager(dockerMetrics, logger, pos, client, scrapeConfigs) + if err != nil { + 
return nil, errors.Wrap(err, "failed to make Docker target manager")
+			}
+			targetManagers = append(targetManagers, cfTargetManager)
+		case DockerSDConfigs:
+			pos, err := getPositionFile()
+			if err != nil {
+				return nil, err
+			}
+			cfTargetManager, err := docker.NewTargetManager(dockerMetrics, logger, pos, client, scrapeConfigs)
+			if err != nil {
+				return nil, errors.Wrap(err, "failed to make Docker service discovery target manager")
+			}
+			targetManagers = append(targetManagers, cfTargetManager)
 		default:
 			return nil, errors.New("unknown scrape config")
 		}
diff --git a/clients/pkg/promtail/targets/target/target.go b/clients/pkg/promtail/targets/target/target.go
index 8acf6aad61..ecb021c69f 100644
--- a/clients/pkg/promtail/targets/target/target.go
+++ b/clients/pkg/promtail/targets/target/target.go
@@ -38,6 +38,9 @@ const (
 
 	// CloudflareTargetType is a Cloudflare target
 	CloudflareTargetType = TargetType("Cloudflare")
+
+	// DockerTargetType is a Docker target
+	DockerTargetType = TargetType("Docker")
 )
 
 // Target is a promtail scrape target
diff --git a/docs/sources/clients/docker-driver/_index.md b/docs/sources/clients/docker-driver/_index.md
index 39f4918893..c979b3a682 100644
--- a/docs/sources/clients/docker-driver/_index.md
+++ b/docs/sources/clients/docker-driver/_index.md
@@ -63,33 +63,4 @@ docker plugin rm loki
 
 The driver keeps all logs in memory and will drop log entries if Loki is not reachable and the number of `max_retries` has been exceeded. To avoid dropping log entries, setting `max_retries` to zero allows unlimited retries; the driver will continue trying forever until Loki is again reachable. Trying forever may have undesired consequences, because the Docker daemon will wait for the Loki driver to process all logs of a container, until the container is removed. Thus, the Docker daemon might wait forever if the container is stuck.
 
-This issue is avoided by using [Promtail](../promtail) with this configuration:
-
-```yaml
-server:
-  disable: true
-
-positions:
-  filename: loki-positions.yml
-
-clients:
-  - url: http://ip_or_hostname_where_Loki_run:3100/loki/api/v1/push
-    # If using basic auth, configures the username and password sent.
-    basic_auth:
-      # The username to use for basic auth
-      username:
-      # The password to use for basic auth
-      password:
-
-scrape_configs:
-  - job_name: system
-    pipeline_stages:
-      - docker: {}
-    static_configs:
-      - labels:
-          job: docker
-          __path__: /var/lib/docker/containers/*/*-json.log
-
-```
-
-This will enable Promtail to tail *all* Docker container logs and publish them to Loki.
+Use Promtail's [Docker target](../promtail/configuration/#docker) or [Docker service discovery](../promtail/configuration/#docker_sd_config) to avoid this issue.
diff --git a/docs/sources/clients/promtail/configuration.md b/docs/sources/clients/promtail/configuration.md
index 55ec0a0932..23a438e8ca 100644
--- a/docs/sources/clients/promtail/configuration.md
+++ b/docs/sources/clients/promtail/configuration.md
@@ -358,6 +358,11 @@ consul_sd_configs:
 # running on the same host as Promtail.
 consulagent_sd_configs:
   [ - <consulagent_sd_config> ... ]
+
+# Describes how to use the Docker daemon API to discover containers running on
+# the same host as Promtail.
+docker_sd_configs:
+  [ - <docker_sd_config> ... ]
 ```
 
 ### pipeline_stages
@@ -1622,6 +1627,116 @@ users with thousands of services it can be more efficient to use the Consul API
 directly which has basic support for filtering nodes (currently by node
 metadata and a single tag).
+### docker_sd_config
+
+Docker service discovery allows retrieving targets from a Docker daemon.
+It only watches containers of the Docker daemon referenced by the `host` parameter, so Docker
+service discovery should run on each node in a distributed setup. The containers must run with
+either the [json-file](https://docs.docker.com/config/containers/logging/json-file/)
+or [journald](https://docs.docker.com/config/containers/logging/journald/) logging driver.
+
+Note that the discovery will not pick up finished containers. That means
+Promtail will not scrape the remaining logs from finished containers after a restart.
+
+The configuration is inherited from [Prometheus' Docker service discovery](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#docker_sd_config).
+
+```yaml
+# Address of the Docker daemon. Use unix:///var/run/docker.sock for a local setup.
+host: <string>
+
+# Optional proxy URL.
+[ proxy_url: <string> ]
+
+# TLS configuration.
+tls_config:
+  [ <tls_config> ]
+
+# The port to scrape metrics from, when `role` is nodes, and for discovered
+# tasks and services that don't have published ports.
+[ port: <int> | default = 80 ]
+
+# The host to use if the container is in host networking mode.
+[ host_networking_host: <string> | default = "localhost" ]
+
+# Optional filters to limit the discovery process to a subset of available
+# resources.
+# The available filters are listed in the Docker documentation:
+# Containers: https://docs.docker.com/engine/api/v1.41/#operation/ContainerList
+[ filters:
+  [ - name: <string>
+      values: <string>, [...] ]
+]
+
+# The time after which the containers are refreshed.
+[ refresh_interval: <duration> | default = 60s ]
+
+# Authentication information used by Promtail to authenticate itself to the
+# Docker daemon.
+# Note that `basic_auth` and `authorization` options are mutually exclusive.
+# `password` and `password_file` are mutually exclusive.
+
+# Optional HTTP basic authentication information.
+basic_auth:
+  [ username: <string> ]
+  [ password: <secret> ]
+  [ password_file: <string> ]
+
+# Optional `Authorization` header configuration.
+authorization:
+  # Sets the authentication type.
+  [ type: <string> | default: Bearer ]
+  # Sets the credentials. It is mutually exclusive with
+  # `credentials_file`.
+  [ credentials: <secret> ]
+  # Sets the credentials to the credentials read from the configured file.
+  # It is mutually exclusive with `credentials`.
+  [ credentials_file: <filename> ]
+
+# Optional OAuth 2.0 configuration.
+# Cannot be used at the same time as basic_auth or authorization.
+oauth2:
+  [ <oauth2> ]
+
+# Configure whether HTTP requests follow HTTP 3xx redirects.
+[ follow_redirects: <boolean> | default = true ]
+```
+
+Available meta labels:
+
+ * `__meta_docker_container_id`: the ID of the container
+ * `__meta_docker_container_name`: the name of the container
+ * `__meta_docker_container_network_mode`: the network mode of the container
+ * `__meta_docker_container_label_<labelname>`: each label of the container
+ * `__meta_docker_container_log_stream`: the log stream type `stdout` or `stderr`
+ * `__meta_docker_network_id`: the ID of the network
+ * `__meta_docker_network_name`: the name of the network
+ * `__meta_docker_network_ingress`: whether the network is ingress
+ * `__meta_docker_network_internal`: whether the network is internal
+ * `__meta_docker_network_label_<labelname>`: each label of the network
+ * `__meta_docker_network_scope`: the scope of the network
+ * `__meta_docker_network_ip`: the IP of the container in this network
+ * `__meta_docker_port_private`: the port on the container
+ * `__meta_docker_port_public`: the external port if a port-mapping exists
+ * `__meta_docker_port_public_ip`: the public IP if a port-mapping exists
+
+These labels can be used during relabeling. For instance, the following configuration scrapes the container named `flog` and removes the leading slash (`/`) from the container name.
+
+```yaml
+scrape_configs:
+  - job_name: flog_scrape
+    docker_sd_configs:
+      - host: unix:///var/run/docker.sock
+        refresh_interval: 5s
+        filters:
+          - name: name
+            values: [flog]
+    relabel_configs:
+      - source_labels: ['__meta_docker_container_name']
+        regex: '/(.*)'
+        target_label: 'container'
+```
+
 ## target_config
 
 The `target_config` block controls the behavior of reading files from discovered
diff --git a/vendor/github.com/docker/docker/pkg/stdcopy/stdcopy.go b/vendor/github.com/docker/docker/pkg/stdcopy/stdcopy.go
new file mode 100644
index 0000000000..8f6e0a737a
--- /dev/null
+++ b/vendor/github.com/docker/docker/pkg/stdcopy/stdcopy.go
@@ -0,0 +1,190 @@
+package stdcopy // import "github.com/docker/docker/pkg/stdcopy"
+
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+	"sync"
+)
+
+// StdType is the type of standard stream
+// a writer can multiplex to.
+type StdType byte
+
+const (
+	// Stdin represents standard input stream type.
+	Stdin StdType = iota
+	// Stdout represents standard output stream type.
+	Stdout
+	// Stderr represents standard error steam type.
+	Stderr
+	// Systemerr represents errors originating from the system that make it
+	// into the multiplexed stream.
+	Systemerr
+
+	stdWriterPrefixLen = 8
+	stdWriterFdIndex   = 0
+	stdWriterSizeIndex = 4
+
+	startingBufLen = 32*1024 + stdWriterPrefixLen + 1
+)
+
+var bufPool = &sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }}
+
+// stdWriter is wrapper of io.Writer with extra customized info.
+type stdWriter struct {
+	io.Writer
+	prefix byte
+}
+
+// Write sends the buffer to the underneath writer.
+// It inserts the prefix header before the buffer,
+// so stdcopy.StdCopy knows where to multiplex the output.
+// It makes stdWriter to implement io.Writer.
+func (w *stdWriter) Write(p []byte) (n int, err error) { + if w == nil || w.Writer == nil { + return 0, errors.New("Writer not instantiated") + } + if p == nil { + return 0, nil + } + + header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix} + binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p))) + buf := bufPool.Get().(*bytes.Buffer) + buf.Write(header[:]) + buf.Write(p) + + n, err = w.Writer.Write(buf.Bytes()) + n -= stdWriterPrefixLen + if n < 0 { + n = 0 + } + + buf.Reset() + bufPool.Put(buf) + return +} + +// NewStdWriter instantiates a new Writer. +// Everything written to it will be encapsulated using a custom format, +// and written to the underlying `w` stream. +// This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection. +// `t` indicates the id of the stream to encapsulate. +// It can be stdcopy.Stdin, stdcopy.Stdout, stdcopy.Stderr. +func NewStdWriter(w io.Writer, t StdType) io.Writer { + return &stdWriter{ + Writer: w, + prefix: byte(t), + } +} + +// StdCopy is a modified version of io.Copy. +// +// StdCopy will demultiplex `src`, assuming that it contains two streams, +// previously multiplexed together using a StdWriter instance. +// As it reads from `src`, StdCopy will write to `dstout` and `dsterr`. +// +// StdCopy will read until it hits EOF on `src`. It will then return a nil error. +// In other words: if `err` is non nil, it indicates a real underlying error. +// +// `written` will hold the total number of bytes written to `dstout` and `dsterr`. +func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) { + var ( + buf = make([]byte, startingBufLen) + bufLen = len(buf) + nr, nw int + er, ew error + out io.Writer + frameSize int + ) + + for { + // Make sure we have at least a full header + for nr < stdWriterPrefixLen { + var nr2 int + nr2, er = src.Read(buf[nr:]) + nr += nr2 + if er == io.EOF { + if nr < stdWriterPrefixLen { + return written, nil + } + break + } + if er != nil { + return 0, er + } + } + + stream := StdType(buf[stdWriterFdIndex]) + // Check the first byte to know where to write + switch stream { + case Stdin: + fallthrough + case Stdout: + // Write on stdout + out = dstout + case Stderr: + // Write on stderr + out = dsterr + case Systemerr: + // If we're on Systemerr, we won't write anywhere. + // NB: if this code changes later, make sure you don't try to write + // to outstream if Systemerr is the stream + out = nil + default: + return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex]) + } + + // Retrieve the size of the frame + frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4])) + + // Check if the buffer is big enough to read the frame. + // Extend it if necessary. + if frameSize+stdWriterPrefixLen > bufLen { + buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...) + bufLen = len(buf) + } + + // While the amount of bytes read is less than the size of the frame + header, we keep reading + for nr < frameSize+stdWriterPrefixLen { + var nr2 int + nr2, er = src.Read(buf[nr:]) + nr += nr2 + if er == io.EOF { + if nr < frameSize+stdWriterPrefixLen { + return written, nil + } + break + } + if er != nil { + return 0, er + } + } + + // we might have an error from the source mixed up in our multiplexed + // stream. if we do, return it. 
+ if stream == Systemerr { + return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen])) + } + + // Write the retrieved frame (without header) + nw, ew = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen]) + if ew != nil { + return 0, ew + } + + // If the frame has not been fully written: error + if nw != frameSize { + return 0, io.ErrShortWrite + } + written += int64(nw) + + // Move the rest of the buffer to the beginning + copy(buf, buf[frameSize+stdWriterPrefixLen:]) + // Move the index + nr -= frameSize + stdWriterPrefixLen + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index ca56e5adc7..4c867f5642 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -341,6 +341,7 @@ github.com/docker/docker/pkg/plugins/transport github.com/docker/docker/pkg/pools github.com/docker/docker/pkg/progress github.com/docker/docker/pkg/pubsub +github.com/docker/docker/pkg/stdcopy github.com/docker/docker/pkg/streamformatter github.com/docker/docker/pkg/stringid github.com/docker/docker/pkg/tailfile