Expose Kafka message key in labels (#4745)

* expose kafka message key in labels

* add changelog entry

* improve performance

* goimported

* fix label name in test

* add empty message key test case

* pass actual value in message key
pull/4772/head^2
Hiroki Sakamoto 4 years ago committed by GitHub
parent 4c9e4f5f06
commit d61dd1872a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CHANGELOG.md
  2. 25
      clients/pkg/promtail/targets/kafka/formatter.go
  3. 30
      clients/pkg/promtail/targets/kafka/target.go
  4. 20
      clients/pkg/promtail/targets/kafka/target_syncer.go
  5. 108
      clients/pkg/promtail/targets/kafka/target_test.go
  6. 5
      docs/sources/clients/promtail/configuration.md
  7. 4
      docs/sources/clients/promtail/scraping.md

@ -1,5 +1,7 @@
## Main
* [4663](https://github.com/grafana/loki/pull/4663) **taisho6339**: Add SASL&mTLS authentication support for Kafka in Promtail
* [4745](https://github.com/grafana/loki/pull/4745) **taisho6339**: Expose Kafka message key in labels
* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction
* [4744](https://github.com/grafana/loki/pull/4744) **cyriltovena**: Promtail: Adds GELF UDP support.
* [4741](https://github.com/grafana/loki/pull/4741) **sandeepsukhani**: index cleanup fixes while applying retention

@ -0,0 +1,25 @@
package kafka
import (
"strings"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/grafana/loki/pkg/util"
)
func format(lbs labels.Labels, cfg []*relabel.Config) model.LabelSet {
if len(lbs) == 0 {
return nil
}
processed := relabel.Process(lbs, cfg...)
labelOut := model.LabelSet(util.LabelsToMetric(processed))
for k := range labelOut {
if strings.HasPrefix(string(k), "__") {
delete(labelOut, k)
}
}
return labelOut
}

@ -4,6 +4,9 @@ import (
"fmt"
"time"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/Shopify/sarama"
"github.com/prometheus/common/model"
@ -29,6 +32,7 @@ type Target struct {
claim sarama.ConsumerGroupClaim
session sarama.ConsumerGroupSession
client api.EntryHandler
relabelConfig []*relabel.Config
useIncomingTimestamp bool
}
@ -36,6 +40,7 @@ func NewTarget(
session sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
discoveredLabels, lbs model.LabelSet,
relabelConfig []*relabel.Config,
client api.EntryHandler,
useIncomingTimestamp bool,
) *Target {
@ -46,20 +51,41 @@ func NewTarget(
claim: claim,
session: session,
client: client,
relabelConfig: relabelConfig,
useIncomingTimestamp: useIncomingTimestamp,
}
}
const (
defaultKafkaMessageKey = "none"
labelKeyKafkaMessageKey = "__meta_kafka_message_key"
)
func (t *Target) run() {
defer t.client.Stop()
for message := range t.claim.Messages() {
mk := string(message.Key)
if len(mk) == 0 {
mk = defaultKafkaMessageKey
}
// TODO: Possibly need to format after merging with discovered labels because we can specify multiple labels in source labels
// https://github.com/grafana/loki/pull/4745#discussion_r750022234
lbs := format([]labels.Label{{
Name: labelKeyKafkaMessageKey,
Value: mk,
}}, t.relabelConfig)
out := t.lbs.Clone()
if len(lbs) > 0 {
out = out.Merge(lbs)
}
t.client.Chan() <- api.Entry{
Entry: logproto.Entry{
Line: string(message.Value),
Timestamp: timestamp(t.useIncomingTimestamp, message.Timestamp),
},
Labels: t.lbs.Clone(),
Labels: out,
}
t.session.MarkMessage(message, "")
}

@ -4,22 +4,19 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/Shopify/sarama"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/util"
)
@ -267,18 +264,12 @@ func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sar
"__meta_kafka_member_id": model.LabelValue(session.MemberID()),
"__meta_kafka_group_id": model.LabelValue(ts.cfg.KafkaConfig.GroupID),
}
details := newDetails(session, claim)
labelMap := make(map[string]string)
for k, v := range discoveredLabels.Clone().Merge(ts.cfg.KafkaConfig.Labels) {
labelMap[string(k)] = string(v)
}
lbs := relabel.Process(labels.FromMap(labelMap), ts.cfg.RelabelConfigs...)
details := newDetails(session, claim)
labelOut := model.LabelSet(util.LabelsToMetric(lbs))
for k := range labelOut {
if strings.HasPrefix(string(k), "__") {
delete(labelOut, k)
}
}
labelOut := format(labels.FromMap(labelMap), ts.cfg.RelabelConfigs)
if len(labelOut) == 0 {
level.Warn(ts.logger).Log("msg", "dropping target", "reason", "no labels", "details", details, "discovered_labels", discoveredLabels.String())
return &runnableDroppedTarget{
@ -300,6 +291,7 @@ func (ts *TargetSyncer) NewTarget(session sarama.ConsumerGroupSession, claim sar
claim,
discoveredLabels,
labelOut,
ts.cfg.RelabelConfigs,
pipeline.Wrap(ts.client),
ts.cfg.KafkaConfig.UseIncomingTimestamp,
)

@ -10,6 +10,7 @@ import (
"github.com/Shopify/sarama"
"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
@ -91,31 +92,90 @@ func (t *testClaim) Stop() {
}
func Test_TargetRun(t *testing.T) {
session, claim := &testSession{}, newTestClaim("footopic", 10, 12)
var closed bool
fc := fake.New(
func() {
closed = true
tc := []struct {
name string
inMessageKey string
inLS model.LabelSet
inDiscoveredLS model.LabelSet
relabels []*relabel.Config
expectedLS model.LabelSet
}{
{
name: "no relabel config",
inMessageKey: "foo",
inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"},
inLS: model.LabelSet{"buzz": "bazz"},
relabels: nil,
expectedLS: model.LabelSet{"buzz": "bazz"},
},
)
tg := NewTarget(session, claim, model.LabelSet{"foo": "bar"}, model.LabelSet{"buzz": "bazz"}, fc, true)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
tg.run()
}()
for i := 0; i < 10; i++ {
claim.Send(&sarama.ConsumerMessage{
Timestamp: time.Unix(0, int64(i)),
Value: []byte(fmt.Sprintf("%d", i)),
{
name: "message key with relabel config",
inMessageKey: "foo",
inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"},
inLS: model.LabelSet{"buzz": "bazz"},
relabels: []*relabel.Config{
{
SourceLabels: model.LabelNames{"__meta_kafka_message_key"},
Regex: relabel.MustNewRegexp("(.*)"),
TargetLabel: "message_key",
Replacement: "$1",
Action: "replace",
},
},
expectedLS: model.LabelSet{"buzz": "bazz", "message_key": "foo"},
},
{
name: "no message key with relabel config",
inMessageKey: "",
inDiscoveredLS: model.LabelSet{"__meta_kafka_foo": "bar"},
inLS: model.LabelSet{"buzz": "bazz"},
relabels: []*relabel.Config{
{
SourceLabels: model.LabelNames{"__meta_kafka_message_key"},
Regex: relabel.MustNewRegexp("(.*)"),
TargetLabel: "message_key",
Replacement: "$1",
Action: "replace",
},
},
expectedLS: model.LabelSet{"buzz": "bazz", "message_key": "none"},
},
}
for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
session, claim := &testSession{}, newTestClaim("footopic", 10, 12)
var closed bool
fc := fake.New(
func() {
closed = true
},
)
tg := NewTarget(session, claim, tt.inDiscoveredLS, tt.inLS, tt.relabels, fc, true)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
tg.run()
}()
for i := 0; i < 10; i++ {
claim.Send(&sarama.ConsumerMessage{
Timestamp: time.Unix(0, int64(i)),
Value: []byte(fmt.Sprintf("%d", i)),
Key: []byte(tt.inMessageKey),
})
}
claim.Stop()
wg.Wait()
re := fc.Received()
require.Len(t, session.markedMessage, 10)
require.Len(t, re, 10)
require.True(t, closed)
for _, e := range re {
require.Equal(t, tt.expectedLS.String(), e.Labels.String())
}
})
}
claim.Stop()
wg.Wait()
require.Len(t, session.markedMessage, 10)
require.Len(t, fc.Received(), 10)
require.True(t, closed)
}

@ -981,8 +981,9 @@ The list of labels below are discovered when consuming kafka:
- `__meta_kafka_topic`: The current topic for where the message has been read.
- `__meta_kafka_partition`: The partition id where the message has been read.
- `__meta_kafka_member_id`: the consumer group member id.
- `__meta_kafka_group_id`: the consumer group id.
- `__meta_kafka_member_id`: The consumer group member id.
- `__meta_kafka_group_id`: The consumer group id.
- `__meta_kafka_message_key`: The message key. If it is empty, this value will be 'none'.
To keep discovered labels to your logs use the [relabel_configs](#relabel_configs) section.

@ -304,6 +304,10 @@ scrape_configs:
source_labels:
- __meta_kafka_group_id
target_label: group
- action: replace
source_labels:
- __meta_kafka_message_key
target_label: message_key
```
Only the `brokers` and `topics` is required.

Loading…
Cancel
Save