feat: Optionally require writes to kafka on Push requests (#14186)

pull/14217/head
benclive committed 8 months ago (via GitHub)
parent dc0cbd42dc
commit 7c78232ad3
  1. docs/sources/shared/configuration.md (8 lines changed)
  2. pkg/distributor/distributor.go (186 lines changed)
  3. pkg/distributor/distributor_test.go (125 lines changed)
  4. pkg/ingester-kafka/kafka/kafka_tee.go (209 lines changed; file deleted)
  5. pkg/kafka/tee/tee.go (174 lines changed; file deleted)
  6. pkg/kafka/tee/tee_test.go (50 lines changed; file deleted)
  7. pkg/loki/loki.go (3 lines changed)
  8. pkg/loki/modules.go (3 lines changed)

@@ -2264,6 +2264,14 @@ otlp_config:
# List of default otlp resource attributes to be picked as index labels
# CLI flag: -distributor.otlp.default_resource_attributes_as_index_labels
[default_resource_attributes_as_index_labels: <list of strings> | default = [service.name service.namespace service.instance.id deployment.environment cloud.region cloud.availability_zone k8s.cluster.name k8s.namespace.name k8s.pod.name k8s.container.name container.name k8s.replicaset.name k8s.deployment.name k8s.statefulset.name k8s.daemonset.name k8s.cronjob.name k8s.job.name]]
# Enable writes to Kafka during Push requests.
# CLI flag: -distributor.kafka-writes-enabled
[kafka_writes_enabled: <boolean> | default = false]
# Enable writes to Ingesters during Push requests. Defaults to true.
# CLI flag: -distributor.ingester-writes-enabled
[ingester_writes_enabled: <boolean> | default = true]
```
### etcd

@@ -19,6 +19,7 @@ import (
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/prometheus/prometheus/model/labels"
"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/collector/pdata/plog"
"google.golang.org/grpc/codes"
@@ -44,6 +45,7 @@ import (
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
@@ -88,6 +90,10 @@ type Config struct {
WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Customize the logging of write failures."`
OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"`
KafkaEnabled bool `yaml:"kafka_writes_enabled"`
IngesterEnabled bool `yaml:"ingester_writes_enabled"`
KafkaConfig kafka.Config `yaml:"-"`
}
// RegisterFlags registers distributor-related flags.
@@ -96,6 +102,16 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(fs)
cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs)
cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs)
fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
}
func (cfg *Config) Validate() error {
if !cfg.KafkaEnabled && !cfg.IngesterEnabled {
return fmt.Errorf("at least one of kafka and ingester writes must be enabled")
}
return nil
}
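For context, the new Validate check rejects a configuration that disables both write paths at startup. A minimal sketch of that behavior (a hypothetical standalone program, assuming only the exported fields shown above):

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/distributor"
)

func main() {
	var cfg distributor.Config // zero value: both write paths disabled
	fmt.Println(cfg.Validate()) // error: at least one path must be enabled

	cfg.KafkaEnabled = true // Kafka-only writes are a valid combination
	fmt.Println(cfg.Validate()) // <nil>
}
```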
// RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
@@ -103,6 +119,11 @@ type RateStore interface {
RateFor(tenantID string, streamHash uint64) (int64, float64)
}
type KafkaProducer interface {
ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults
Close()
}
// Distributor coordinates replication and distribution of log streams.
type Distributor struct {
services.Service
@@ -146,6 +167,16 @@ type Distributor struct {
streamShardCount prometheus.Counter
usageTracker push.UsageTracker
// kafka
kafkaWriter KafkaProducer
partitionRing ring.PartitionRingReader
// kafka metrics
kafkaAppends *prometheus.CounterVec
kafkaWriteBytesTotal prometheus.Counter
kafkaWriteLatency prometheus.Histogram
kafkaRecordsPerRequest prometheus.Histogram
}
// New creates a distributor.
@@ -154,6 +185,7 @@ func New(
clientCfg client.Config,
configs *runtime.TenantConfigs,
ingestersRing ring.ReadRing,
partitionRing ring.PartitionRingReader,
overrides Limits,
registerer prometheus.Registerer,
metricsNamespace string,
@@ -192,6 +224,20 @@ func New(
return nil, err
}
if partitionRing == nil && cfg.KafkaEnabled {
return nil, fmt.Errorf("partition ring is required for kafka writes")
}
var kafkaWriter KafkaProducer
if cfg.KafkaEnabled {
kafkaClient, err := kafka.NewWriterClient(cfg.KafkaConfig, 20, logger, registerer)
if err != nil {
return nil, fmt.Errorf("failed to start kafka client: %w", err)
}
kafkaWriter = kafka.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
prometheus.WrapRegistererWithPrefix("_kafka_", registerer))
}
d := &Distributor{
cfg: cfg,
logger: logger,
@@ -227,7 +273,30 @@ func New(
Name: "stream_sharding_count",
Help: "Total number of times the distributor has sharded streams",
}),
kafkaAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "kafka_appends_total",
Help: "The total number of appends sent to kafka ingest path.",
}, []string{"partition", "status"}),
kafkaWriteLatency: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "kafka_latency_seconds",
Help: "Latency to write an incoming request to the ingest storage.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMinResetDuration: 1 * time.Hour,
NativeHistogramMaxBucketNumber: 100,
Buckets: prometheus.DefBuckets,
}),
kafkaWriteBytesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "kafka_sent_bytes_total",
Help: "Total number of bytes sent to the ingest storage.",
}),
kafkaRecordsPerRequest: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "kafka_records_per_write_request",
Help: "The number of records a single per-partition write request has been split into.",
Buckets: prometheus.ExponentialBuckets(1, 2, 8),
}),
writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
kafkaWriter: kafkaWriter,
partitionRing: partitionRing,
}
if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
@@ -294,6 +363,9 @@ func (d *Distributor) running(ctx context.Context) error {
}
func (d *Distributor) stopping(_ error) error {
if d.kafkaWriter != nil {
d.kafkaWriter.Close()
}
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
}
@@ -319,6 +391,21 @@ type pushTracker struct {
err chan error
}
// doneWithResult records the result of a stream push.
// If err is nil, the stream push is considered successful.
// If err is not nil, the stream push is considered failed.
func (p *pushTracker) doneWithResult(err error) {
if err == nil {
if p.streamsPending.Dec() == 0 {
p.done <- struct{}{}
}
} else {
if p.streamsFailed.Inc() == 1 {
p.err <- err
}
}
}
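doneWithResult centralizes the bookkeeping both write paths now share: each successful stream write decrements streamsPending, the last one signals done, and only the first failure is forwarded on the buffered err channel. A self-contained sketch of the same counting scheme, with plain atomics standing in for the dskit types:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Simplified stand-in for the distributor's pushTracker.
type tracker struct {
	pending int32
	failed  int32
	done    chan struct{}
	errCh   chan error
}

func (t *tracker) doneWithResult(err error) {
	if err == nil {
		if atomic.AddInt32(&t.pending, -1) == 0 {
			t.done <- struct{}{} // last pending write: signal completion
		}
		return
	}
	if atomic.AddInt32(&t.failed, 1) == 1 {
		t.errCh <- err // only the first failure is reported
	}
}

func main() {
	streams := 3
	// With both paths enabled, each stream is written twice, so pending is 2*streams.
	t := &tracker{pending: int32(2 * streams), done: make(chan struct{}, 1), errCh: make(chan error, 1)}
	for i := 0; i < 2*streams; i++ {
		go t.doneWithResult(nil)
	}
	<-t.done
	fmt.Println("all writes acknowledged")
}
```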
// Push a set of streams.
// The returned error is the last one seen.
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
@@ -488,6 +575,26 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
var descs [maxExpectedReplicationSet]ring.InstanceDesc
tracker := pushTracker{
done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each
err: make(chan error, 1),
}
streamsToWrite := 0
if d.cfg.IngesterEnabled {
streamsToWrite += len(streams)
}
if d.cfg.KafkaEnabled {
streamsToWrite += len(streams)
}
// We must set streamsPending before beginning any writes, to avoid a race where one path finishes all of its streams before the other path has started.
tracker.streamsPending.Store(int32(streamsToWrite))
if d.cfg.KafkaEnabled {
// Unlike the ingester writes below, we don't need a detached context here, because we don't return until all Kafka writes have completed.
d.sendStreamsToKafka(ctx, streams, tenantID, &tracker)
}
if d.cfg.IngesterEnabled {
streamTrackers := make([]streamTracker, len(streams))
streamsByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.InstanceDesc{}
@@ -522,11 +629,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, err
}
tracker := pushTracker{
done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each
err: make(chan error, 1),
}
tracker.streamsPending.Store(int32(len(streams)))
for ingester, streams := range streamsByIngester {
go func(ingester ring.InstanceDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
@@ -539,6 +641,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
d.sendStreams(localCtx, ingester, samples, &tracker)
}(ingesterDescs[ingester], streams)
}
}
select {
case err := <-tracker.err:
return nil, err
@@ -744,16 +848,12 @@ func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDes
if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) {
continue
}
if pushTracker.streamsFailed.Inc() == 1 {
pushTracker.err <- err
}
pushTracker.doneWithResult(err)
} else {
if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) {
continue
}
if pushTracker.streamsPending.Dec() == 0 {
pushTracker.done <- struct{}{}
}
pushTracker.doneWithResult(nil)
}
}
}
@@ -785,6 +885,70 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
return err
}
func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, tenant string, tracker *pushTracker) {
for _, s := range streams {
go func(s KeyedStream) {
err := d.sendStreamToKafka(ctx, s, tenant)
if err != nil {
err = fmt.Errorf("failed to write stream to kafka: %w", err)
}
tracker.doneWithResult(err)
}(s)
}
}
func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, tenant string) error {
if len(stream.Stream.Entries) == 0 {
return nil
}
/* partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey)
if err != nil {
d.kafkaAppends.WithLabelValues("kafka", "fail").Inc()
return fmt.Errorf("failed to find active partition for stream: %w", err)
}*/
partitionID := int32(0)
startTime := time.Now()
records, err := kafka.Encode(partitionID, tenant, stream.Stream, d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes)
if err != nil {
d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
return fmt.Errorf("failed to marshal write request to records: %w", err)
}
d.kafkaRecordsPerRequest.Observe(float64(len(records)))
produceResults := d.kafkaWriter.ProduceSync(ctx, records)
if count, sizeBytes := successfulProduceRecordsStats(produceResults); count > 0 {
d.kafkaWriteLatency.Observe(time.Since(startTime).Seconds())
d.kafkaWriteBytesTotal.Add(float64(sizeBytes))
}
var finalErr error
for _, result := range produceResults {
if result.Err != nil {
d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
finalErr = result.Err
} else {
d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc()
}
}
return finalErr
}
func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes int) {
for _, res := range results {
if res.Err == nil && res.Record != nil {
count++
sizeBytes += len(res.Record.Value)
}
}
return
}
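To illustrate what this stats helper feeds into the latency and bytes metrics, here is a hypothetical standalone example (the helper is repeated verbatim so the snippet compiles on its own; the record contents are made up):

```go
package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes int) {
	for _, res := range results {
		if res.Err == nil && res.Record != nil {
			count++
			sizeBytes += len(res.Record.Value)
		}
	}
	return
}

func main() {
	results := kgo.ProduceResults{
		{Record: &kgo.Record{Value: []byte("payload-1")}},                     // acknowledged: counted
		{Record: &kgo.Record{Value: []byte("p2")}, Err: kgo.ErrRecordTimeout}, // failed: skipped
	}
	count, sizeBytes := successfulProduceRecordsStats(results)
	fmt.Println(count, sizeBytes) // 1 9: only acknowledged records update latency/bytes metrics
}
```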
type labelData struct {
ls labels.Labels
hash uint64

@@ -26,6 +26,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/collector/pdata/plog"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
@@ -505,6 +506,76 @@ func TestDistributorPushErrors(t *testing.T) {
})
}
func TestDistributorPushToKafka(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
t.Run("with kafka, any failure fails the request", func(t *testing.T) {
kafkaWriter := &mockKafkaWriter{
failOnWrite: true,
}
distributors, _ := prepare(t, 1, 0, limits, nil)
for _, d := range distributors {
d.cfg.KafkaEnabled = true
d.cfg.IngesterEnabled = false
d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes = 1000
d.kafkaWriter = kafkaWriter
}
request := makeWriteRequest(10, 64)
_, err := distributors[0].Push(ctx, request)
require.Error(t, err)
})
t.Run("with kafka, no failures is successful", func(t *testing.T) {
kafkaWriter := &mockKafkaWriter{
failOnWrite: false,
}
distributors, _ := prepare(t, 1, 0, limits, nil)
for _, d := range distributors {
d.cfg.KafkaEnabled = true
d.cfg.IngesterEnabled = false
d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes = 1000
d.kafkaWriter = kafkaWriter
}
request := makeWriteRequest(10, 64)
_, err := distributors[0].Push(ctx, request)
require.NoError(t, err)
require.Equal(t, 1, kafkaWriter.pushed)
})
t.Run("with kafka and ingesters, both must complete", func(t *testing.T) {
kafkaWriter := &mockKafkaWriter{
failOnWrite: false,
}
distributors, ingesters := prepare(t, 1, 3, limits, nil)
ingesters[0].succeedAfter = 5 * time.Millisecond
ingesters[1].succeedAfter = 10 * time.Millisecond
ingesters[2].succeedAfter = 15 * time.Millisecond
for _, d := range distributors {
d.cfg.KafkaEnabled = true
d.cfg.IngesterEnabled = true
d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes = 1000
d.kafkaWriter = kafkaWriter
}
request := makeWriteRequest(10, 64)
_, err := distributors[0].Push(ctx, request)
require.NoError(t, err)
require.Equal(t, 1, kafkaWriter.pushed)
require.Equal(t, 1, len(ingesters[0].pushed))
require.Equal(t, 1, len(ingesters[1].pushed))
require.Eventually(t, func() bool {
return len(ingesters[2].pushed) == 1
}, time.Second, 10*time.Millisecond)
})
}
func Test_SortLabelsOnPush(t *testing.T) {
t.Run("with service_name already present in labels", func(t *testing.T) {
limits := &validation.Limits{}
@@ -1270,9 +1341,26 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))
test.Poll(t, time.Second, numIngesters, func() interface{} {
return ingestersRing.InstancesCount()
})
partitionRing := ring.NewPartitionRing(ring.PartitionRingDesc{
Partitions: map[int32]ring.PartitionDesc{
1: {
Id: 1,
Tokens: []uint32{1},
State: ring.PartitionActive,
StateTimestamp: time.Now().Unix(),
},
},
Owners: map[string]ring.OwnerDesc{
"test": {
OwnedPartition: 1,
State: ring.OwnerActive,
UpdatedTimestamp: time.Now().Unix(),
},
},
})
partitionRingReader := mockPartitionRingReader{
ring: partitionRing,
}
loopbackName, err := loki_net.LoopbackInterfaceName()
require.NoError(t, err)
@@ -1299,7 +1387,7 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
overrides, err := validation.NewOverrides(*limits, nil)
require.NoError(t, err)
d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger())
d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, partitionRingReader, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
distributors[i] = d
@@ -1373,6 +1461,37 @@ func makeWriteRequest(lines, size int) *logproto.PushRequest {
return makeWriteRequestWithLabels(lines, size, []string{`{foo="bar"}`})
}
type mockKafkaWriter struct {
failOnWrite bool
pushed int
}
func (m *mockKafkaWriter) ProduceSync(_ context.Context, _ []*kgo.Record) kgo.ProduceResults {
if m.failOnWrite {
return kgo.ProduceResults{
{
Err: kgo.ErrRecordTimeout,
},
}
}
m.pushed++
return kgo.ProduceResults{
{
Err: nil,
},
}
}
func (m *mockKafkaWriter) Close() {}
type mockPartitionRingReader struct {
ring *ring.PartitionRing
}
func (m mockPartitionRingReader) PartitionRing() *ring.PartitionRing {
return m.ring
}
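mockPartitionRingReader satisfies the same ring.PartitionRingReader surface the distributor consumes. For reference, a hypothetical test-style sketch of the active-partition lookup that sendStreamToKafka currently has commented out (it hardcodes partition 0 for now), using the dskit helpers already seen elsewhere in this diff:

```go
func TestActivePartitionLookup(t *testing.T) {
	desc := ring.NewPartitionRingDesc()
	desc.AddPartition(1, ring.PartitionActive, time.Now())
	reader := mockPartitionRingReader{ring: ring.NewPartitionRing(*desc)}

	// The same lookup the distributor would perform per stream hash key.
	partitionID, err := reader.PartitionRing().ActivePartitionForKey(12345)
	require.NoError(t, err)
	require.Equal(t, int32(1), partitionID) // the only active partition
}
```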
type mockIngester struct {
grpc_health_v1.HealthClient
logproto.PusherClient

@@ -1,209 +0,0 @@
package kafka
import (
"context"
"errors"
"flag"
"fmt"
"math"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kprom"
"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/logproto"
)
const writeTimeout = time.Minute
type Config struct {
Address string `yaml:"address" docs:"the kafka endpoint to connect to"`
Topic string `yaml:"topic" docs:"the kafka topic to write to"`
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Address, prefix+"address", "localhost:9092", "the kafka endpoint to connect to")
f.StringVar(&cfg.Topic, prefix+"topic", "loki.push", "The Kafka topic name.")
}
type Tee struct {
logger log.Logger
kafkaClient *kgo.Client
partitionRing *ring.PartitionInstanceRing
ingesterAppends *prometheus.CounterVec
}
func NewTee(
cfg Config,
metricsNamespace string,
registerer prometheus.Registerer,
logger log.Logger,
partitionRing *ring.PartitionInstanceRing,
) (*Tee, error) {
registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)
metrics := kprom.NewMetrics(
"", // No prefix. We expect the input prometheus.Registerer to be wrapped with a prefix.
kprom.Registerer(registerer),
kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes))
opts := append([]kgo.Opt{},
kgo.SeedBrokers(cfg.Address),
kgo.WithHooks(metrics),
// commonKafkaClientOptions(kafkaCfg, metrics, logger),
kgo.RequiredAcks(kgo.AllISRAcks()),
kgo.DefaultProduceTopic(cfg.Topic),
kgo.AllowAutoTopicCreation(),
// We set the partition field in each record.
kgo.RecordPartitioner(kgo.ManualPartitioner()),
// Set the upper bound on the size of a record batch.
kgo.ProducerBatchMaxBytes(1024*1024*1),
// By default, the Kafka client allows 1 Produce in-flight request per broker. Disabling write idempotency
// (which we don't need), we can increase the max number of in-flight Produce requests per broker. A higher
// number of in-flight requests, in addition to short buffering ("linger") in client side before firing the
// next Produce request allows us to reduce the end-to-end latency.
//
// The result of the multiplication of producer linger and max in-flight requests should match the maximum
// Produce latency expected by the Kafka backend in a steady state. For example, 50ms * 20 requests = 1s,
// which means the Kafka client will keep issuing a Produce request every 50ms as far as the Kafka backend
// doesn't take longer than 1s to process them (if it takes longer, the client will buffer data and stop
// issuing new Produce requests until some previous ones complete).
kgo.DisableIdempotentWrite(),
kgo.ProducerLinger(50*time.Millisecond),
kgo.MaxProduceRequestsInflightPerBroker(20),
// Unlimited number of Produce retries but a deadline on the max time a record can take to be delivered.
// With the default config it would retry infinitely.
//
// Details of the involved timeouts:
// - RecordDeliveryTimeout: how long a Kafka client Produce() call can take for a given record. The overhead
// timeout is NOT applied.
// - ProduceRequestTimeout: how long to wait for the response to the Produce request (the Kafka protocol message)
// after being sent on the network. The actual timeout is increased by the configured overhead.
//
// When a Produce request to Kafka fails, the client will retry until the RecordDeliveryTimeout is reached.
// Once the timeout is reached, the Produce request will fail and all other buffered requests in the client
// (for the same partition) will fail too. See kgo.RecordDeliveryTimeout() documentation for more info.
kgo.RecordRetries(math.MaxInt),
kgo.RecordDeliveryTimeout(time.Minute),
kgo.ProduceRequestTimeout(time.Minute),
kgo.RequestTimeoutOverhead(time.Minute),
// Unlimited number of buffered records because we limit on bytes in Writer. The reason why we don't use
// kgo.MaxBufferedBytes() is because it suffers a deadlock issue:
// https://github.com/twmb/franz-go/issues/777
kgo.MaxBufferedRecords(math.MaxInt), // Use a high value to set it as unlimited, because the client doesn't support "0 as unlimited".
kgo.MaxBufferedBytes(0),
)
kafkaClient, err := kgo.NewClient(opts...)
if err != nil {
panic("failed to start kafka client")
}
t := &Tee{
logger: log.With(logger, "component", "kafka-tee"),
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "kafka_ingester_appends_total",
Help: "The total number of appends sent to kafka ingest path.",
}, []string{"partition", "status"}),
kafkaClient: kafkaClient,
partitionRing: partitionRing,
}
return t, nil
}
// Duplicate implements distributor.Tee, which is used to tee distributor requests to an additional destination; here each stream is sent asynchronously to Kafka.
func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) {
for idx := range streams {
go func(stream distributor.KeyedStream) {
if err := t.sendStream(tenant, stream); err != nil {
level.Error(t.logger).Log("msg", "failed to send stream to kafka", "err", err)
}
}(streams[idx])
}
}
func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error {
partitionID, err := t.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey)
if err != nil {
t.ingesterAppends.WithLabelValues("partition_unknown", "fail").Inc()
return fmt.Errorf("failed to find active partition for stream: %w", err)
}
records, err := marshalWriteRequestToRecords(partitionID, tenant, stream.Stream, 1024*1024)
if err != nil {
t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
return fmt.Errorf("failed to marshal write request to records: %w", err)
}
ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), writeTimeout)
defer cancel()
produceResults := t.kafkaClient.ProduceSync(ctx, records...)
var finalErr error
for _, result := range produceResults {
if result.Err != nil {
t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
finalErr = result.Err
} else {
t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc()
}
}
return finalErr
}
// marshalWriteRequestToRecords marshals a logproto.Stream to one or more Kafka records.
// The stream may be split into multiple records so that each single Kafka record's
// data size is not bigger than maxSize.
//
// This function is best-effort. The returned Kafka records are not strictly guaranteed to
// have their data size limited to maxSize. The reason is that the stream is split
// by individual entries: if a single entry is bigger than maxSize, the resulting
// record will be bigger than the limit as well.
func marshalWriteRequestToRecords(partitionID int32, tenantID string, stream logproto.Stream, maxSize int) ([]*kgo.Record, error) {
reqSize := stream.Size()
if reqSize <= maxSize {
// No need to split the request. We can take a fast path.
rec, err := marshalWriteRequestToRecord(partitionID, tenantID, stream, reqSize)
if err != nil {
return nil, err
}
return []*kgo.Record{rec}, nil
}
return nil, errors.New("large write requests are not supported yet")
// return marshalWriteRequestsToRecords(partitionID, tenantID, mimirpb.SplitWriteRequestByMaxMarshalSize(req, reqSize, maxSize))
}
func marshalWriteRequestToRecord(partitionID int32, tenantID string, stream logproto.Stream, reqSize int) (*kgo.Record, error) {
// Marshal the request.
data := make([]byte, reqSize)
n, err := stream.MarshalToSizedBuffer(data)
if err != nil {
return nil, fmt.Errorf("failed to serialise write request: %w", err)
}
data = data[:n]
return &kgo.Record{
Key: []byte(tenantID), // We don't partition based on the key, so the value here doesn't make any difference.
Value: data,
Partition: partitionID,
}, nil
}

@@ -1,174 +0,0 @@
package tee
import (
"context"
"fmt"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/kafka"
)
const writeTimeout = time.Minute
// Tee represents a component that duplicates log streams to Kafka.
type Tee struct {
logger log.Logger
producer *kafka.Producer
partitionRing ring.PartitionRingReader
cfg kafka.Config
ingesterAppends *prometheus.CounterVec
writeLatency prometheus.Histogram
writeBytesTotal prometheus.Counter
recordsPerRequest prometheus.Histogram
}
// NewTee creates and initializes a new Tee instance.
//
// Parameters:
// - cfg: Kafka configuration
// - metricsNamespace: Namespace for Prometheus metrics
// - registerer: Prometheus registerer for metrics
// - logger: Logger instance
// - partitionRing: Ring for managing partitions
//
// Returns:
// - A new Tee instance and any error encountered during initialization
func NewTee(
cfg kafka.Config,
metricsNamespace string,
registerer prometheus.Registerer,
logger log.Logger,
partitionRing ring.PartitionRingReader,
) (*Tee, error) {
registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)
kafkaClient, err := kafka.NewWriterClient(cfg, 20, logger, registerer)
if err != nil {
return nil, fmt.Errorf("failed to start kafka client: %w", err)
}
producer := kafka.NewProducer(kafkaClient, cfg.ProducerMaxBufferedBytes,
prometheus.WrapRegistererWithPrefix("_kafka_ingester_", registerer))
t := &Tee{
logger: log.With(logger, "component", "kafka-tee"),
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "kafka_ingester_appends_total",
Help: "The total number of appends sent to kafka ingest path.",
}, []string{"partition", "status"}),
producer: producer,
partitionRing: partitionRing,
cfg: cfg,
// Metrics.
writeLatency: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "kafka_ingester_tee_latency_seconds",
Help: "Latency to write an incoming request to the ingest storage.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMinResetDuration: 1 * time.Hour,
NativeHistogramMaxBucketNumber: 100,
Buckets: prometheus.DefBuckets,
}),
writeBytesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "kafka_ingester_tee_sent_bytes_total",
Help: "Total number of bytes sent to the ingest storage.",
}),
recordsPerRequest: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "kafka_ingester_tee_records_per_write_request",
Help: "The number of records a single per-partition write request has been split into.",
Buckets: prometheus.ExponentialBuckets(1, 2, 8),
}),
}
return t, nil
}
// Duplicate implements the distributor.Tee interface, which is used to duplicate
// distributor requests to an additional destination. It asynchronously sends each
// stream to Kafka.
//
// Parameters:
// - tenant: The tenant identifier
// - streams: A slice of KeyedStream to be duplicated
func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) {
for idx := range streams {
go func(stream distributor.KeyedStream) {
if err := t.sendStream(tenant, stream); err != nil {
level.Error(t.logger).Log("msg", "failed to send stream to kafka", "err", err)
}
}(streams[idx])
}
}
func (t *Tee) Close() {
t.producer.Close()
}
// sendStream sends a single stream to Kafka.
//
// Parameters:
// - tenant: The tenant identifier
// - stream: The KeyedStream to be sent
//
// Returns:
// - An error if the stream couldn't be sent successfully
func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error {
if len(stream.Stream.Entries) == 0 {
return nil
}
partitionID, err := t.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey)
if err != nil {
t.ingesterAppends.WithLabelValues("partition_unknown", "fail").Inc()
return fmt.Errorf("failed to find active partition for stream: %w", err)
}
startTime := time.Now()
records, err := kafka.Encode(partitionID, tenant, stream.Stream, t.cfg.ProducerMaxRecordSizeBytes)
if err != nil {
t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
return fmt.Errorf("failed to marshal write request to records: %w", err)
}
t.recordsPerRequest.Observe(float64(len(records)))
ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), writeTimeout)
defer cancel()
produceResults := t.producer.ProduceSync(ctx, records)
if count, sizeBytes := successfulProduceRecordsStats(produceResults); count > 0 {
t.writeLatency.Observe(time.Since(startTime).Seconds())
t.writeBytesTotal.Add(float64(sizeBytes))
}
var finalErr error
for _, result := range produceResults {
if result.Err != nil {
t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
finalErr = result.Err
} else {
t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc()
}
}
return finalErr
}
func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes int) {
for _, res := range results {
if res.Err == nil && res.Record != nil {
count++
sizeBytes += len(res.Record.Value)
}
}
return
}

@@ -1,50 +0,0 @@
package tee
import (
"os"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/kafka/testkafka"
"github.com/grafana/loki/pkg/push"
)
func TestPushKafkaRecords(t *testing.T) {
_, cfg := testkafka.CreateCluster(t, 1, "topic")
tee, err := NewTee(cfg, "test", prometheus.NewRegistry(), log.NewLogfmtLogger(os.Stdout), newTestPartitionRing())
require.NoError(t, err)
err = tee.sendStream("test", distributor.KeyedStream{
HashKey: 1,
Stream: push.Stream{
Labels: `{foo="bar"}`,
Entries: []push.Entry{
{Timestamp: time.Now(), Line: "test"},
},
},
})
require.NoError(t, err)
}
type testPartitionRing struct {
partitionRing *ring.PartitionRing
}
func (t *testPartitionRing) PartitionRing() *ring.PartitionRing {
return t.partitionRing
}
func newTestPartitionRing() ring.PartitionRingReader {
desc := ring.NewPartitionRingDesc()
desc.AddPartition(0, ring.PartitionActive, time.Now())
return &testPartitionRing{
partitionRing: ring.NewPartitionRing(*desc),
}
}

@@ -300,6 +300,9 @@ func (c *Config) Validate() error {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid kafka_config config"))
}
}
if err := c.Distributor.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid distributor config"))
}
errs = append(errs, validateSchemaValues(c)...)
errs = append(errs, ValidateConfigCompatibility(*c)...)

@@ -321,6 +321,8 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) {
}
func (t *Loki) initDistributor() (services.Service, error) {
t.Cfg.Distributor.KafkaConfig = t.Cfg.KafkaConfig
var err error
logger := log.With(util_log.Logger, "component", "distributor")
t.distributor, err = distributor.New(
@@ -328,6 +330,7 @@ func (t *Loki) initDistributor() (services.Service, error) {
t.Cfg.IngesterClient,
t.tenantConfigs,
t.ring,
t.partitionRing,
t.Overrides,
prometheus.DefaultRegisterer,
t.Cfg.MetricsNamespace,
