|
|
|
@ -2,12 +2,14 @@ package ingester |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"errors" |
|
|
|
|
math "math" |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/go-kit/log" |
|
|
|
|
"github.com/go-kit/log/level" |
|
|
|
|
"github.com/grafana/dskit/backoff" |
|
|
|
|
"github.com/grafana/dskit/user" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus/promauto" |
|
|
|
@ -39,24 +41,26 @@ func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics { |
|
|
|
|
|
|
|
|
|
func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger, reg prometheus.Registerer) partition.ConsumerFactory { |
|
|
|
|
metrics := newConsumerMetrics(reg) |
|
|
|
|
return func(_ partition.Committer) (partition.Consumer, error) { |
|
|
|
|
return func(committer partition.Committer) (partition.Consumer, error) { |
|
|
|
|
decoder, err := kafka.NewDecoder() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
return &kafkaConsumer{ |
|
|
|
|
pusher: pusher, |
|
|
|
|
logger: logger, |
|
|
|
|
decoder: decoder, |
|
|
|
|
metrics: metrics, |
|
|
|
|
pusher: pusher, |
|
|
|
|
logger: logger, |
|
|
|
|
decoder: decoder, |
|
|
|
|
metrics: metrics, |
|
|
|
|
committer: committer, |
|
|
|
|
}, nil |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type kafkaConsumer struct { |
|
|
|
|
pusher logproto.PusherServer |
|
|
|
|
logger log.Logger |
|
|
|
|
decoder *kafka.Decoder |
|
|
|
|
pusher logproto.PusherServer |
|
|
|
|
logger log.Logger |
|
|
|
|
decoder *kafka.Decoder |
|
|
|
|
committer partition.Committer |
|
|
|
|
|
|
|
|
|
metrics *consumerMetrics |
|
|
|
|
} |
|
|
|
@ -72,14 +76,14 @@ func (kc *kafkaConsumer) Start(ctx context.Context, recordsChan <-chan []partiti |
|
|
|
|
level.Info(kc.logger).Log("msg", "shutting down kafka consumer") |
|
|
|
|
return |
|
|
|
|
case records := <-recordsChan: |
|
|
|
|
kc.consume(records) |
|
|
|
|
kc.consume(ctx, records) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
return wg.Wait |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (kc *kafkaConsumer) consume(records []partition.Record) { |
|
|
|
|
func (kc *kafkaConsumer) consume(ctx context.Context, records []partition.Record) { |
|
|
|
|
if len(records) == 0 { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
@ -101,13 +105,52 @@ func (kc *kafkaConsumer) consume(records []partition.Record) { |
|
|
|
|
level.Error(kc.logger).Log("msg", "failed to decode record", "error", err) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
ctx := user.InjectOrgID(record.Ctx, record.TenantID) |
|
|
|
|
if _, err := kc.pusher.Push(ctx, &logproto.PushRequest{ |
|
|
|
|
recordCtx := user.InjectOrgID(record.Ctx, record.TenantID) |
|
|
|
|
req := &logproto.PushRequest{ |
|
|
|
|
Streams: []logproto.Stream{stream}, |
|
|
|
|
} |
|
|
|
|
if err := retryWithBackoff(ctx, func(attempts int) error { |
|
|
|
|
if _, err := kc.pusher.Push(recordCtx, req); err != nil { |
|
|
|
|
level.Warn(kc.logger).Log("msg", "failed to push records", "err", err, "offset", record.Offset, "attempts", attempts) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
}); err != nil { |
|
|
|
|
level.Error(kc.logger).Log("msg", "failed to push records", "error", err) |
|
|
|
|
level.Error(kc.logger).Log("msg", "exhausted all retry attempts, failed to push records", "err", err, "offset", record.Offset) |
|
|
|
|
} |
|
|
|
|
kc.committer.EnqueueOffset(record.Offset) |
|
|
|
|
} |
|
|
|
|
kc.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds()) |
|
|
|
|
kc.metrics.currentOffset.Set(float64(maxOffset)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func canRetry(err error) bool { |
|
|
|
|
return errors.Is(err, ErrReadOnly) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func retryWithBackoff(ctx context.Context, fn func(attempts int) error) error { |
|
|
|
|
err := fn(0) |
|
|
|
|
if err == nil { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
if !canRetry(err) { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
backoff := backoff.New(ctx, backoff.Config{ |
|
|
|
|
MinBackoff: 100 * time.Millisecond, |
|
|
|
|
MaxBackoff: 5 * time.Second, |
|
|
|
|
MaxRetries: 0, // Retry infinitely
|
|
|
|
|
}) |
|
|
|
|
backoff.Wait() |
|
|
|
|
for backoff.Ongoing() { |
|
|
|
|
err = fn(backoff.NumRetries()) |
|
|
|
|
if err == nil { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
if !canRetry(err) { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
backoff.Wait() |
|
|
|
|
} |
|
|
|
|
return backoff.Err() |
|
|
|
|
} |
|
|
|
|