From beca6f33662e8a43ea59943a4327a1c328960058 Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Thu, 26 Sep 2024 16:28:22 +0200
Subject: [PATCH] feat(kafka): enqueue commit offset only once per batch process (#14278)

Co-authored-by: George Robinson
---
 pkg/ingester/kafka_consumer.go      | 69 +++++++++++++++++++++++------
 pkg/ingester/kafka_consumer_test.go |  2 +
 pkg/kafka/ingester/consumer_test.go | 11 +++--
 pkg/kafka/partition/committer.go    |  3 +-
 pkg/kafka/partition/reader.go       |  1 -
 5 files changed, 68 insertions(+), 18 deletions(-)

diff --git a/pkg/ingester/kafka_consumer.go b/pkg/ingester/kafka_consumer.go
index 52c5ba96a6..c2fe90ee05 100644
--- a/pkg/ingester/kafka_consumer.go
+++ b/pkg/ingester/kafka_consumer.go
@@ -2,12 +2,14 @@ package ingester
 
 import (
 	"context"
+	"errors"
 	math "math"
 	"sync"
 	"time"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/user"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -39,24 +41,26 @@ func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics {
 
 func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger, reg prometheus.Registerer) partition.ConsumerFactory {
 	metrics := newConsumerMetrics(reg)
-	return func(_ partition.Committer) (partition.Consumer, error) {
+	return func(committer partition.Committer) (partition.Consumer, error) {
 		decoder, err := kafka.NewDecoder()
 		if err != nil {
 			return nil, err
 		}
 		return &kafkaConsumer{
-			pusher:  pusher,
-			logger:  logger,
-			decoder: decoder,
-			metrics: metrics,
+			pusher:    pusher,
+			logger:    logger,
+			decoder:   decoder,
+			metrics:   metrics,
+			committer: committer,
 		}, nil
 	}
 }
 
 type kafkaConsumer struct {
-	pusher  logproto.PusherServer
-	logger  log.Logger
-	decoder *kafka.Decoder
+	pusher    logproto.PusherServer
+	logger    log.Logger
+	decoder   *kafka.Decoder
+	committer partition.Committer
 
 	metrics *consumerMetrics
 }
@@ -72,14 +76,14 @@ func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partiti
 				level.Info(kc.logger).Log("msg", "shutting down kafka consumer")
 				return
 			case records := <-recordsChan:
-				kc.consume(records)
+				kc.consume(ctx, records)
 			}
 		}
 	}()
 	return wg.Wait
 }
 
-func (kc *kafkaConsumer) consume(records []partition.Record) {
+func (kc *kafkaConsumer) consume(ctx context.Context, records []partition.Record) {
 	if len(records) == 0 {
 		return
 	}
@@ -101,13 +105,52 @@ func (kc *kafkaConsumer) consume(records []partition.Record) {
 			level.Error(kc.logger).Log("msg", "failed to decode record", "error", err)
 			continue
 		}
-		ctx := user.InjectOrgID(record.Ctx, record.TenantID)
-		if _, err := kc.pusher.Push(ctx, &logproto.PushRequest{
+		recordCtx := user.InjectOrgID(record.Ctx, record.TenantID)
+		req := &logproto.PushRequest{
 			Streams: []logproto.Stream{stream},
+		}
+		if err := retryWithBackoff(ctx, func(attempts int) error {
+			if _, err := kc.pusher.Push(recordCtx, req); err != nil {
+				level.Warn(kc.logger).Log("msg", "failed to push records", "err", err, "offset", record.Offset, "attempts", attempts)
+				return err
+			}
+			return nil
 		}); err != nil {
-			level.Error(kc.logger).Log("msg", "failed to push records", "error", err)
+			level.Error(kc.logger).Log("msg", "exhausted all retry attempts, failed to push records", "err", err, "offset", record.Offset)
 		}
+		kc.committer.EnqueueOffset(record.Offset)
 	}
 	kc.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds())
 	kc.metrics.currentOffset.Set(float64(maxOffset))
 }
+
+func canRetry(err error) bool {
+	return errors.Is(err, ErrReadOnly)
+}
+
+func retryWithBackoff(ctx context.Context, fn func(attempts int) error) error {
+	err := fn(0)
+	if err == nil {
+		return nil
+	}
+	if !canRetry(err) {
+		return err
+	}
+	backoff := backoff.New(ctx, backoff.Config{
+		MinBackoff: 100 * time.Millisecond,
+		MaxBackoff: 5 * time.Second,
+		MaxRetries: 0, // Retry infinitely
+	})
+	backoff.Wait()
+	for backoff.Ongoing() {
+		err = fn(backoff.NumRetries())
+		if err == nil {
+			return nil
+		}
+		if !canRetry(err) {
+			return err
+		}
+		backoff.Wait()
+	}
+	return backoff.Err()
+}
diff --git a/pkg/ingester/kafka_consumer_test.go b/pkg/ingester/kafka_consumer_test.go
index 7a2ba5887d..f9ac98c513 100644
--- a/pkg/ingester/kafka_consumer_test.go
+++ b/pkg/ingester/kafka_consumer_test.go
@@ -74,6 +74,8 @@ func (f *fakePusher) Push(ctx context.Context, in *logproto.PushRequest) (*logpr
 
 type noopCommitter struct{}
 
+func (nc *noopCommitter) EnqueueOffset(_ int64) {}
+
 func (noopCommitter) Commit(_ context.Context, _ int64) error { return nil }
 
 func TestConsumer(t *testing.T) {
diff --git a/pkg/kafka/ingester/consumer_test.go b/pkg/kafka/ingester/consumer_test.go
index c6e14ddebe..a0baa92ba8 100644
--- a/pkg/kafka/ingester/consumer_test.go
+++ b/pkg/kafka/ingester/consumer_test.go
@@ -33,6 +33,11 @@ func (m *mockCommitter) Commit(_ context.Context, offset int64) error {
 	return nil
 }
 
+func (m *mockCommitter) EnqueueOffset(offset int64) {
+	// For testing purposes, we'll just set the committed offset directly
+	m.committed = offset
+}
+
 func TestConsumer_PeriodicFlush(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -46,7 +51,7 @@ func TestConsumer_PeriodicFlush(t *testing.T) {
 	flushInterval := 100 * time.Millisecond
 	maxFlushSize := int64(1000)
 
-	committer := &mockCommitter{}
+	committer := newMockCommitter()
 	consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg)
 	consumer, err := consumerFactory(committer)
 	require.NoError(t, err)
@@ -99,7 +104,7 @@ func TestConsumer_ShutdownFlush(t *testing.T) {
 	flushInterval := 1 * time.Hour
 	maxFlushSize := int64(1000)
 
-	committer := &mockCommitter{}
+	committer := newMockCommitter()
 	consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg)
 	consumer, err := consumerFactory(committer)
 	require.NoError(t, err)
@@ -153,7 +158,7 @@ func TestConsumer_MaxFlushSize(t *testing.T) {
 	flushInterval := 1 * time.Hour
 	maxFlushSize := int64(10)
 
-	committer := &mockCommitter{}
+	committer := newMockCommitter()
 	consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg)
 	consumer, err := consumerFactory(committer)
 	require.NoError(t, err)
diff --git a/pkg/kafka/partition/committer.go b/pkg/kafka/partition/committer.go
index c3a1f796e0..f9aeda3f0f 100644
--- a/pkg/kafka/partition/committer.go
+++ b/pkg/kafka/partition/committer.go
@@ -19,6 +19,7 @@ import (
 // Committer defines an interface for committing offsets
 type Committer interface {
 	Commit(ctx context.Context, offset int64) error
+	EnqueueOffset(offset int64)
 }
 
 // partitionCommitter is responsible for committing offsets for a specific Kafka partition
@@ -113,7 +114,7 @@ func (r *partitionCommitter) autoCommitLoop(ctx context.Context) {
 	}
 }
 
-func (r *partitionCommitter) enqueueOffset(o int64) {
+func (r *partitionCommitter) EnqueueOffset(o int64) {
 	if r.kafkaCfg.ConsumerGroupOffsetCommitInterval > 0 {
 		r.toCommit.Store(o)
 	}
diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go
index 9720e059ae..9972d13307 100644
--- a/pkg/kafka/partition/reader.go
+++ b/pkg/kafka/partition/reader.go
@@ -123,7 +123,6 @@ func (p *Reader) startFetchLoop(ctx context.Context) <-chan []Record {
 				return
 			default:
 				records <- p.poll(ctx)
-				p.committer.enqueueOffset(p.lastProcessedOffset)
 			}
 		}
 	}()
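
Note: the following is an illustrative, self-contained Go sketch (not part of the patch) of the retry pattern the patch adds to pkg/ingester/kafka_consumer.go, built on github.com/grafana/dskit/backoff. The errRetryable sentinel and the inline push function are hypothetical stand-ins for the ingester's ErrReadOnly and kc.pusher.Push; only errors that canRetry recognizes are retried, and MaxRetries: 0 means retries continue until the context is done.

// Illustrative sketch only: mirrors the retryWithBackoff helper added in the
// patch, with hypothetical stand-ins for the ingester's error and push call.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// errRetryable stands in for the ErrReadOnly error checked by canRetry in the patch.
var errRetryable = errors.New("retryable push failure")

func canRetry(err error) bool {
	return errors.Is(err, errRetryable)
}

// retryWithBackoff tries fn once, then keeps retrying retryable errors with
// capped exponential backoff until fn succeeds, a non-retryable error occurs,
// or the context is cancelled (MaxRetries: 0 disables the retry limit).
func retryWithBackoff(ctx context.Context, fn func(attempts int) error) error {
	if err := fn(0); err == nil || !canRetry(err) {
		return err
	}
	b := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: 5 * time.Second,
		MaxRetries: 0, // retry until the context is done
	})
	b.Wait()
	for b.Ongoing() {
		if err := fn(b.NumRetries()); err == nil || !canRetry(err) {
			return err
		}
		b.Wait()
	}
	return b.Err()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Hypothetical push that fails twice before succeeding, standing in for
	// kc.pusher.Push in the consumer.
	err := retryWithBackoff(ctx, func(attempts int) error {
		if attempts < 2 {
			return errRetryable
		}
		fmt.Println("pushed after", attempts, "retries")
		return nil
	})
	fmt.Println("final error:", err)
}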