diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dcdf5df70..26b3759254 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## Main +* [4663](https://github.com/grafana/loki/pull/4663) **taisho6339**: Add SASL&mTLS authentication support for Kafka in Promtail +* [4745](https://github.com/grafana/loki/pull/4745) **taisho6339**: Expose Kafka message key in labels * [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction * [4744](https://github.com/grafana/loki/pull/4744) **cyriltovena**: Promtail: Adds GELF UDP support. * [4741](https://github.com/grafana/loki/pull/4741) **sandeepsukhani**: index cleanup fixes while applying retention diff --git a/clients/pkg/promtail/targets/kafka/formatter.go b/clients/pkg/promtail/targets/kafka/formatter.go new file mode 100644 index 0000000000..c19f6d90da --- /dev/null +++ b/clients/pkg/promtail/targets/kafka/formatter.go @@ -0,0 +1,25 @@ +package kafka + +import ( + "strings" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" + + "github.com/grafana/loki/pkg/util" +) + +func format(lbs labels.Labels, cfg []*relabel.Config) model.LabelSet { + if len(lbs) == 0 { + return nil + } + processed := relabel.Process(lbs, cfg...) + labelOut := model.LabelSet(util.LabelsToMetric(processed)) + for k := range labelOut { + if strings.HasPrefix(string(k), "__") { + delete(labelOut, k) + } + } + return labelOut +} diff --git a/clients/pkg/promtail/targets/kafka/target.go b/clients/pkg/promtail/targets/kafka/target.go index eb05a200b3..9f54bd7a88 100644 --- a/clients/pkg/promtail/targets/kafka/target.go +++ b/clients/pkg/promtail/targets/kafka/target.go @@ -4,6 +4,9 @@ import ( "fmt" "time" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" + "github.com/Shopify/sarama" "github.com/prometheus/common/model" @@ -29,6 +32,7 @@ type Target struct { claim sarama.ConsumerGroupClaim session sarama.ConsumerGroupSession client api.EntryHandler + relabelConfig []*relabel.Config useIncomingTimestamp bool } @@ -36,6 +40,7 @@ func NewTarget( session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, discoveredLabels, lbs model.LabelSet, + relabelConfig []*relabel.Config, client api.EntryHandler, useIncomingTimestamp bool, ) *Target { @@ -46,20 +51,41 @@ func NewTarget( claim: claim, session: session, client: client, + relabelConfig: relabelConfig, useIncomingTimestamp: useIncomingTimestamp, } } +const ( + defaultKafkaMessageKey = "none" + labelKeyKafkaMessageKey = "__meta_kafka_message_key" +) + func (t *Target) run() { defer t.client.Stop() - for message := range t.claim.Messages() { + mk := string(message.Key) + if len(mk) == 0 { + mk = defaultKafkaMessageKey + } + + // TODO: Possibly need to format after merging with discovered labels because we can specify multiple labels in source labels + // https://github.com/grafana/loki/pull/4745#discussion_r750022234 + lbs := format([]labels.Label{{ + Name: labelKeyKafkaMessageKey, + Value: mk, + }}, t.relabelConfig) + + out := t.lbs.Clone() + if len(lbs) > 0 { + out = out.Merge(lbs) + } t.client.Chan() <- api.Entry{ Entry: logproto.Entry{ Line: string(message.Value), Timestamp: timestamp(t.useIncomingTimestamp, message.Timestamp), }, - Labels: t.lbs.Clone(), + Labels: out, } t.session.MarkMessage(message, "") } diff --git a/clients/pkg/promtail/targets/kafka/target_syncer.go b/clients/pkg/promtail/targets/kafka/target_syncer.go index 55c3f21703..30d6b6bac6 100644 --- a/clients/pkg/promtail/targets/kafka/target_syncer.go +++ b/clients/pkg/promtail/targets/kafka/target_syncer.go @@ -4,22 +4,19 @@ import ( "context" "errors" "fmt" - "strings" "sync" "time" "github.com/Shopify/sarama" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/relabel" - "github.com/grafana/loki/clients/pkg/logentry/stages" "github.com/grafana/loki/clients/pkg/promtail/api" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" "github.com/grafana/loki/clients/pkg/promtail/targets/target" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/grafana/loki/pkg/util" ) @@ -267,18 +264,12 @@ func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sar "__meta_kafka_member_id": model.LabelValue(session.MemberID()), "__meta_kafka_group_id": model.LabelValue(ts.cfg.KafkaConfig.GroupID), } + details := newDetails(session, claim) labelMap := make(map[string]string) for k, v := range discoveredLabels.Clone().Merge(ts.cfg.KafkaConfig.Labels) { labelMap[string(k)] = string(v) } - lbs := relabel.Process(labels.FromMap(labelMap), ts.cfg.RelabelConfigs...) - details := newDetails(session, claim) - labelOut := model.LabelSet(util.LabelsToMetric(lbs)) - for k := range labelOut { - if strings.HasPrefix(string(k), "__") { - delete(labelOut, k) - } - } + labelOut := format(labels.FromMap(labelMap), ts.cfg.RelabelConfigs) if len(labelOut) == 0 { level.Warn(ts.logger).Log("msg", "dropping target", "reason", "no labels", "details", details, "discovered_labels", discoveredLabels.String()) return &runnableDroppedTarget{ @@ -300,6 +291,7 @@ func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sar claim, discoveredLabels, labelOut, + ts.cfg.RelabelConfigs, pipeline.Wrap(ts.client), ts.cfg.KafkaConfig.UseIncomingTimestamp, ) diff --git a/clients/pkg/promtail/targets/kafka/target_test.go b/clients/pkg/promtail/targets/kafka/target_test.go index c85f3ae728..ee02cb55e0 100644 --- a/clients/pkg/promtail/targets/kafka/target_test.go +++ b/clients/pkg/promtail/targets/kafka/target_test.go @@ -10,6 +10,7 @@ import ( "github.com/Shopify/sarama" "github.com/grafana/loki/clients/pkg/promtail/client/fake" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/relabel" "github.com/stretchr/testify/require" "go.uber.org/atomic" ) @@ -91,31 +92,90 @@ func (t *testClaim) Stop() { } func Test_TargetRun(t *testing.T) { - session, claim := &testSession{}, newTestClaim("footopic", 10, 12) - var closed bool - fc := fake.New( - func() { - closed = true + tc := []struct { + name string + inMessageKey string + inLS model.LabelSet + inDiscoveredLS model.LabelSet + relabels []*relabel.Config + expectedLS model.LabelSet + }{ + { + name: "no relabel config", + inMessageKey: "foo", + inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"}, + inLS: model.LabelSet{"buzz": "bazz"}, + relabels: nil, + expectedLS: model.LabelSet{"buzz": "bazz"}, }, - ) - tg := NewTarget(session, claim, model.LabelSet{"foo": "bar"}, model.LabelSet{"buzz": "bazz"}, fc, true) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - tg.run() - }() - - for i := 0; i < 10; i++ { - claim.Send(&sarama.ConsumerMessage{ - Timestamp: time.Unix(0, int64(i)), - Value: []byte(fmt.Sprintf("%d", i)), + { + name: "message key with relabel config", + inMessageKey: "foo", + inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"}, + inLS: model.LabelSet{"buzz": "bazz"}, + relabels: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"__meta_kafka_message_key"}, + Regex: relabel.MustNewRegexp("(.*)"), + TargetLabel: "message_key", + Replacement: "$1", + Action: "replace", + }, + }, + expectedLS: model.LabelSet{"buzz": "bazz", "message_key": "foo"}, + }, + { + name: "no message key with relabel config", + inMessageKey: "", + inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"}, + inLS: model.LabelSet{"buzz": "bazz"}, + relabels: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"__meta_kafka_message_key"}, + Regex: relabel.MustNewRegexp("(.*)"), + TargetLabel: "message_key", + Replacement: "$1", + Action: "replace", + }, + }, + expectedLS: model.LabelSet{"buzz": "bazz", "message_key": "none"}, + }, + } + for _, tt := range tc { + t.Run(tt.name, func(t *testing.T) { + session, claim := &testSession{}, newTestClaim("footopic", 10, 12) + var closed bool + fc := fake.New( + func() { + closed = true + }, + ) + tg := NewTarget(session, claim, tt.inDiscoveredLS, tt.inLS, tt.relabels, fc, true) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + tg.run() + }() + + for i := 0; i < 10; i++ { + claim.Send(&sarama.ConsumerMessage{ + Timestamp: time.Unix(0, int64(i)), + Value: []byte(fmt.Sprintf("%d", i)), + Key: []byte(tt.inMessageKey), + }) + } + claim.Stop() + wg.Wait() + re := fc.Received() + + require.Len(t, session.markedMessage, 10) + require.Len(t, re, 10) + require.True(t, closed) + for _, e := range re { + require.Equal(t, tt.expectedLS.String(), e.Labels.String()) + } }) } - claim.Stop() - wg.Wait() - require.Len(t, session.markedMessage, 10) - require.Len(t, fc.Received(), 10) - require.True(t, closed) } diff --git a/docs/sources/clients/promtail/configuration.md b/docs/sources/clients/promtail/configuration.md index c7a0c6770b..f6ad81a822 100644 --- a/docs/sources/clients/promtail/configuration.md +++ b/docs/sources/clients/promtail/configuration.md @@ -981,8 +981,9 @@ The list of labels below are discovered when consuming kafka: - `__meta_kafka_topic`: The current topic for where the message has been read. - `__meta_kafka_partition`: The partition id where the message has been read. -- `__meta_kafka_member_id`: the consumer group member id. -- `__meta_kafka_group_id`: the consumer group id. +- `__meta_kafka_member_id`: The consumer group member id. +- `__meta_kafka_group_id`: The consumer group id. +- `__meta_kafka_message_key`: The message key. If it is empty, this value will be 'none'. To keep discovered labels to your logs use the [relabel_configs](#relabel_configs) section. diff --git a/docs/sources/clients/promtail/scraping.md b/docs/sources/clients/promtail/scraping.md index 1ae8b429d0..a872e57434 100644 --- a/docs/sources/clients/promtail/scraping.md +++ b/docs/sources/clients/promtail/scraping.md @@ -304,6 +304,10 @@ scrape_configs: source_labels: - __meta_kafka_group_id target_label: group + - action: replace + source_labels: + - __meta_kafka_message_key + target_label: message_key ``` Only the `brokers` and `topics` is required.