diff --git a/pkg/kafka/ingester/consumer.go b/pkg/kafka/ingester/consumer.go deleted file mode 100644 index 57abb2b00f..0000000000 --- a/pkg/kafka/ingester/consumer.go +++ /dev/null @@ -1,303 +0,0 @@ -package ingester - -import ( - "bytes" - "context" - "crypto/rand" - "fmt" - "io" - "math" - "sync" - "time" - - "github.com/dustin/go-humanize" - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/oklog/ulid" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "google.golang.org/grpc" - - "github.com/grafana/dskit/backoff" - - "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" - "github.com/grafana/loki/v3/pkg/kafka" - "github.com/grafana/loki/v3/pkg/kafka/partition" - "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/storage/wal" -) - -// ObjectStorage defines an interface for object storage operations -type ObjectStorage interface { - PutObject(ctx context.Context, objectKey string, object io.Reader) error -} - -// MetadataStore defines an interface for metadata storage operations -type MetadataStore interface { - AddBlock(ctx context.Context, in *metastorepb.AddBlockRequest, opts ...grpc.CallOption) (*metastorepb.AddBlockResponse, error) -} - -// consumer represents a Kafka consumer that processes and stores log entries -type consumer struct { - metastoreClient MetadataStore - storage ObjectStorage - writer *wal.SegmentWriter - committer partition.Committer - flushInterval time.Duration - maxFlushSize int64 - lastOffset int64 - - flushBuf *bytes.Buffer - decoder *kafka.Decoder - toStore []*logproto.Entry - - metrics *consumerMetrics - logger log.Logger -} - -// NewConsumerFactory creates and initializes a new consumer instance -func NewConsumerFactory( - metastoreClient MetadataStore, - storage ObjectStorage, - flushInterval time.Duration, - maxFlushSize int64, - logger log.Logger, - reg prometheus.Registerer, -) partition.ConsumerFactory { - return func(committer partition.Committer) (partition.Consumer, error) { - writer, err := wal.NewWalSegmentWriter() - if err != nil { - return nil, err - } - decoder, err := kafka.NewDecoder() - if err != nil { - return nil, err - } - return &consumer{ - logger: logger, - metastoreClient: metastoreClient, - storage: storage, - writer: writer, - metrics: newConsumerMetrics(reg), - flushBuf: bytes.NewBuffer(make([]byte, 0, 10<<20)), // 10 MB - decoder: decoder, - committer: committer, - flushInterval: flushInterval, - maxFlushSize: maxFlushSize, - lastOffset: -1, - }, nil - } -} - -// Start starts the consumer and returns a function to wait for it to finish -// It consumes records from the recordsChan, and flushes them to storage periodically. 
-func (c *consumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() { - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - flushTicker := time.NewTicker(c.flushInterval) - defer flushTicker.Stop() - for { - select { - case <-flushTicker.C: - level.Info(c.logger).Log("msg", "flushing block") - c.Flush() - case <-ctx.Done(): - level.Info(c.logger).Log("msg", "shutting down consumer") - c.Flush() - return - case records := <-recordsChan: - if err := c.consume(records); err != nil { - level.Error(c.logger).Log("msg", "failed to consume records", "error", err) - return - } - if c.writer.InputSize() > c.maxFlushSize { - level.Info(c.logger).Log("msg", "flushing block due to size limit", "size", humanize.Bytes(uint64(c.writer.InputSize()))) - c.Flush() - } - } - } - }() - return wg.Wait -} - -// consume processes a batch of Kafka records, decoding and storing them -func (c *consumer) consume(records []partition.Record) error { - if len(records) == 0 { - return nil - } - var ( - minOffset = int64(math.MaxInt64) - maxOffset = int64(0) - ) - for _, record := range records { - minOffset = min(minOffset, record.Offset) - maxOffset = max(maxOffset, record.Offset) - } - level.Debug(c.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset) - return c.retryWithBackoff(context.Background(), backoff.Config{ - MinBackoff: 250 * time.Millisecond, - MaxBackoff: 2 * time.Second, - MaxRetries: 0, // retry forever - }, func(boff *backoff.Backoff) error { - consumeStart := time.Now() - if err := c.appendRecords(records); err != nil { - level.Error(c.logger).Log( - "msg", "encountered error while ingesting data from Kafka; should retry", - "err", err, - "record_min_offset", minOffset, - "record_max_offset", maxOffset, - "num_retries", boff.NumRetries(), - ) - return err - } - c.lastOffset = maxOffset - c.metrics.currentOffset.Set(float64(c.lastOffset)) - c.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds()) - return nil - }) -} - -func (c *consumer) appendRecords(records []partition.Record) error { - for _, record := range records { - stream, labels, err := c.decoder.Decode(record.Content) - if err != nil { - return fmt.Errorf("failed to decode record: %w", err) - } - if len(stream.Entries) == 0 { - continue - } - if len(c.toStore) == 0 { - c.toStore = make([]*logproto.Entry, 0, len(stream.Entries)) - } - c.toStore = c.toStore[:0] - for _, entry := range stream.Entries { - c.toStore = append(c.toStore, &logproto.Entry{ - Timestamp: entry.Timestamp, - Line: entry.Line, - StructuredMetadata: entry.StructuredMetadata, - Parsed: entry.Parsed, - }) - } - c.writer.Append(record.TenantID, stream.Labels, labels, c.toStore, time.Now()) - } - return nil -} - -// Flush writes the accumulated data to storage and updates the metadata store -func (c *consumer) Flush() { - if c.writer.InputSize() == 0 { - return - } - if c.lastOffset == -1 { - return - } - if err := c.retryWithBackoff(context.Background(), backoff.Config{ - MinBackoff: 250 * time.Millisecond, - MaxBackoff: 10 * time.Second, - MaxRetries: 0, // retry forever - }, func(boff *backoff.Backoff) error { - start := time.Now() - c.metrics.flushesTotal.Add(1) - defer func() { c.metrics.flushDuration.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - if err := c.flush(ctx); err != nil { - c.metrics.flushFailuresTotal.Inc() - level.Error(c.logger).Log( - "msg", "failed to flush block", - "error", err, - 
"num_retries", boff.NumRetries(), - ) - return err - } - c.lastOffset = -1 - return nil - }); err != nil { - level.Error(c.logger).Log("msg", "failed to flush block", "error", err) - } -} - -func (c *consumer) retryWithBackoff(ctx context.Context, cfg backoff.Config, fn func(boff *backoff.Backoff) error) error { - boff := backoff.New(ctx, cfg) - var err error - for boff.Ongoing() { - err = fn(boff) - if err == nil { - return nil - } - boff.Wait() - } - if err != nil { - return err - } - return boff.ErrCause() -} - -func (c *consumer) flush(ctx context.Context) error { - defer c.flushBuf.Reset() - if _, err := c.writer.WriteTo(c.flushBuf); err != nil { - return err - } - - stats := wal.GetSegmentStats(c.writer, time.Now()) - wal.ReportSegmentStats(stats, c.metrics.segmentMetrics) - - id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String() - if err := c.storage.PutObject(ctx, wal.Dir+id, c.flushBuf); err != nil { - return fmt.Errorf("failed to put object to object storage: %w", err) - } - - if _, err := c.metastoreClient.AddBlock(ctx, &metastorepb.AddBlockRequest{ - Block: c.writer.Meta(id), - }); err != nil { - return fmt.Errorf("failed to add block to metastore: %w", err) - } - c.writer.Reset() - if err := c.committer.Commit(ctx, c.lastOffset); err != nil { - return fmt.Errorf("failed to commit offset: %w", err) - } - - return nil -} - -// consumerMetrics holds various Prometheus metrics for monitoring consumer operations -type consumerMetrics struct { - flushesTotal prometheus.Counter - flushFailuresTotal prometheus.Counter - flushDuration prometheus.Histogram - segmentMetrics *wal.SegmentMetrics - consumeLatency prometheus.Histogram - currentOffset prometheus.Gauge -} - -// newConsumerMetrics initializes and returns a new consumerMetrics instance -func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics { - return &consumerMetrics{ - flushesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "loki_kafka_ingester_flushes_total", - Help: "The total number of flushes.", - }), - flushFailuresTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "loki_kafka_ingester_flush_failures_total", - Help: "The total number of failed flushes.", - }), - flushDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "loki_kafka_ingester_flush_duration_seconds", - Help: "The flush duration (in seconds).", - Buckets: prometheus.ExponentialBuckets(0.001, 4, 8), - NativeHistogramBucketFactor: 1.1, - }), - consumeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "loki_ingest_storage_reader_records_batch_process_duration_seconds", - Help: "How long a consumer spent processing a batch of records from Kafka.", - NativeHistogramBucketFactor: 1.1, - }), - segmentMetrics: wal.NewSegmentMetrics(reg), - currentOffset: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "loki_kafka_ingester_current_offset", - Help: "The current offset of the Kafka consumer.", - }), - } -} diff --git a/pkg/kafka/ingester/consumer_test.go b/pkg/kafka/ingester/consumer_test.go deleted file mode 100644 index a0baa92ba8..0000000000 --- a/pkg/kafka/ingester/consumer_test.go +++ /dev/null @@ -1,198 +0,0 @@ -package ingester - -import ( - "context" - "os" - "strings" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" - "github.com/grafana/loki/v3/pkg/ingester-rf1/objstore" - 
"github.com/grafana/loki/v3/pkg/kafka" - "github.com/grafana/loki/v3/pkg/kafka/partition" - "github.com/grafana/loki/v3/pkg/logproto" -) - -type mockCommitter struct { - committed int64 -} - -func newMockCommitter() *mockCommitter { - return &mockCommitter{ - committed: -1, - } -} - -func (m *mockCommitter) Commit(_ context.Context, offset int64) error { - m.committed = offset - return nil -} - -func (m *mockCommitter) EnqueueOffset(offset int64) { - // For testing purposes, we'll just set the committed offset directly - m.committed = offset -} - -func TestConsumer_PeriodicFlush(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - storage, err := objstore.NewTestStorage(t) - require.NoError(t, err) - - metastore := NewTestMetastore() - reg := prometheus.NewRegistry() - - flushInterval := 100 * time.Millisecond - maxFlushSize := int64(1000) - - committer := newMockCommitter() - consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg) - consumer, err := consumerFactory(committer) - require.NoError(t, err) - - recordsChan := make(chan []partition.Record) - _ = consumer.Start(ctx, recordsChan) - - stream := logproto.Stream{ - Labels: `{__name__="test_metric", label="value1"}`, - Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, 1000), Line: "10.5"}, - }, - } - - encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20) - require.NoError(t, err) - - records := []partition.Record{{ - TenantID: "tenant1", - Content: encodedRecords[0].Value, - Offset: 0, - }} - - recordsChan <- records - - require.Eventually(t, func() bool { - blocks, err := metastore.ListBlocksForQuery(ctx, &metastorepb.ListBlocksForQueryRequest{ - TenantId: "tenant1", - StartTime: 0, - EndTime: 100000, - }) - require.NoError(t, err) - return len(blocks.Blocks) == 1 - }, 5*time.Second, 100*time.Millisecond) - - // Verify committed offset - require.Equal(t, int64(0), committer.committed) -} - -func TestConsumer_ShutdownFlush(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - storage, err := objstore.NewTestStorage(t) - require.NoError(t, err) - - metastore := NewTestMetastore() - reg := prometheus.NewRegistry() - - flushInterval := 1 * time.Hour - maxFlushSize := int64(1000) - - committer := newMockCommitter() - consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg) - consumer, err := consumerFactory(committer) - require.NoError(t, err) - - recordsChan := make(chan []partition.Record) - wait := consumer.Start(ctx, recordsChan) - - stream := logproto.Stream{ - Labels: `{__name__="test_metric", label="value1"}`, - Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, 1000), Line: "10.5"}, - }, - } - - encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20) - require.NoError(t, err) - - records := []partition.Record{{ - TenantID: "tenant1", - Content: encodedRecords[0].Value, - Offset: 0, - }} - - recordsChan <- records - - cancel() - wait() - - blocks, err := metastore.ListBlocksForQuery(ctx, &metastorepb.ListBlocksForQueryRequest{ - TenantId: "tenant1", - StartTime: 0, - EndTime: 100000, - }) - require.NoError(t, err) - require.Equal(t, 1, len(blocks.Blocks)) - - // Verify committed offset - require.Equal(t, int64(0), committer.committed) -} - -func TestConsumer_MaxFlushSize(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - storage, err := 
objstore.NewTestStorage(t) - require.NoError(t, err) - - metastore := NewTestMetastore() - reg := prometheus.NewRegistry() - - flushInterval := 1 * time.Hour - maxFlushSize := int64(10) - - committer := newMockCommitter() - consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg) - consumer, err := consumerFactory(committer) - require.NoError(t, err) - - recordsChan := make(chan []partition.Record) - _ = consumer.Start(ctx, recordsChan) - - stream := logproto.Stream{ - Labels: `{__name__="test_metric", label="value1"}`, - Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, 1000), Line: strings.Repeat("a", 100)}, - }, - } - - encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20) - require.NoError(t, err) - - records := []partition.Record{{ - TenantID: "tenant1", - Content: encodedRecords[0].Value, - Offset: 0, - }} - - recordsChan <- records - - require.Eventually(t, func() bool { - blocks, err := metastore.ListBlocksForQuery(ctx, &metastorepb.ListBlocksForQueryRequest{ - TenantId: "tenant1", - StartTime: 0, - EndTime: 100000, - }) - require.NoError(t, err) - return len(blocks.Blocks) == 1 - }, 5*time.Second, 100*time.Millisecond) - - require.Equal(t, int64(0), committer.committed) -} diff --git a/pkg/kafka/ingester/ingester.go b/pkg/kafka/ingester/ingester.go deleted file mode 100644 index 39595df142..0000000000 --- a/pkg/kafka/ingester/ingester.go +++ /dev/null @@ -1,383 +0,0 @@ -package ingester - -import ( - "context" - "errors" - "flag" - "fmt" - "net/http" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/ring" - "github.com/grafana/dskit/services" - "github.com/prometheus/client_golang/prometheus" - "google.golang.org/grpc/health/grpc_health_v1" - - "github.com/grafana/loki/v3/pkg/kafka" - "github.com/grafana/loki/v3/pkg/kafka/ingester/shutdownmarker" - "github.com/grafana/loki/v3/pkg/kafka/partition" - "github.com/grafana/loki/v3/pkg/kafka/partitionring" - util_log "github.com/grafana/loki/v3/pkg/util/log" - - "github.com/grafana/loki/v3/pkg/util" -) - -const ( - RingName = "kafka-ingester" - PartitionRingName = "kafka-partition" -) - -var ( - defaultFlushInterval = 15 * time.Second - defaultFlushSize int64 = 300 << 20 // 300 MB -) - -// Config for an ingester. -type Config struct { - Enabled bool `yaml:"enabled" doc:"description=Whether the kafka ingester is enabled."` - LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the ingester will operate and where it will register for discovery."` - ShutdownMarkerPath string `yaml:"shutdown_marker_path"` - FlushInterval time.Duration `yaml:"flush_interval" doc:"description=The interval at which the ingester will flush and commit offsets to Kafka. If not set, the default flush interval will be used."` - FlushSize int64 `yaml:"flush_size" doc:"description=The size at which the ingester will flush and commit offsets to Kafka. If not set, the default flush size will be used."` - PartitionRingConfig partitionring.Config `yaml:"partition_ring" category:"experimental"` - KafkaConfig kafka.Config `yaml:"-"` -} - -// RegisterFlags registers the flags. 
-func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.LifecyclerConfig.RegisterFlagsWithPrefix("kafka-ingester.", f, util_log.Logger) - cfg.PartitionRingConfig.RegisterFlagsWithPrefix("kafka-ingester.", f) - f.StringVar(&cfg.ShutdownMarkerPath, "kafka-ingester.shutdown-marker-path", "", "Path where the shutdown marker file is stored. If not set and common.path_prefix is set then common.path_prefix will be used.") - f.BoolVar(&cfg.Enabled, "kafka-ingester.enabled", false, "Whether the Kafka-based ingester path is enabled") - f.DurationVar(&cfg.FlushInterval, "kafka-ingester.flush-interval", defaultFlushInterval, "The interval at which the ingester will flush and commit offsets to Kafka. If not set, the default flush interval will be used.") - f.Int64Var(&cfg.FlushSize, "kafka-ingester.flush-size", defaultFlushSize, "The size at which the ingester will flush and commit offsets to Kafka. If not set, the default flush size will be used.") -} - -func (cfg *Config) Validate() error { - if !cfg.Enabled { - return nil - } - if cfg.FlushInterval <= 0 { - return errors.New("kafka-ingester.flush-interval must be greater than 0") - } - if cfg.LifecyclerConfig.RingConfig.ReplicationFactor != 1 { - cfg.LifecyclerConfig.RingConfig.ReplicationFactor = 1 - level.Warn(util_log.Logger).Log("msg", "kafka-ingester.lifecycler.replication-factor has been set to 1. This is the only supported replication factor for the kafka-ingester.") - } - return nil -} - -type Wrapper interface { - Wrap(wrapped Interface) Interface -} - -// Interface is an interface for the Ingester -type Interface interface { - services.Service - http.Handler - CheckReady(ctx context.Context) error - FlushHandler(w http.ResponseWriter, _ *http.Request) -} - -// Ingester builds chunks for incoming log streams. -type Ingester struct { - services.Service - - cfg Config - logger log.Logger - - metrics *ingesterMetrics - - lifecycler *ring.Lifecycler - lifecyclerWatcher *services.FailureWatcher - ingesterPartitionID int32 - partitionRingLifecycler *ring.PartitionInstanceLifecycler - partitionReader *partition.Reader -} - -// New makes a new Ingester. 
-func New(cfg Config, - consumerFactory partition.ConsumerFactory, - logger log.Logger, - metricsNamespace string, - registerer prometheus.Registerer, -) (*Ingester, error) { - metrics := newIngesterMetrics(registerer) - - ingesterPartitionID, err := partitionring.ExtractIngesterPartitionID(cfg.LifecyclerConfig.ID) - if err != nil { - return nil, fmt.Errorf("calculating ingester partition ID: %w", err) - } - - partitionRingKV := cfg.PartitionRingConfig.KVStore.Mock - if partitionRingKV == nil { - partitionRingKV, err = kv.NewClient(cfg.PartitionRingConfig.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(registerer, PartitionRingName+"-lifecycler"), logger) - if err != nil { - return nil, fmt.Errorf("creating KV store for ingester partition ring: %w", err) - } - } - - partitionRingLifecycler := ring.NewPartitionInstanceLifecycler( - cfg.PartitionRingConfig.ToLifecyclerConfig(ingesterPartitionID, cfg.LifecyclerConfig.ID), - PartitionRingName, - PartitionRingName+"-key", - partitionRingKV, - logger, - prometheus.WrapRegistererWithPrefix("loki_", registerer)) - i := &Ingester{ - cfg: cfg, - logger: logger, - ingesterPartitionID: ingesterPartitionID, - partitionRingLifecycler: partitionRingLifecycler, - metrics: metrics, - } - - i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, RingName, RingName+"-ring", true, logger, prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)) - if err != nil { - return nil, err - } - i.partitionReader, err = partition.NewReader(cfg.KafkaConfig, ingesterPartitionID, cfg.LifecyclerConfig.ID, consumerFactory, logger, registerer) - if err != nil { - return nil, err - } - - i.lifecyclerWatcher = services.NewFailureWatcher() - i.lifecyclerWatcher.WatchService(i.lifecycler) - i.lifecyclerWatcher.WatchService(i.partitionRingLifecycler) - i.lifecyclerWatcher.WatchService(i.partitionReader) - - i.Service = services.NewBasicService(i.starting, i.running, i.stopping) - - return i, nil -} - -// ServeHTTP implements the pattern ring status page. -func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request) { - i.lifecycler.ServeHTTP(w, r) -} - -func (i *Ingester) starting(ctx context.Context) (err error) { - defer func() { - if err != nil { - // if starting() fails for any reason (e.g., context canceled), - // the lifecycler must be stopped. - _ = services.StopAndAwaitTerminated(context.Background(), i.lifecycler) - } - }() - - // First of all we have to check if the shutdown marker is set. This needs to be done - // as first thing because, if found, it may change the behaviour of the ingester startup. 
- if exists, err := shutdownmarker.Exists(shutdownmarker.GetPath(i.cfg.ShutdownMarkerPath)); err != nil { - return fmt.Errorf("failed to check ingester shutdown marker: %w", err) - } else if exists { - level.Info(i.logger).Log("msg", "detected existing shutdown marker, setting unregister and flush on shutdown", "path", shutdownmarker.GetPath(i.cfg.ShutdownMarkerPath)) - i.setPrepareShutdown() - } - - // pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done - err = i.lifecycler.StartAsync(context.Background()) - if err != nil { - return err - } - - err = i.lifecycler.AwaitRunning(ctx) - if err != nil { - return err - } - - err = i.partitionRingLifecycler.StartAsync(context.Background()) - if err != nil { - return err - } - err = i.partitionRingLifecycler.AwaitRunning(ctx) - if err != nil { - return err - } - err = i.partitionReader.StartAsync(context.Background()) - if err != nil { - return err - } - err = i.partitionReader.AwaitRunning(ctx) - if err != nil { - return err - } - - return nil -} - -func (i *Ingester) running(ctx context.Context) error { - var serviceError error - select { - // wait until service is asked to stop - case <-ctx.Done(): - // stop - case err := <-i.lifecyclerWatcher.Chan(): - serviceError = fmt.Errorf("lifecycler failed: %w", err) - } - - return serviceError -} - -// stopping is called when Ingester transitions to Stopping state. -// -// At this point, loop no longer runs, but flushers are still running. -func (i *Ingester) stopping(_ error) error { - var errs util.MultiError - - errs.Add(services.StopAndAwaitTerminated(context.Background(), i.partitionReader)) - errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler)) - errs.Add(services.StopAndAwaitTerminated(context.Background(), i.partitionRingLifecycler)) - // Remove the shutdown marker if it exists since we are shutting down - shutdownMarkerPath := shutdownmarker.GetPath(i.cfg.ShutdownMarkerPath) - exist, err := shutdownmarker.Exists(shutdownMarkerPath) - if err != nil { - level.Warn(i.logger).Log("msg", "failed to check for prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err) - } else if exist { - if err := shutdownmarker.Remove(shutdownMarkerPath); err != nil { - level.Warn(i.logger).Log("msg", "failed to remove shutdown marker", "path", shutdownMarkerPath, "err", err) - } - } - return errs.Err() -} - -// Watch implements grpc_health_v1.HealthCheck. -func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error { - return nil -} - -func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) { - logger := log.With(i.logger, "partition", i.ingesterPartitionID) - // Don't allow callers to change the shutdown configuration while we're in the middle - // of starting or shutting down. - if i.State() != services.Running { - w.WriteHeader(http.StatusServiceUnavailable) - return - } - - shutdownMarkerPath := shutdownmarker.GetPath(i.cfg.ShutdownMarkerPath) - exists, err := shutdownmarker.Exists(shutdownMarkerPath) - if err != nil { - level.Error(i.logger).Log("msg", "unable to check for prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - switch r.Method { - case http.MethodPost: - // It's not allowed to prepare the downscale while in PENDING state. Why? 
Because if the downscale - // is later cancelled, we don't know whether it was requested in PENDING or ACTIVE state, so we - // don't know which state to revert back to. Given a partition is expected to stay in PENDING state - // for a short period, we simply don't allow this case. - state, _, err := i.partitionRingLifecycler.GetPartitionState(r.Context()) - if err != nil { - level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if state == ring.PartitionPending { - level.Warn(logger).Log("msg", "received a request to prepare partition for shutdown, but the request can't be satisfied because the partition is in PENDING state") - w.WriteHeader(http.StatusConflict) - return - } - - if err := i.partitionRingLifecycler.ChangePartitionState(r.Context(), ring.PartitionInactive); err != nil { - level.Error(logger).Log("msg", "failed to change partition state to inactive", "err", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - if !exists { - if err := shutdownmarker.Create(shutdownMarkerPath); err != nil { - level.Error(i.logger).Log("msg", "unable to create prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - } - i.setPrepareShutdown() - - case http.MethodDelete: - state, _, err := i.partitionRingLifecycler.GetPartitionState(r.Context()) - if err != nil { - level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - // If the partition is inactive, make it active. We ignore the other states, Active and especially Pending. - if state == ring.PartitionInactive { - - // We don't switch it back to PENDING state if there are not enough owners because we want to guarantee consistency - // in the read path. If the partition is within the lookback period, we need to guarantee that the partition will be queried. - // Moving back to PENDING would cause us to lose consistency, because PENDING partitions are not queried by design. - // We could move back to PENDING if there are not enough owners and the partition moved to INACTIVE more than - // the "lookback period" ago, but since we delete inactive partitions with no owners that moved to inactive longer - // than the "lookback period" ago, it looks like an edge case not worth addressing.
- if err := i.partitionRingLifecycler.ChangePartitionState(r.Context(), ring.PartitionActive); err != nil { - level.Error(logger).Log("msg", "failed to change partition state to active", "err", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - if exists { - if err := shutdownmarker.Remove(shutdownMarkerPath); err != nil { - level.Error(i.logger).Log("msg", "unable to remove prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - } - i.unsetPrepareShutdown() - } - } - - state, stateTimestamp, err := i.partitionRingLifecycler.GetPartitionState(r.Context()) - if err != nil { - level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - if state == ring.PartitionInactive { - util.WriteJSONResponse(w, map[string]any{"timestamp": stateTimestamp.Unix()}) - } else { - util.WriteJSONResponse(w, map[string]any{"timestamp": 0}) - } -} - -// setPrepareShutdown toggles ingester lifecycler config to prepare for shutdown -func (i *Ingester) setPrepareShutdown() { - i.lifecycler.SetUnregisterOnShutdown(true) - i.lifecycler.SetFlushOnShutdown(true) - i.partitionRingLifecycler.SetCreatePartitionOnStartup(false) - i.partitionRingLifecycler.SetRemoveOwnerOnShutdown(true) - i.metrics.shutdownMarker.Set(1) -} - -func (i *Ingester) unsetPrepareShutdown() { - i.lifecycler.SetUnregisterOnShutdown(i.cfg.LifecyclerConfig.UnregisterOnShutdown) - i.lifecycler.SetFlushOnShutdown(true) - i.partitionRingLifecycler.SetCreatePartitionOnStartup(true) - i.partitionRingLifecycler.SetRemoveOwnerOnShutdown(false) - i.metrics.shutdownMarker.Set(0) -} - -// ReadinessHandler is used to indicate to k8s when the ingesters are ready for -// the addition removal of another ingester. Returns 204 when the ingester is -// ready, 500 otherwise. -func (i *Ingester) CheckReady(ctx context.Context) error { - // todo. - if s := i.State(); s != services.Running && s != services.Stopping { - return fmt.Errorf("ingester not ready: %v", s) - } - return i.lifecycler.CheckReady(ctx) -} - -// Flush implements ring.FlushTransferer -// Flush triggers a flush of all the chunks and closes the flush queues. -// Called from the Lifecycler as part of the ingester shutdown. -func (i *Ingester) Flush() { -} - -func (i *Ingester) TransferOut(_ context.Context) error { - return nil -} diff --git a/pkg/kafka/ingester/ingester_test.go b/pkg/kafka/ingester/ingester_test.go deleted file mode 100644 index c7d62b9593..0000000000 --- a/pkg/kafka/ingester/ingester_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package ingester - -import ( - "context" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/consul" - "github.com/grafana/dskit/ring" - "github.com/grafana/dskit/services" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" - - "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" - "github.com/grafana/loki/v3/pkg/ingester-rf1/objstore" - "github.com/grafana/loki/v3/pkg/util/test" -) - -func TestPreparePartitionDownscaleHandler(t *testing.T) { - cfg := defaultIngesterTestConfig(t) - // start ingester. 
- storage, err := objstore.NewTestStorage(t) - require.NoError(t, err) - ing, err := New(cfg, - NewConsumerFactory(NewTestMetastore(), storage, cfg.FlushInterval, cfg.FlushSize, log.NewNopLogger(), prometheus.NewRegistry()), - log.NewNopLogger(), "test", prometheus.NewRegistry()) - require.NoError(t, err) - err = services.StartAndAwaitRunning(context.Background(), ing) - require.NoError(t, err) - - t.Run("get state", func(t *testing.T) { - w := httptest.NewRecorder() - ing.PreparePartitionDownscaleHandler(w, httptest.NewRequest("GET", "/", nil)) - require.Equal(t, http.StatusOK, w.Code) - require.Equal(t, "{\"timestamp\":0}", w.Body.String()) - }) - t.Run("prepare shutdown pending", func(t *testing.T) { - w := httptest.NewRecorder() - ing.PreparePartitionDownscaleHandler(w, httptest.NewRequest("POST", "/", nil)) - require.Equal(t, http.StatusConflict, w.Code) - }) - t.Run("prepare shutdown and cancel", func(t *testing.T) { - w := httptest.NewRecorder() - test.Poll(t, 5*time.Second, ring.PartitionActive, func() interface{} { - return getState(t, cfg) - }) - ing.PreparePartitionDownscaleHandler(w, httptest.NewRequest("POST", "/", nil)) - require.Equal(t, http.StatusOK, w.Code) - test.Poll(t, 5*time.Second, ring.PartitionInactive, func() interface{} { - return getState(t, cfg) - }) - w2 := httptest.NewRecorder() - ing.PreparePartitionDownscaleHandler(w2, httptest.NewRequest("DELETE", "/", nil)) - require.Equal(t, http.StatusOK, w.Code) - test.Poll(t, 5*time.Second, ring.PartitionActive, func() interface{} { - return getState(t, cfg) - }) - }) - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing)) -} - -func getState(t *testing.T, cfg Config) ring.PartitionState { - get, err := cfg.PartitionRingConfig.KVStore.Mock.Get(context.Background(), PartitionRingName+"-key") - require.NoError(t, err) - - ringDesc := ring.GetOrCreatePartitionRingDesc(get) - return ringDesc.Partitions[0].State -} - -// nolint -func defaultIngesterTestConfig(t testing.TB) Config { - kvRing, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) - t.Cleanup(func() { require.NoError(t, closer.Close()) }) - - kvPartitionRing, closerPartitionRing := consul.NewInMemoryClient(ring.GetPartitionRingCodec(), log.NewNopLogger(), nil) - t.Cleanup(func() { require.NoError(t, closerPartitionRing.Close()) }) - - cfg := Config{} - flagext.DefaultValues(&cfg) - - cfg.LifecyclerConfig.RingConfig.KVStore.Mock = kvRing - cfg.PartitionRingConfig.KVStore.Mock = kvPartitionRing - cfg.PartitionRingConfig.MinOwnersCount = 1 - cfg.PartitionRingConfig.MinOwnersDuration = 0 - cfg.LifecyclerConfig.RingConfig.ReplicationFactor = 1 - cfg.LifecyclerConfig.NumTokens = 1 - cfg.LifecyclerConfig.ListenPort = 0 - cfg.LifecyclerConfig.Addr = "localhost" - cfg.LifecyclerConfig.ID = "localhost" - cfg.LifecyclerConfig.FinalSleep = 0 - cfg.LifecyclerConfig.MinReadyDuration = 0 - - return cfg -} - -// TestMetastore is a simple in-memory metastore for testing -type TestMetastore struct { - blocks map[string][]*metastorepb.BlockMeta -} - -func NewTestMetastore() *TestMetastore { - return &TestMetastore{blocks: make(map[string][]*metastorepb.BlockMeta)} -} - -func (m *TestMetastore) ListBlocksForQuery(_ context.Context, req *metastorepb.ListBlocksForQueryRequest, _ ...grpc.CallOption) (*metastorepb.ListBlocksForQueryResponse, error) { - blocks := m.blocks[req.TenantId] - var result []*metastorepb.BlockMeta - for _, block := range blocks { - if block.MinTime <= req.EndTime && block.MaxTime >= req.StartTime { - result = 
append(result, block) - } - } - return &metastorepb.ListBlocksForQueryResponse{Blocks: result}, nil -} - -func (m *TestMetastore) AddBlock(_ context.Context, in *metastorepb.AddBlockRequest, _ ...grpc.CallOption) (*metastorepb.AddBlockResponse, error) { - for _, stream := range in.Block.TenantStreams { - m.blocks[stream.TenantId] = append(m.blocks[stream.TenantId], in.Block) - } - return &metastorepb.AddBlockResponse{}, nil -} diff --git a/pkg/kafka/ingester/metrics.go b/pkg/kafka/ingester/metrics.go deleted file mode 100644 index e73ee08095..0000000000 --- a/pkg/kafka/ingester/metrics.go +++ /dev/null @@ -1,20 +0,0 @@ -package ingester - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -type ingesterMetrics struct { - // Shutdown marker for ingester scale down - shutdownMarker prometheus.Gauge -} - -func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics { - return &ingesterMetrics{ - shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{ - Name: "loki_ingester_prepare_shutdown_requested", - Help: "1 if the ingester has been requested to prepare for shutdown via endpoint or marker file.", - }), - } -} diff --git a/pkg/kafka/ingester/shutdownmarker/shutdown_marker.go b/pkg/kafka/ingester/shutdownmarker/shutdown_marker.go deleted file mode 100644 index 7d1a4ec2f3..0000000000 --- a/pkg/kafka/ingester/shutdownmarker/shutdown_marker.go +++ /dev/null @@ -1,60 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package shutdownmarker - -import ( - "os" - "path" - "strings" - "time" - - "github.com/grafana/dskit/multierror" - - "github.com/grafana/loki/v3/pkg/util/atomicfs" -) - -const shutdownMarkerFilename = "shutdown-requested.txt" - -// Create writes a marker file on the given path to indicate that a component is -// going to be scaled down in the future. The presence of this file means that a component -// should perform some operations specified by the component itself before being shutdown. -func Create(p string) error { - return atomicfs.CreateFile(p, strings.NewReader(time.Now().UTC().Format(time.RFC3339))) -} - -// Remove removes the shutdown marker file on the given path if it exists. 
-func Remove(p string) error { - err := os.Remove(p) - if err != nil && !os.IsNotExist(err) { - return err - } - - dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777) - if err != nil { - return err - } - - merr := multierror.New() - merr.Add(dir.Sync()) - merr.Add(dir.Close()) - return merr.Err() -} - -// Exists returns true if the shutdown marker file exists on the given path, false otherwise -func Exists(p string) (bool, error) { - s, err := os.Stat(p) - if err != nil && os.IsNotExist(err) { - return false, nil - } - - if err != nil { - return false, err - } - - return s.Mode().IsRegular(), nil -} - -// GetPath returns the absolute path of the shutdown marker file -func GetPath(dirPath string) string { - return path.Join(dirPath, shutdownMarkerFilename) -} diff --git a/pkg/kafka/ingester/shutdownmarker/shutdown_marker_test.go b/pkg/kafka/ingester/shutdownmarker/shutdown_marker_test.go deleted file mode 100644 index c8e0b851be..0000000000 --- a/pkg/kafka/ingester/shutdownmarker/shutdown_marker_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package shutdownmarker - -import ( - "path/filepath" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestShutdownMarker_GetPath(t *testing.T) { - dir := "/a/b/c" - expectedPath := filepath.Join(dir, shutdownMarkerFilename) - require.Equal(t, expectedPath, GetPath(dir)) -} - -func TestShutdownMarker_Create(t *testing.T) { - dir := t.TempDir() - shutdownMarkerPath := GetPath(dir) - exists, err := Exists(shutdownMarkerPath) - require.NoError(t, err) - require.False(t, exists) - - err = Create(shutdownMarkerPath) - require.NoError(t, err) - - exists, err = Exists(shutdownMarkerPath) - require.NoError(t, err) - require.True(t, exists) -} - -func TestShutdownMarker_Remove(t *testing.T) { - dir := t.TempDir() - shutdownMarkerPath := GetPath(dir) - exists, err := Exists(shutdownMarkerPath) - require.NoError(t, err) - require.False(t, exists) - - require.Nil(t, Create(shutdownMarkerPath)) - exists, err = Exists(shutdownMarkerPath) - require.NoError(t, err) - require.True(t, exists) - - require.Nil(t, Remove(shutdownMarkerPath)) - exists, err = Exists(shutdownMarkerPath) - require.NoError(t, err) - require.False(t, exists) -} diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 1418f01e21..faf15069e0 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -43,7 +43,6 @@ import ( metastoreclient "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/client" ingester_client "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/kafka" - ingester_kafka "github.com/grafana/loki/v3/pkg/kafka/ingester" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/loki/common" "github.com/grafana/loki/v3/pkg/lokifrontend" @@ -380,7 +379,6 @@ type Loki struct { MetastoreClient *metastoreclient.Client partitionRingWatcher *ring.PartitionRingWatcher partitionRing *ring.PartitionInstanceRing - kafkaIngester *ingester_kafka.Ingester ClientMetrics storage.ClientMetrics deleteClientMetrics *deletion.DeleteRequestClientMetrics