mirror of https://github.com/grafana/loki
Provide Docker target and discovery in Promtail. (#4911)
**What this PR does / why we need it**: This patch adds support for fetching Docker container logs through the Docker daemon API. This should be more robust than the Loki Docker driver or scraping the log files. The new Docker target also collects meta information about the scraped containers.

**Which issue(s) this PR fixes**: Addresses #2361. Closes #4703.

**Special notes for your reviewer**:

**Checklist**
- [x] Documentation added
- [x] Tests updated
- [x] Add an entry in the `CHANGELOG.md` about the changes.
parent 2e6e050453
commit 4716eea0db
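For reference, below is a minimal sketch (not part of the diff) of how the new Docker target manager is wired up, mirroring the tests added in this PR. The `targets/docker` import path, the daemon address, and the positions file path are assumptions or placeholders; a real Promtail instance would pass its Loki push client instead of the in-memory fake handler.

```go
package main

import (
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery/moby"

	"github.com/grafana/loki/clients/pkg/promtail/client/fake"
	"github.com/grafana/loki/clients/pkg/promtail/positions"
	"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
	"github.com/grafana/loki/clients/pkg/promtail/targets/docker"
)

func main() {
	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

	// Positions file tracking the last read timestamp per container.
	ps, err := positions.New(logger, positions.Config{
		SyncPeriod:    10 * time.Second,
		PositionsFile: "/tmp/positions.yml", // placeholder path
	})
	if err != nil {
		panic(err)
	}

	// One scrape config with a Docker service discovery block, as in the tests below.
	cfgs := []scrapeconfig.Config{{
		DockerSDConfigs: []*moby.DockerSDConfig{{
			Host:            "unix:///var/run/docker.sock", // placeholder daemon address
			RefreshInterval: model.Duration(5 * time.Second),
		}},
	}}

	// fake.New provides an in-memory EntryHandler for illustration only.
	tm, err := docker.NewTargetManager(
		docker.NewMetrics(prometheus.NewRegistry()),
		logger,
		ps,
		fake.New(func() {}),
		cfgs,
	)
	if err != nil {
		panic(err)
	}
	defer tm.Stop()
}
```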
@@ -0,0 +1,38 @@
package docker

import "github.com/prometheus/client_golang/prometheus"

// Metrics holds a set of Docker target metrics.
type Metrics struct {
    reg prometheus.Registerer

    dockerEntries prometheus.Counter
    dockerErrors  prometheus.Counter
}

// NewMetrics creates a new set of Docker target metrics. If reg is non-nil, the
// metrics will be registered.
func NewMetrics(reg prometheus.Registerer) *Metrics {
    var m Metrics
    m.reg = reg

    m.dockerEntries = prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "promtail",
        Name:      "docker_target_entries_total",
        Help:      "Total number of successful entries sent to the Docker target",
    })
    m.dockerErrors = prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "promtail",
        Name:      "docker_target_parsing_errors_total",
        Help:      "Total number of parsing errors while receiving Docker messages",
    })

    if reg != nil {
        reg.MustRegister(
            m.dockerEntries,
            m.dockerErrors,
        )
    }

    return &m
}
@@ -0,0 +1,231 @@
package docker

import (
    "bufio"
    "context"
    "fmt"
    "io"
    "strconv"
    "strings"
    "sync"
    "time"

    docker_types "github.com/docker/docker/api/types"
    "github.com/docker/docker/client"
    "github.com/docker/docker/pkg/stdcopy"
    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/model/relabel"
    "go.uber.org/atomic"

    "github.com/grafana/loki/clients/pkg/promtail/api"
    "github.com/grafana/loki/clients/pkg/promtail/positions"
    "github.com/grafana/loki/clients/pkg/promtail/targets/target"

    "github.com/grafana/loki/pkg/logproto"
)

type Target struct {
    logger        log.Logger
    handler       api.EntryHandler
    since         int64
    positions     positions.Positions
    containerName string
    labels        model.LabelSet
    relabelConfig []*relabel.Config
    metrics       *Metrics

    cancel  context.CancelFunc
    client  client.APIClient
    wg      sync.WaitGroup
    running *atomic.Bool
    err     error
}

func NewTarget(
    metrics *Metrics,
    logger log.Logger,
    handler api.EntryHandler,
    position positions.Positions,
    containerName string,
    labels model.LabelSet,
    relabelConfig []*relabel.Config,
    client client.APIClient,
) (*Target, error) {

    pos, err := position.Get(positions.CursorKey(containerName))
    if err != nil {
        return nil, err
    }
    var since int64
    if pos != 0 {
        since = pos
    }

    ctx, cancel := context.WithCancel(context.Background())
    t := &Target{
        logger:        logger,
        handler:       handler,
        since:         since,
        positions:     position,
        containerName: containerName,
        labels:        labels,
        relabelConfig: relabelConfig,
        metrics:       metrics,

        cancel:  cancel,
        client:  client,
        running: atomic.NewBool(false),
    }
    go t.processLoop(ctx)
    return t, nil
}

func (t *Target) processLoop(ctx context.Context) {
    t.wg.Add(1)
    defer t.wg.Done()
    t.running.Store(true)

    opts := docker_types.ContainerLogsOptions{
        ShowStdout: true,
        ShowStderr: true,
        Follow:     true,
        Timestamps: true,
        Since:      strconv.FormatInt(t.since, 10),
    }

    logs, err := t.client.ContainerLogs(ctx, t.containerName, opts)
    if err != nil {
        level.Error(t.logger).Log("msg", "could not fetch logs for container", "container", t.containerName, "err", err)
        t.err = err
        return
    }

    // Start transferring
    rstdout, wstdout := io.Pipe()
    rstderr, wstderr := io.Pipe()
    t.wg.Add(1)
    go func() {
        defer func() {
            t.wg.Done()
            wstdout.Close()
            wstderr.Close()
            t.Stop()
        }()

        written, err := stdcopy.StdCopy(wstdout, wstderr, logs)
        if err != nil {
            level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerName, "err", err)
        } else {
            level.Info(t.logger).Log("msg", "finished transferring logs", "written", written, "container", t.containerName)
        }
    }()

    // Start processing
    t.wg.Add(2)
    go t.process(rstdout, "stdout")
    go t.process(rstderr, "stderr")

    // Wait until done
    <-ctx.Done()
    t.running.Store(false)
    logs.Close()
    level.Debug(t.logger).Log("msg", "done processing Docker logs", "container", t.containerName)
}

// extractTs tries to read the timestamp from the beginning of the log line.
// It's expected to follow the format 2006-01-02T15:04:05.999999999Z07:00.
func extractTs(line string) (time.Time, string, error) {
    pair := strings.SplitN(line, " ", 2)
    if len(pair) != 2 {
        return time.Now(), line, fmt.Errorf("could not find timestamp in '%s'", line)
    }
    ts, err := time.Parse("2006-01-02T15:04:05.999999999Z07:00", pair[0])
    if err != nil {
        return time.Now(), line, fmt.Errorf("could not parse timestamp from '%s': %w", pair[0], err)
    }
    return ts, pair[1], nil
}

func (t *Target) process(r io.Reader, logStream string) {
    defer func() {
        t.wg.Done()
    }()

    scanner := bufio.NewScanner(r)
    for scanner.Scan() {
        line := scanner.Text()
        ts, line, err := extractTs(line)
        if err != nil {
            level.Error(t.logger).Log("msg", "could not extract timestamp, skipping line", "err", err)
            t.metrics.dockerErrors.Inc()
            continue
        }

        // Add all labels from the config, relabel and filter them.
        lb := labels.NewBuilder(nil)
        for k, v := range t.labels {
            lb.Set(string(k), string(v))
        }
        lb.Set(dockerLabelLogStream, logStream)
        processed := relabel.Process(lb.Labels(), t.relabelConfig...)

        filtered := make(model.LabelSet)
        for _, lbl := range processed {
            if strings.HasPrefix(lbl.Name, "__") {
                continue
            }
            filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
        }

        t.handler.Chan() <- api.Entry{
            Labels: filtered,
            Entry: logproto.Entry{
                Timestamp: ts,
                Line:      line,
            },
        }
        t.metrics.dockerEntries.Inc()
        t.positions.Put(positions.CursorKey(t.containerName), ts.Unix())
    }

    err := scanner.Err()
    if err != nil {
        level.Warn(t.logger).Log("msg", "finished scanning log lines with an error", "err", err)
    }
}

func (t *Target) Stop() {
    t.cancel()
    t.wg.Wait()
    level.Debug(t.logger).Log("msg", "stopped Docker target", "container", t.containerName)
}

func (t *Target) Type() target.TargetType {
    return target.DockerTargetType
}

func (t *Target) Ready() bool {
    return t.running.Load()
}

func (t *Target) DiscoveredLabels() model.LabelSet {
    return t.labels
}

func (t *Target) Labels() model.LabelSet {
    return t.labels
}

// Details returns target-specific details.
func (t *Target) Details() interface{} {
    var errMsg string
    if t.err != nil {
        errMsg = t.err.Error()
    }
    return map[string]string{
        "id":       t.containerName,
        "error":    errMsg,
        "position": t.positions.GetString(positions.CursorKey(t.containerName)),
        "running":  strconv.FormatBool(t.running.Load()),
    }
}
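As an aside, the following hypothetical example test (not part of the PR) illustrates the line format `extractTs` expects: because the target requests logs with `Timestamps: true`, every line from the daemon starts with an RFC3339Nano timestamp followed by a space.

```go
package docker

import (
	"fmt"
	"time"
)

// Example_extractTs is a hypothetical example test showing the
// "<RFC3339Nano timestamp> <message>" layout that extractTs parses.
func Example_extractTs() {
	ts, msg, err := extractTs("2021-12-09T09:15:02.123456789Z GET /healthz 200")
	fmt.Println(ts.UTC().Format(time.RFC3339Nano), "|", msg, "|", err)
	// Output: 2021-12-09T09:15:02.123456789Z | GET /healthz 200 | <nil>
}
```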
@@ -0,0 +1,144 @@
package docker

import (
    "fmt"
    "sync"

    "github.com/docker/docker/client"
    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/grafana/loki/clients/pkg/promtail/api"
    "github.com/grafana/loki/clients/pkg/promtail/positions"
    "github.com/grafana/loki/clients/pkg/promtail/targets/target"
    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/discovery/targetgroup"
    "github.com/prometheus/prometheus/model/relabel"
)

const DockerSource = "Docker"

// targetGroup manages all container targets of one Docker daemon.
type targetGroup struct {
    metrics       *Metrics
    logger        log.Logger
    positions     positions.Positions
    entryHandler  api.EntryHandler
    defaultLabels model.LabelSet
    relabelConfig []*relabel.Config
    host          string
    client        client.APIClient

    mtx     sync.Mutex
    targets map[string]*Target
}

func (tg *targetGroup) sync(groups []*targetgroup.Group) {
    tg.mtx.Lock()
    defer tg.mtx.Unlock()

    for _, group := range groups {
        if group.Source != DockerSource {
            continue
        }

        for _, t := range group.Targets {
            containerID, ok := t[dockerLabelContainerID]
            if !ok {
                level.Debug(tg.logger).Log("msg", "Docker target did not include container ID")
                continue
            }

            err := tg.addTarget(string(containerID), t)
            if err != nil {
                level.Error(tg.logger).Log("msg", "could not add target", "containerID", containerID, "err", err)
            }
        }
    }
}

// addTarget checks whether the container with the given ID is already known. If not, it is added to this group.
func (tg *targetGroup) addTarget(id string, discoveredLabels model.LabelSet) error {
    if tg.client == nil {
        var err error
        opts := []client.Opt{
            client.WithHost(tg.host),
            client.WithAPIVersionNegotiation(),
        }
        tg.client, err = client.NewClientWithOpts(opts...)
        if err != nil {
            level.Error(tg.logger).Log("msg", "could not create new Docker client", "err", err)
            return err
        }
    }

    _, ok := tg.targets[id]
    if ok {
        level.Debug(tg.logger).Log("msg", "ignoring container that is already being scraped", "container", id)
        return nil
    }

    t, err := NewTarget(
        tg.metrics,
        log.With(tg.logger, "target", fmt.Sprintf("docker/%s", id)),
        tg.entryHandler,
        tg.positions,
        id,
        discoveredLabels.Merge(tg.defaultLabels),
        tg.relabelConfig,
        tg.client,
    )
    if err != nil {
        return err
    }
    tg.targets[id] = t
    level.Info(tg.logger).Log("msg", "added Docker target", "containerID", id)
    return nil
}

// Ready reports whether the group is ready; a group that has not discovered
// any targets yet is still considered ready.
func (tg *targetGroup) Ready() bool {
    tg.mtx.Lock()
    defer tg.mtx.Unlock()

    for _, t := range tg.targets {
        if t.Ready() {
            return true
        }
    }

    return true
}

// Stop all targets
func (tg *targetGroup) Stop() {
    tg.mtx.Lock()
    defer tg.mtx.Unlock()

    for _, t := range tg.targets {
        t.Stop()
    }
    tg.entryHandler.Stop()
}

// ActiveTargets returns all targets that are ready.
func (tg *targetGroup) ActiveTargets() []target.Target {
    tg.mtx.Lock()
    defer tg.mtx.Unlock()

    result := make([]target.Target, 0, len(tg.targets))
    for _, t := range tg.targets {
        if t.Ready() {
            result = append(result, t)
        }
    }
    return result
}

// AllTargets returns all targets of this group.
func (tg *targetGroup) AllTargets() []target.Target {
    result := make([]target.Target, 0, len(tg.targets))
    for _, t := range tg.targets {
        result = append(result, t)
    }
    return result
}
@@ -0,0 +1,78 @@
package docker

import (
    "net/http"
    "net/http/httptest"
    "os"
    "sort"
    "testing"
    "time"

    "github.com/docker/docker/client"
    "github.com/go-kit/log"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/model/relabel"
    "github.com/stretchr/testify/require"

    "github.com/grafana/loki/clients/pkg/promtail/client/fake"
    "github.com/grafana/loki/clients/pkg/promtail/positions"
)

func Test_DockerTarget(t *testing.T) {
    h := func(w http.ResponseWriter, r *http.Request) {
        dat, err := os.ReadFile("testdata/flog.log")
        require.NoError(t, err)
        _, err = w.Write(dat)
        require.NoError(t, err)
    }

    ts := httptest.NewServer(http.HandlerFunc(h))
    defer ts.Close()

    w := log.NewSyncWriter(os.Stderr)
    logger := log.NewLogfmtLogger(w)
    entryHandler := fake.New(func() {})
    client, err := client.NewClientWithOpts(client.WithHost(ts.URL))
    require.NoError(t, err)

    ps, err := positions.New(logger, positions.Config{
        SyncPeriod:    10 * time.Second,
        PositionsFile: t.TempDir() + "/positions.yml",
    })
    require.NoError(t, err)

    _, err = NewTarget(
        NewMetrics(prometheus.NewRegistry()),
        logger,
        entryHandler,
        ps,
        "flog",
        model.LabelSet{"job": "docker"},
        []*relabel.Config{},
        client,
    )
    require.NoError(t, err)

    require.Eventually(t, func() bool {
        return len(entryHandler.Received()) >= 5
    }, 5*time.Second, 100*time.Millisecond)

    received := entryHandler.Received()
    sort.Slice(received, func(i, j int) bool {
        return received[i].Timestamp.Before(received[j].Timestamp)
    })

    expectedLines := []string{
        "5.3.69.55 - - [09/Dec/2021:09:15:02 +0000] \"HEAD /brand/users/clicks-and-mortar/front-end HTTP/2.0\" 503 27087",
        "101.54.183.185 - - [09/Dec/2021:09:15:03 +0000] \"POST /next-generation HTTP/1.0\" 416 11468",
        "69.27.137.160 - runolfsdottir2670 [09/Dec/2021:09:15:03 +0000] \"HEAD /content/visionary/engineer/cultivate HTTP/1.1\" 302 2975",
        "28.104.242.74 - - [09/Dec/2021:09:15:03 +0000] \"PATCH /value-added/cultivate/systems HTTP/2.0\" 405 11843",
        "150.187.51.54 - satterfield1852 [09/Dec/2021:09:15:03 +0000] \"GET /incentivize/deliver/innovative/cross-platform HTTP/1.1\" 301 13032",
    }
    actualLines := make([]string, 0, 5)
    for _, entry := range received[:5] {
        actualLines = append(actualLines, entry.Line)
    }
    require.ElementsMatch(t, actualLines, expectedLines)
}
@@ -0,0 +1,141 @@
package docker

import (
    "context"
    "fmt"

    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/grafana/loki/clients/pkg/logentry/stages"
    "github.com/grafana/loki/clients/pkg/promtail/api"
    "github.com/grafana/loki/clients/pkg/promtail/positions"
    "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
    "github.com/grafana/loki/clients/pkg/promtail/targets/target"
    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/discovery"

    "github.com/grafana/loki/pkg/util"
)

const (
    // See github.com/prometheus/prometheus/discovery/moby
    dockerLabel                = model.MetaLabelPrefix + "docker_"
    dockerLabelContainerPrefix = dockerLabel + "container_"
    dockerLabelContainerID     = dockerLabelContainerPrefix + "id"
    dockerLabelLogStream       = dockerLabelContainerPrefix + "log_stream"
)

type TargetManager struct {
    metrics    *Metrics
    logger     log.Logger
    positions  positions.Positions
    cancel     context.CancelFunc
    manager    *discovery.Manager
    pushClient api.EntryHandler
    groups     map[string]*targetGroup
}

func NewTargetManager(
    metrics *Metrics,
    logger log.Logger,
    positions positions.Positions,
    pushClient api.EntryHandler,
    scrapeConfigs []scrapeconfig.Config,
) (*TargetManager, error) {
    ctx, cancel := context.WithCancel(context.Background())
    tm := &TargetManager{
        metrics:    metrics,
        logger:     logger,
        cancel:     cancel,
        positions:  positions,
        manager:    discovery.NewManager(ctx, log.With(logger, "component", "docker_discovery")),
        pushClient: pushClient,
        groups:     make(map[string]*targetGroup),
    }
    configs := map[string]discovery.Configs{}
    for _, cfg := range scrapeConfigs {
        if cfg.DockerSDConfigs != nil {
            pipeline, err := stages.NewPipeline(
                log.With(logger, "component", "docker_pipeline"),
                cfg.PipelineStages,
                &cfg.JobName,
                metrics.reg,
            )
            if err != nil {
                return nil, err
            }

            for _, sdConfig := range cfg.DockerSDConfigs {
                syncerKey := fmt.Sprintf("%s/%s:%d", cfg.JobName, sdConfig.Host, sdConfig.Port)
                _, ok := tm.groups[syncerKey]
                if !ok {
                    tm.groups[syncerKey] = &targetGroup{
                        metrics:       metrics,
                        logger:        logger,
                        positions:     positions,
                        targets:       make(map[string]*Target),
                        entryHandler:  pipeline.Wrap(pushClient),
                        defaultLabels: model.LabelSet{},
                        relabelConfig: cfg.RelabelConfigs,
                        host:          sdConfig.Host,
                    }
                }
                configs[syncerKey] = append(configs[syncerKey], sdConfig)
            }
        } else {
            level.Debug(tm.logger).Log("msg", "Docker service discovery configs are empty")
        }
    }

    go tm.run()
    go util.LogError("running target manager", tm.manager.Run)

    return tm, tm.manager.ApplyConfig(configs)
}

// run listens on the service discovery and adds new targets.
func (tm *TargetManager) run() {
    for targetGroups := range tm.manager.SyncCh() {
        for jobName, groups := range targetGroups {
            tg, ok := tm.groups[jobName]
            if !ok {
                level.Debug(tm.logger).Log("msg", "unknown target for job", "job", jobName)
                continue
            }
            tg.sync(groups)
        }
    }
}

// Ready returns true if at least one Docker target is active.
func (tm *TargetManager) Ready() bool {
    for _, s := range tm.groups {
        if s.Ready() {
            return true
        }
    }
    return false
}

func (tm *TargetManager) Stop() {
    tm.cancel()
    for _, s := range tm.groups {
        s.Stop()
    }
}

func (tm *TargetManager) ActiveTargets() map[string][]target.Target {
    result := make(map[string][]target.Target, len(tm.groups))
    for k, s := range tm.groups {
        result[k] = s.ActiveTargets()
    }
    return result
}

func (tm *TargetManager) AllTargets() map[string][]target.Target {
    result := make(map[string][]target.Target, len(tm.groups))
    for k, s := range tm.groups {
        result[k] = s.AllTargets()
    }
    return result
}
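A side note on the `__meta_docker_*` constants above: `Target.process` drops every label still prefixed with `__` after relabeling, so these meta labels only survive if a relabel rule copies them into a regular label first. Below is a minimal standalone sketch of such a rule; the label values and the `container_id` target label are illustrative, and it assumes the same single-return `relabel.Process` signature used in the target code above.

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"
)

func main() {
	// Copy the discovery meta label into a regular label before the target
	// filters out everything prefixed with "__".
	rule := &relabel.Config{
		SourceLabels: model.LabelNames{"__meta_docker_container_id"},
		Regex:        relabel.MustNewRegexp("(.+)"),
		Replacement:  "$1",
		TargetLabel:  "container_id",
		Action:       relabel.Replace,
	}

	in := labels.FromStrings("__meta_docker_container_id", "1234", "job", "docker")
	out := relabel.Process(in, rule)

	// out now carries container_id="1234" alongside the original labels;
	// the remaining "__"-prefixed labels would then be dropped by Target.process.
	fmt.Println(out)
}
```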
@@ -0,0 +1,112 @@
package docker

import (
    "encoding/json"
    "net/http"
    "net/http/httptest"
    "os"
    "sort"
    "strings"
    "testing"
    "time"

    "github.com/docker/docker/api/types"
    "github.com/docker/docker/api/types/network"
    "github.com/go-kit/log"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/discovery/moby"
    "github.com/stretchr/testify/require"

    "github.com/grafana/loki/clients/pkg/promtail/client/fake"
    "github.com/grafana/loki/clients/pkg/promtail/positions"
    "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
)

func Test_TargetManager(t *testing.T) {
    h := func(w http.ResponseWriter, r *http.Request) {
        switch path := r.URL.Path; {
        case path == "/_ping":
            _, err := w.Write([]byte("OK"))
            require.NoError(t, err)
        case strings.HasSuffix(path, "/containers/json"):
            // Serve container list
            w.Header().Set("Content-Type", "application/json")
            containerResponse := []types.Container{{
                ID:    "1234",
                Names: []string{"flog"},
                NetworkSettings: &types.SummaryNetworkSettings{
                    Networks: map[string]*network.EndpointSettings{
                        "foo": {
                            NetworkID: "my_network",
                            IPAddress: "127.0.0.1",
                        },
                    },
                },
            }}
            err := json.NewEncoder(w).Encode(containerResponse)
            require.NoError(t, err)
        case strings.HasSuffix(path, "/networks"):
            // Serve networks
            w.Header().Set("Content-Type", "application/json")
            err := json.NewEncoder(w).Encode([]types.NetworkResource{})
            require.NoError(t, err)
        default:
            // Serve container logs
            dat, err := os.ReadFile("testdata/flog.log")
            require.NoError(t, err)
            _, err = w.Write(dat)
            require.NoError(t, err)
        }
    }
    dockerDaemonMock := httptest.NewServer(http.HandlerFunc(h))
    defer dockerDaemonMock.Close()

    w := log.NewSyncWriter(os.Stderr)
    logger := log.NewLogfmtLogger(w)
    entryHandler := fake.New(func() {})
    cfgs := []scrapeconfig.Config{{
        DockerSDConfigs: []*moby.DockerSDConfig{{
            Host:            dockerDaemonMock.URL,
            RefreshInterval: model.Duration(100 * time.Millisecond),
        }},
    }}

    ps, err := positions.New(logger, positions.Config{
        SyncPeriod:    10 * time.Second,
        PositionsFile: t.TempDir() + "/positions.yml",
    })
    require.NoError(t, err)

    ta, err := NewTargetManager(
        NewMetrics(prometheus.NewRegistry()),
        logger,
        ps,
        entryHandler,
        cfgs,
    )
    require.NoError(t, err)
    require.True(t, ta.Ready())

    require.Eventually(t, func() bool {
        return len(entryHandler.Received()) >= 5
    }, 20*time.Second, 100*time.Millisecond)

    received := entryHandler.Received()
    sort.Slice(received, func(i, j int) bool {
        return received[i].Timestamp.Before(received[j].Timestamp)
    })

    expectedLines := []string{
        "5.3.69.55 - - [09/Dec/2021:09:15:02 +0000] \"HEAD /brand/users/clicks-and-mortar/front-end HTTP/2.0\" 503 27087",
        "101.54.183.185 - - [09/Dec/2021:09:15:03 +0000] \"POST /next-generation HTTP/1.0\" 416 11468",
        "69.27.137.160 - runolfsdottir2670 [09/Dec/2021:09:15:03 +0000] \"HEAD /content/visionary/engineer/cultivate HTTP/1.1\" 302 2975",
        "28.104.242.74 - - [09/Dec/2021:09:15:03 +0000] \"PATCH /value-added/cultivate/systems HTTP/2.0\" 405 11843",
        "150.187.51.54 - satterfield1852 [09/Dec/2021:09:15:03 +0000] \"GET /incentivize/deliver/innovative/cross-platform HTTP/1.1\" 301 13032",
    }
    actualLines := make([]string, 0, 5)
    for _, entry := range received[:5] {
        actualLines = append(actualLines, entry.Line)
    }
    require.ElementsMatch(t, actualLines, expectedLines)
}
Binary file not shown.
@@ -0,0 +1,190 @@
package stdcopy // import "github.com/docker/docker/pkg/stdcopy"

import (
    "bytes"
    "encoding/binary"
    "errors"
    "fmt"
    "io"
    "sync"
)

// StdType is the type of standard stream
// a writer can multiplex to.
type StdType byte

const (
    // Stdin represents standard input stream type.
    Stdin StdType = iota
    // Stdout represents standard output stream type.
    Stdout
    // Stderr represents standard error steam type.
    Stderr
    // Systemerr represents errors originating from the system that make it
    // into the multiplexed stream.
    Systemerr

    stdWriterPrefixLen = 8
    stdWriterFdIndex   = 0
    stdWriterSizeIndex = 4

    startingBufLen = 32*1024 + stdWriterPrefixLen + 1
)

var bufPool = &sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }}

// stdWriter is wrapper of io.Writer with extra customized info.
type stdWriter struct {
    io.Writer
    prefix byte
}

// Write sends the buffer to the underneath writer.
// It inserts the prefix header before the buffer,
// so stdcopy.StdCopy knows where to multiplex the output.
// It makes stdWriter to implement io.Writer.
func (w *stdWriter) Write(p []byte) (n int, err error) {
    if w == nil || w.Writer == nil {
        return 0, errors.New("Writer not instantiated")
    }
    if p == nil {
        return 0, nil
    }

    header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix}
    binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p)))
    buf := bufPool.Get().(*bytes.Buffer)
    buf.Write(header[:])
    buf.Write(p)

    n, err = w.Writer.Write(buf.Bytes())
    n -= stdWriterPrefixLen
    if n < 0 {
        n = 0
    }

    buf.Reset()
    bufPool.Put(buf)
    return
}

// NewStdWriter instantiates a new Writer.
// Everything written to it will be encapsulated using a custom format,
// and written to the underlying `w` stream.
// This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection.
// `t` indicates the id of the stream to encapsulate.
// It can be stdcopy.Stdin, stdcopy.Stdout, stdcopy.Stderr.
func NewStdWriter(w io.Writer, t StdType) io.Writer {
    return &stdWriter{
        Writer: w,
        prefix: byte(t),
    }
}

// StdCopy is a modified version of io.Copy.
//
// StdCopy will demultiplex `src`, assuming that it contains two streams,
// previously multiplexed together using a StdWriter instance.
// As it reads from `src`, StdCopy will write to `dstout` and `dsterr`.
//
// StdCopy will read until it hits EOF on `src`. It will then return a nil error.
// In other words: if `err` is non nil, it indicates a real underlying error.
//
// `written` will hold the total number of bytes written to `dstout` and `dsterr`.
func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) {
    var (
        buf       = make([]byte, startingBufLen)
        bufLen    = len(buf)
        nr, nw    int
        er, ew    error
        out       io.Writer
        frameSize int
    )

    for {
        // Make sure we have at least a full header
        for nr < stdWriterPrefixLen {
            var nr2 int
            nr2, er = src.Read(buf[nr:])
            nr += nr2
            if er == io.EOF {
                if nr < stdWriterPrefixLen {
                    return written, nil
                }
                break
            }
            if er != nil {
                return 0, er
            }
        }

        stream := StdType(buf[stdWriterFdIndex])
        // Check the first byte to know where to write
        switch stream {
        case Stdin:
            fallthrough
        case Stdout:
            // Write on stdout
            out = dstout
        case Stderr:
            // Write on stderr
            out = dsterr
        case Systemerr:
            // If we're on Systemerr, we won't write anywhere.
            // NB: if this code changes later, make sure you don't try to write
            // to outstream if Systemerr is the stream
            out = nil
        default:
            return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex])
        }

        // Retrieve the size of the frame
        frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4]))

        // Check if the buffer is big enough to read the frame.
        // Extend it if necessary.
        if frameSize+stdWriterPrefixLen > bufLen {
            buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...)
            bufLen = len(buf)
        }

        // While the amount of bytes read is less than the size of the frame + header, we keep reading
        for nr < frameSize+stdWriterPrefixLen {
            var nr2 int
            nr2, er = src.Read(buf[nr:])
            nr += nr2
            if er == io.EOF {
                if nr < frameSize+stdWriterPrefixLen {
                    return written, nil
                }
                break
            }
            if er != nil {
                return 0, er
            }
        }

        // we might have an error from the source mixed up in our multiplexed
        // stream. if we do, return it.
        if stream == Systemerr {
            return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen]))
        }

        // Write the retrieved frame (without header)
        nw, ew = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen])
        if ew != nil {
            return 0, ew
        }

        // If the frame has not been fully written: error
        if nw != frameSize {
            return 0, io.ErrShortWrite
        }
        written += int64(nw)

        // Move the rest of the buffer to the beginning
        copy(buf, buf[frameSize+stdWriterPrefixLen:])
        // Move the index
        nr -= frameSize + stdWriterPrefixLen
    }
}
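To make the frame format above concrete, here is a small self-contained sketch (not part of the vendored file) that multiplexes two streams with `NewStdWriter` and splits them again with `StdCopy`, the same call the new Docker target uses on the daemon's log stream.

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/docker/docker/pkg/stdcopy"
)

func main() {
	// Multiplex one stdout frame and one stderr frame into a single buffer,
	// each prefixed with the 8-byte header (stream type + big-endian length).
	var muxed bytes.Buffer
	stdcopy.NewStdWriter(&muxed, stdcopy.Stdout).Write([]byte("out line\n"))
	stdcopy.NewStdWriter(&muxed, stdcopy.Stderr).Write([]byte("err line\n"))

	// Demultiplex the buffer back into separate stdout and stderr writers.
	var stdout, stderr bytes.Buffer
	written, err := stdcopy.StdCopy(&stdout, &stderr, &muxed)

	fmt.Println(written, err)  // 18 <nil> (payload bytes only, headers excluded)
	fmt.Print(stdout.String()) // out line
	fmt.Print(stderr.String()) // err line
}
```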