chore: delete old kafka ingester code (#14819)

Author: George Robinson (committed by GitHub, 1 year ago)
Parent: 1bce67127f
Commit: 869f7b5a41
8 changed files, 1,141 deletions:

  1. pkg/kafka/ingester/consumer.go (−303)
  2. pkg/kafka/ingester/consumer_test.go (−198)
  3. pkg/kafka/ingester/ingester.go (−383)
  4. pkg/kafka/ingester/ingester_test.go (−126)
  5. pkg/kafka/ingester/metrics.go (−20)
  6. pkg/kafka/ingester/shutdownmarker/shutdown_marker.go (−60)
  7. pkg/kafka/ingester/shutdownmarker/shutdown_marker_test.go (−49)
  8. pkg/loki/loki.go (−2)

pkg/kafka/ingester/consumer.go
@@ -1,303 +0,0 @@
package ingester

import (
    "bytes"
    "context"
    "crypto/rand"
    "fmt"
    "io"
    "math"
    "sync"
    "time"

    "github.com/dustin/go-humanize"
    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/oklog/ulid"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "google.golang.org/grpc"

    "github.com/grafana/dskit/backoff"

    "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
    "github.com/grafana/loki/v3/pkg/kafka"
    "github.com/grafana/loki/v3/pkg/kafka/partition"
    "github.com/grafana/loki/v3/pkg/logproto"
    "github.com/grafana/loki/v3/pkg/storage/wal"
)

// ObjectStorage defines an interface for object storage operations
type ObjectStorage interface {
    PutObject(ctx context.Context, objectKey string, object io.Reader) error
}

// MetadataStore defines an interface for metadata storage operations
type MetadataStore interface {
    AddBlock(ctx context.Context, in *metastorepb.AddBlockRequest, opts ...grpc.CallOption) (*metastorepb.AddBlockResponse, error)
}

// consumer represents a Kafka consumer that processes and stores log entries
type consumer struct {
    metastoreClient MetadataStore
    storage         ObjectStorage
    writer          *wal.SegmentWriter
    committer       partition.Committer
    flushInterval   time.Duration
    maxFlushSize    int64
    lastOffset      int64
    flushBuf        *bytes.Buffer
    decoder         *kafka.Decoder
    toStore         []*logproto.Entry
    metrics         *consumerMetrics
    logger          log.Logger
}

// NewConsumerFactory returns a partition.ConsumerFactory that creates and initializes consumer instances
func NewConsumerFactory(
    metastoreClient MetadataStore,
    storage ObjectStorage,
    flushInterval time.Duration,
    maxFlushSize int64,
    logger log.Logger,
    reg prometheus.Registerer,
) partition.ConsumerFactory {
    return func(committer partition.Committer) (partition.Consumer, error) {
        writer, err := wal.NewWalSegmentWriter()
        if err != nil {
            return nil, err
        }
        decoder, err := kafka.NewDecoder()
        if err != nil {
            return nil, err
        }
        return &consumer{
            logger:          logger,
            metastoreClient: metastoreClient,
            storage:         storage,
            writer:          writer,
            metrics:         newConsumerMetrics(reg),
            flushBuf:        bytes.NewBuffer(make([]byte, 0, 10<<20)), // 10 MB
            decoder:         decoder,
            committer:       committer,
            flushInterval:   flushInterval,
            maxFlushSize:    maxFlushSize,
            lastOffset:      -1,
        }, nil
    }
}

// Start starts the consumer and returns a function to wait for it to finish.
// It consumes records from the recordsChan and flushes them to storage periodically.
func (c *consumer) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        flushTicker := time.NewTicker(c.flushInterval)
        defer flushTicker.Stop()
        for {
            select {
            case <-flushTicker.C:
                level.Info(c.logger).Log("msg", "flushing block")
                c.Flush()
            case <-ctx.Done():
                level.Info(c.logger).Log("msg", "shutting down consumer")
                c.Flush()
                return
            case records := <-recordsChan:
                if err := c.consume(records); err != nil {
                    level.Error(c.logger).Log("msg", "failed to consume records", "error", err)
                    return
                }
                if c.writer.InputSize() > c.maxFlushSize {
                    level.Info(c.logger).Log("msg", "flushing block due to size limit", "size", humanize.Bytes(uint64(c.writer.InputSize())))
                    c.Flush()
                }
            }
        }
    }()
    return wg.Wait
}

// consume processes a batch of Kafka records, decoding and storing them
func (c *consumer) consume(records []partition.Record) error {
    if len(records) == 0 {
        return nil
    }
    var (
        minOffset = int64(math.MaxInt64)
        maxOffset = int64(0)
    )
    for _, record := range records {
        minOffset = min(minOffset, record.Offset)
        maxOffset = max(maxOffset, record.Offset)
    }
    level.Debug(c.logger).Log("msg", "consuming records", "min_offset", minOffset, "max_offset", maxOffset)
    return c.retryWithBackoff(context.Background(), backoff.Config{
        MinBackoff: 250 * time.Millisecond,
        MaxBackoff: 2 * time.Second,
        MaxRetries: 0, // retry forever
    }, func(boff *backoff.Backoff) error {
        consumeStart := time.Now()
        if err := c.appendRecords(records); err != nil {
            level.Error(c.logger).Log(
                "msg", "encountered error while ingesting data from Kafka; should retry",
                "err", err,
                "record_min_offset", minOffset,
                "record_max_offset", maxOffset,
                "num_retries", boff.NumRetries(),
            )
            return err
        }
        c.lastOffset = maxOffset
        c.metrics.currentOffset.Set(float64(c.lastOffset))
        c.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds())
        return nil
    })
}

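// appendRecords decodes each record into a stream and appends its entries to the WAL segment writer.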
func (c *consumer) appendRecords(records []partition.Record) error {
    for _, record := range records {
        stream, labels, err := c.decoder.Decode(record.Content)
        if err != nil {
            return fmt.Errorf("failed to decode record: %w", err)
        }
        if len(stream.Entries) == 0 {
            continue
        }
        if len(c.toStore) == 0 {
            c.toStore = make([]*logproto.Entry, 0, len(stream.Entries))
        }
        c.toStore = c.toStore[:0]
        for _, entry := range stream.Entries {
            c.toStore = append(c.toStore, &logproto.Entry{
                Timestamp:          entry.Timestamp,
                Line:               entry.Line,
                StructuredMetadata: entry.StructuredMetadata,
                Parsed:             entry.Parsed,
            })
        }
        c.writer.Append(record.TenantID, stream.Labels, labels, c.toStore, time.Now())
    }
    return nil
}

// Flush writes the accumulated data to storage and updates the metadata store
func (c *consumer) Flush() {
    if c.writer.InputSize() == 0 {
        return
    }
    if c.lastOffset == -1 {
        return
    }
    if err := c.retryWithBackoff(context.Background(), backoff.Config{
        MinBackoff: 250 * time.Millisecond,
        MaxBackoff: 10 * time.Second,
        MaxRetries: 0, // retry forever
    }, func(boff *backoff.Backoff) error {
        start := time.Now()
        c.metrics.flushesTotal.Add(1)
        defer func() { c.metrics.flushDuration.Observe(time.Since(start).Seconds()) }()
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
        defer cancel()
        if err := c.flush(ctx); err != nil {
            c.metrics.flushFailuresTotal.Inc()
            level.Error(c.logger).Log(
                "msg", "failed to flush block",
                "error", err,
                "num_retries", boff.NumRetries(),
            )
            return err
        }
        c.lastOffset = -1
        return nil
    }); err != nil {
        level.Error(c.logger).Log("msg", "failed to flush block", "error", err)
    }
}

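// retryWithBackoff retries fn with backoff until it succeeds or the backoff gives up
// (e.g. the context is canceled), returning the last error in that case.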
func (c *consumer) retryWithBackoff(ctx context.Context, cfg backoff.Config, fn func(boff *backoff.Backoff) error) error {
    boff := backoff.New(ctx, cfg)
    var err error
    for boff.Ongoing() {
        err = fn(boff)
        if err == nil {
            return nil
        }
        boff.Wait()
    }
    if err != nil {
        return err
    }
    return boff.ErrCause()
}

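// flush uploads the current WAL segment to object storage, registers the block
// with the metastore, and commits the last consumed offset.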
func (c *consumer) flush(ctx context.Context) error {
    defer c.flushBuf.Reset()
    if _, err := c.writer.WriteTo(c.flushBuf); err != nil {
        return err
    }

    stats := wal.GetSegmentStats(c.writer, time.Now())
    wal.ReportSegmentStats(stats, c.metrics.segmentMetrics)

    id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
    if err := c.storage.PutObject(ctx, wal.Dir+id, c.flushBuf); err != nil {
        return fmt.Errorf("failed to put object to object storage: %w", err)
    }

    if _, err := c.metastoreClient.AddBlock(ctx, &metastorepb.AddBlockRequest{
        Block: c.writer.Meta(id),
    }); err != nil {
        return fmt.Errorf("failed to add block to metastore: %w", err)
    }
    c.writer.Reset()
    if err := c.committer.Commit(ctx, c.lastOffset); err != nil {
        return fmt.Errorf("failed to commit offset: %w", err)
    }

    return nil
}

// consumerMetrics holds various Prometheus metrics for monitoring consumer operations
type consumerMetrics struct {
    flushesTotal       prometheus.Counter
    flushFailuresTotal prometheus.Counter
    flushDuration      prometheus.Histogram
    segmentMetrics     *wal.SegmentMetrics
    consumeLatency     prometheus.Histogram
    currentOffset      prometheus.Gauge
}

// newConsumerMetrics initializes and returns a new consumerMetrics instance
func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics {
    return &consumerMetrics{
        flushesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
            Name: "loki_kafka_ingester_flushes_total",
            Help: "The total number of flushes.",
        }),
        flushFailuresTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
            Name: "loki_kafka_ingester_flush_failures_total",
            Help: "The total number of failed flushes.",
        }),
        flushDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
            Name:                        "loki_kafka_ingester_flush_duration_seconds",
            Help:                        "The flush duration (in seconds).",
            Buckets:                     prometheus.ExponentialBuckets(0.001, 4, 8),
            NativeHistogramBucketFactor: 1.1,
        }),
        consumeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
            Name:                        "loki_ingest_storage_reader_records_batch_process_duration_seconds",
            Help:                        "How long a consumer spent processing a batch of records from Kafka.",
            NativeHistogramBucketFactor: 1.1,
        }),
        segmentMetrics: wal.NewSegmentMetrics(reg),
        currentOffset: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
            Name: "loki_kafka_ingester_current_offset",
            Help: "The current offset of the Kafka consumer.",
        }),
    }
}

pkg/kafka/ingester/consumer_test.go
@@ -1,198 +0,0 @@
package ingester

import (
    "context"
    "os"
    "strings"
    "testing"
    "time"

    "github.com/go-kit/log"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/stretchr/testify/require"

    "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
    "github.com/grafana/loki/v3/pkg/ingester-rf1/objstore"
    "github.com/grafana/loki/v3/pkg/kafka"
    "github.com/grafana/loki/v3/pkg/kafka/partition"
    "github.com/grafana/loki/v3/pkg/logproto"
)

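// mockCommitter is a test double that records the most recently committed offset.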
type mockCommitter struct {
    committed int64
}

func newMockCommitter() *mockCommitter {
    return &mockCommitter{
        committed: -1,
    }
}

func (m *mockCommitter) Commit(_ context.Context, offset int64) error {
    m.committed = offset
    return nil
}

func (m *mockCommitter) EnqueueOffset(offset int64) {
    // For testing purposes, we'll just set the committed offset directly
    m.committed = offset
}

func TestConsumer_PeriodicFlush(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    storage, err := objstore.NewTestStorage(t)
    require.NoError(t, err)
    metastore := NewTestMetastore()
    reg := prometheus.NewRegistry()
    flushInterval := 100 * time.Millisecond
    maxFlushSize := int64(1000)

    committer := newMockCommitter()
    consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg)
    consumer, err := consumerFactory(committer)
    require.NoError(t, err)

    recordsChan := make(chan []partition.Record)
    _ = consumer.Start(ctx, recordsChan)

    stream := logproto.Stream{
        Labels: `{__name__="test_metric", label="value1"}`,
        Entries: []logproto.Entry{
            {Timestamp: time.Unix(0, 1000), Line: "10.5"},
        },
    }
    encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20)
    require.NoError(t, err)
    records := []partition.Record{{
        TenantID: "tenant1",
        Content:  encodedRecords[0].Value,
        Offset:   0,
    }}

    recordsChan <- records

    require.Eventually(t, func() bool {
        blocks, err := metastore.ListBlocksForQuery(ctx, &metastorepb.ListBlocksForQueryRequest{
            TenantId:  "tenant1",
            StartTime: 0,
            EndTime:   100000,
        })
        require.NoError(t, err)
        return len(blocks.Blocks) == 1
    }, 5*time.Second, 100*time.Millisecond)

    // Verify committed offset
    require.Equal(t, int64(0), committer.committed)
}

func TestConsumer_ShutdownFlush(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    storage, err := objstore.NewTestStorage(t)
    require.NoError(t, err)
    metastore := NewTestMetastore()
    reg := prometheus.NewRegistry()
    flushInterval := 1 * time.Hour
    maxFlushSize := int64(1000)

    committer := newMockCommitter()
    consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg)
    consumer, err := consumerFactory(committer)
    require.NoError(t, err)

    recordsChan := make(chan []partition.Record)
    wait := consumer.Start(ctx, recordsChan)

    stream := logproto.Stream{
        Labels: `{__name__="test_metric", label="value1"}`,
        Entries: []logproto.Entry{
            {Timestamp: time.Unix(0, 1000), Line: "10.5"},
        },
    }
    encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20)
    require.NoError(t, err)
    records := []partition.Record{{
        TenantID: "tenant1",
        Content:  encodedRecords[0].Value,
        Offset:   0,
    }}

    recordsChan <- records
    cancel()
    wait()

    blocks, err := metastore.ListBlocksForQuery(ctx, &metastorepb.ListBlocksForQueryRequest{
        TenantId:  "tenant1",
        StartTime: 0,
        EndTime:   100000,
    })
    require.NoError(t, err)
    require.Equal(t, 1, len(blocks.Blocks))

    // Verify committed offset
    require.Equal(t, int64(0), committer.committed)
}

func TestConsumer_MaxFlushSize(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    storage, err := objstore.NewTestStorage(t)
    require.NoError(t, err)
    metastore := NewTestMetastore()
    reg := prometheus.NewRegistry()
    flushInterval := 1 * time.Hour
    maxFlushSize := int64(10)

    committer := newMockCommitter()
    consumerFactory := NewConsumerFactory(metastore, storage, flushInterval, maxFlushSize, log.NewLogfmtLogger(os.Stdout), reg)
    consumer, err := consumerFactory(committer)
    require.NoError(t, err)

    recordsChan := make(chan []partition.Record)
    _ = consumer.Start(ctx, recordsChan)

    stream := logproto.Stream{
        Labels: `{__name__="test_metric", label="value1"}`,
        Entries: []logproto.Entry{
            {Timestamp: time.Unix(0, 1000), Line: strings.Repeat("a", 100)},
        },
    }
    encodedRecords, err := kafka.Encode(0, "tenant1", stream, 10<<20)
    require.NoError(t, err)
    records := []partition.Record{{
        TenantID: "tenant1",
        Content:  encodedRecords[0].Value,
        Offset:   0,
    }}

    recordsChan <- records

    require.Eventually(t, func() bool {
        blocks, err := metastore.ListBlocksForQuery(ctx, &metastorepb.ListBlocksForQueryRequest{
            TenantId:  "tenant1",
            StartTime: 0,
            EndTime:   100000,
        })
        require.NoError(t, err)
        return len(blocks.Blocks) == 1
    }, 5*time.Second, 100*time.Millisecond)

    require.Equal(t, int64(0), committer.committed)
}

pkg/kafka/ingester/ingester.go
@@ -1,383 +0,0 @@
package ingester

import (
    "context"
    "errors"
    "flag"
    "fmt"
    "net/http"
    "time"

    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/grafana/dskit/kv"
    "github.com/grafana/dskit/ring"
    "github.com/grafana/dskit/services"
    "github.com/prometheus/client_golang/prometheus"
    "google.golang.org/grpc/health/grpc_health_v1"

    "github.com/grafana/loki/v3/pkg/kafka"
    "github.com/grafana/loki/v3/pkg/kafka/ingester/shutdownmarker"
    "github.com/grafana/loki/v3/pkg/kafka/partition"
    "github.com/grafana/loki/v3/pkg/kafka/partitionring"
    "github.com/grafana/loki/v3/pkg/util"
    util_log "github.com/grafana/loki/v3/pkg/util/log"
)

const (
    RingName          = "kafka-ingester"
    PartitionRingName = "kafka-partition"
)

var (
    defaultFlushInterval       = 15 * time.Second
    defaultFlushSize     int64 = 300 << 20 // 300 MB
)

// Config for an ingester.
type Config struct {
    Enabled             bool                  `yaml:"enabled" doc:"description=Whether the kafka ingester is enabled."`
    LifecyclerConfig    ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the ingester will operate and where it will register for discovery."`
    ShutdownMarkerPath  string                `yaml:"shutdown_marker_path"`
    FlushInterval       time.Duration         `yaml:"flush_interval" doc:"description=The interval at which the ingester will flush and commit offsets to Kafka. If not set, the default flush interval will be used."`
    FlushSize           int64                 `yaml:"flush_size" doc:"description=The size at which the ingester will flush and commit offsets to Kafka. If not set, the default flush size will be used."`
    PartitionRingConfig partitionring.Config  `yaml:"partition_ring" category:"experimental"`
    KafkaConfig         kafka.Config          `yaml:"-"`
}

// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
    cfg.LifecyclerConfig.RegisterFlagsWithPrefix("kafka-ingester.", f, util_log.Logger)
    cfg.PartitionRingConfig.RegisterFlagsWithPrefix("kafka-ingester.", f)
    f.StringVar(&cfg.ShutdownMarkerPath, "kafka-ingester.shutdown-marker-path", "", "Path where the shutdown marker file is stored. If not set and common.path_prefix is set then common.path_prefix will be used.")
    f.BoolVar(&cfg.Enabled, "kafka-ingester.enabled", false, "Whether the Kafka-based ingester path is enabled")
    f.DurationVar(&cfg.FlushInterval, "kafka-ingester.flush-interval", defaultFlushInterval, "The interval at which the ingester will flush and commit offsets to Kafka. If not set, the default flush interval will be used.")
    f.Int64Var(&cfg.FlushSize, "kafka-ingester.flush-size", defaultFlushSize, "The size at which the ingester will flush and commit offsets to Kafka. If not set, the default flush size will be used.")
}

func (cfg *Config) Validate() error {
    if !cfg.Enabled {
        return nil
    }
    if cfg.FlushInterval <= 0 {
        return errors.New("kafka-ingester.flush-interval must be greater than 0")
    }
    if cfg.LifecyclerConfig.RingConfig.ReplicationFactor != 1 {
        cfg.LifecyclerConfig.RingConfig.ReplicationFactor = 1
        level.Warn(util_log.Logger).Log("msg", "kafka-ingester.lifecycler.replication-factor has been set to 1. This is the only supported replication factor for the kafka-ingester.")
    }
    return nil
}

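// Wrapper wraps an ingester Interface, returning a decorated implementation.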
type Wrapper interface {
    Wrap(wrapped Interface) Interface
}

// Interface is an interface for the Ingester
type Interface interface {
    services.Service
    http.Handler
    CheckReady(ctx context.Context) error
    FlushHandler(w http.ResponseWriter, _ *http.Request)
}

// Ingester builds chunks for incoming log streams.
type Ingester struct {
    services.Service

    cfg                     Config
    logger                  log.Logger
    metrics                 *ingesterMetrics
    lifecycler              *ring.Lifecycler
    lifecyclerWatcher       *services.FailureWatcher
    ingesterPartitionID     int32
    partitionRingLifecycler *ring.PartitionInstanceLifecycler
    partitionReader         *partition.Reader
}

// New makes a new Ingester.
func New(cfg Config,
    consumerFactory partition.ConsumerFactory,
    logger log.Logger,
    metricsNamespace string,
    registerer prometheus.Registerer,
) (*Ingester, error) {
    metrics := newIngesterMetrics(registerer)

    ingesterPartitionID, err := partitionring.ExtractIngesterPartitionID(cfg.LifecyclerConfig.ID)
    if err != nil {
        return nil, fmt.Errorf("calculating ingester partition ID: %w", err)
    }

    partitionRingKV := cfg.PartitionRingConfig.KVStore.Mock
    if partitionRingKV == nil {
        partitionRingKV, err = kv.NewClient(cfg.PartitionRingConfig.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(registerer, PartitionRingName+"-lifecycler"), logger)
        if err != nil {
            return nil, fmt.Errorf("creating KV store for ingester partition ring: %w", err)
        }
    }

    partitionRingLifecycler := ring.NewPartitionInstanceLifecycler(
        cfg.PartitionRingConfig.ToLifecyclerConfig(ingesterPartitionID, cfg.LifecyclerConfig.ID),
        PartitionRingName,
        PartitionRingName+"-key",
        partitionRingKV,
        logger,
        prometheus.WrapRegistererWithPrefix("loki_", registerer))

    i := &Ingester{
        cfg:                     cfg,
        logger:                  logger,
        ingesterPartitionID:     ingesterPartitionID,
        partitionRingLifecycler: partitionRingLifecycler,
        metrics:                 metrics,
    }

    i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, RingName, RingName+"-ring", true, logger, prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer))
    if err != nil {
        return nil, err
    }

    i.partitionReader, err = partition.NewReader(cfg.KafkaConfig, ingesterPartitionID, cfg.LifecyclerConfig.ID, consumerFactory, logger, registerer)
    if err != nil {
        return nil, err
    }

    i.lifecyclerWatcher = services.NewFailureWatcher()
    i.lifecyclerWatcher.WatchService(i.lifecycler)
    i.lifecyclerWatcher.WatchService(i.partitionRingLifecycler)
    i.lifecyclerWatcher.WatchService(i.partitionReader)

    i.Service = services.NewBasicService(i.starting, i.running, i.stopping)

    return i, nil
}

// ServeHTTP serves the ring status page.
func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    i.lifecycler.ServeHTTP(w, r)
}

func (i *Ingester) starting(ctx context.Context) (err error) {
    defer func() {
        if err != nil {
            // if starting() fails for any reason (e.g., context canceled),
            // the lifecycler must be stopped.
            _ = services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
        }
    }()

    // First of all we have to check if the shutdown marker is set. This needs to be done
    // first because, if found, it may change the behaviour of the ingester startup.
    if exists, err := shutdownmarker.Exists(shutdownmarker.GetPath(i.cfg.ShutdownMarkerPath)); err != nil {
        return fmt.Errorf("failed to check ingester shutdown marker: %w", err)
    } else if exists {
        level.Info(i.logger).Log("msg", "detected existing shutdown marker, setting unregister and flush on shutdown", "path", shutdownmarker.GetPath(i.cfg.ShutdownMarkerPath))
        i.setPrepareShutdown()
    }

    // pass a new context to the lifecycler, so that it doesn't stop automatically when the Ingester's service context is done
    err = i.lifecycler.StartAsync(context.Background())
    if err != nil {
        return err
    }
    err = i.lifecycler.AwaitRunning(ctx)
    if err != nil {
        return err
    }

    err = i.partitionRingLifecycler.StartAsync(context.Background())
    if err != nil {
        return err
    }
    err = i.partitionRingLifecycler.AwaitRunning(ctx)
    if err != nil {
        return err
    }

    err = i.partitionReader.StartAsync(context.Background())
    if err != nil {
        return err
    }
    err = i.partitionReader.AwaitRunning(ctx)
    if err != nil {
        return err
    }

    return nil
}

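// running blocks until the service context is canceled or one of the watched subservices fails.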
func (i *Ingester) running(ctx context.Context) error {
    var serviceError error
    select {
    // wait until service is asked to stop
    case <-ctx.Done():
    // stop
    case err := <-i.lifecyclerWatcher.Chan():
        serviceError = fmt.Errorf("lifecycler failed: %w", err)
    }
    return serviceError
}

// stopping is called when Ingester transitions to Stopping state.
//
// At this point, loop no longer runs, but flushers are still running.
func (i *Ingester) stopping(_ error) error {
    var errs util.MultiError
    errs.Add(services.StopAndAwaitTerminated(context.Background(), i.partitionReader))
    errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))
    errs.Add(services.StopAndAwaitTerminated(context.Background(), i.partitionRingLifecycler))

    // Remove the shutdown marker if it exists since we are shutting down
    shutdownMarkerPath := shutdownmarker.GetPath(i.cfg.ShutdownMarkerPath)
    exist, err := shutdownmarker.Exists(shutdownMarkerPath)
    if err != nil {
        level.Warn(i.logger).Log("msg", "failed to check for prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err)
    } else if exist {
        if err := shutdownmarker.Remove(shutdownMarkerPath); err != nil {
            level.Warn(i.logger).Log("msg", "failed to remove shutdown marker", "path", shutdownMarkerPath, "err", err)
        }
    }

    return errs.Err()
}

// Watch implements grpc_health_v1.HealthCheck.
func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error {
    return nil
}

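// PreparePartitionDownscaleHandler prepares the ingester's partition for downscaling:
// POST marks the partition INACTIVE and writes the shutdown marker, DELETE reverts an
// INACTIVE partition to ACTIVE and removes the marker. All methods respond with the
// timestamp at which the partition became INACTIVE (or 0 if it is not INACTIVE).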
func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request) {
    logger := log.With(i.logger, "partition", i.ingesterPartitionID)

    // Don't allow callers to change the shutdown configuration while we're in the middle
    // of starting or shutting down.
    if i.State() != services.Running {
        w.WriteHeader(http.StatusServiceUnavailable)
        return
    }

    shutdownMarkerPath := shutdownmarker.GetPath(i.cfg.ShutdownMarkerPath)
    exists, err := shutdownmarker.Exists(shutdownMarkerPath)
    if err != nil {
        level.Error(i.logger).Log("msg", "unable to check for prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err)
        w.WriteHeader(http.StatusInternalServerError)
        return
    }

    switch r.Method {
    case http.MethodPost:
        // It's not allowed to prepare the downscale while in PENDING state. Why? Because if the downscale
        // is later cancelled, we don't know whether it was requested in PENDING or ACTIVE state, so we
        // don't know which state to revert to. Given a partition is expected to stay in PENDING state
        // for a short period, we simply don't allow this case.
        state, _, err := i.partitionRingLifecycler.GetPartitionState(r.Context())
        if err != nil {
            level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
            w.WriteHeader(http.StatusInternalServerError)
            return
        }

        if state == ring.PartitionPending {
            level.Warn(logger).Log("msg", "received a request to prepare partition for shutdown, but the request can't be satisfied because the partition is in PENDING state")
            w.WriteHeader(http.StatusConflict)
            return
        }

        if err := i.partitionRingLifecycler.ChangePartitionState(r.Context(), ring.PartitionInactive); err != nil {
            level.Error(logger).Log("msg", "failed to change partition state to inactive", "err", err)
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        if !exists {
            if err := shutdownmarker.Create(shutdownMarkerPath); err != nil {
                level.Error(i.logger).Log("msg", "unable to create prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err)
                w.WriteHeader(http.StatusInternalServerError)
                return
            }
        }

        i.setPrepareShutdown()

    case http.MethodDelete:
        state, _, err := i.partitionRingLifecycler.GetPartitionState(r.Context())
        if err != nil {
            level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
            w.WriteHeader(http.StatusInternalServerError)
            return
        }

        // If the partition is inactive, make it active. We ignore the other states, ACTIVE and especially PENDING.
        if state == ring.PartitionInactive {
            // We don't switch it back to PENDING state if there are not enough owners because we want to guarantee consistency
            // in the read path. If the partition is within the lookback period we need to guarantee that the partition will be queried.
            // Moving back to PENDING would cost us consistency, because PENDING partitions are not queried by design.
            // We could move back to PENDING if there are not enough owners and the partition moved to INACTIVE more than
            // "lookback period" ago, but since we delete inactive partitions with no owners that moved to INACTIVE longer
            // than the "lookback period" ago, it looks to be an edge case not worth addressing.
            if err := i.partitionRingLifecycler.ChangePartitionState(r.Context(), ring.PartitionActive); err != nil {
                level.Error(logger).Log("msg", "failed to change partition state to active", "err", err)
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }

            if exists {
                if err := shutdownmarker.Remove(shutdownMarkerPath); err != nil {
                    level.Error(i.logger).Log("msg", "unable to remove prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err)
                    w.WriteHeader(http.StatusInternalServerError)
                    return
                }
            }

            i.unsetPrepareShutdown()
        }
    }

    state, stateTimestamp, err := i.partitionRingLifecycler.GetPartitionState(r.Context())
    if err != nil {
        level.Error(logger).Log("msg", "failed to check partition state in the ring", "err", err)
        w.WriteHeader(http.StatusInternalServerError)
        return
    }

    if state == ring.PartitionInactive {
        util.WriteJSONResponse(w, map[string]any{"timestamp": stateTimestamp.Unix()})
    } else {
        util.WriteJSONResponse(w, map[string]any{"timestamp": 0})
    }
}

// setPrepareShutdown toggles ingester lifecycler config to prepare for shutdown
func (i *Ingester) setPrepareShutdown() {
    i.lifecycler.SetUnregisterOnShutdown(true)
    i.lifecycler.SetFlushOnShutdown(true)
    i.partitionRingLifecycler.SetCreatePartitionOnStartup(false)
    i.partitionRingLifecycler.SetRemoveOwnerOnShutdown(true)
    i.metrics.shutdownMarker.Set(1)
}

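// unsetPrepareShutdown reverts the lifecycler changes made by setPrepareShutdown.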
func (i *Ingester) unsetPrepareShutdown() {
    i.lifecycler.SetUnregisterOnShutdown(i.cfg.LifecyclerConfig.UnregisterOnShutdown)
    i.lifecycler.SetFlushOnShutdown(true)
    i.partitionRingLifecycler.SetCreatePartitionOnStartup(true)
    i.partitionRingLifecycler.SetRemoveOwnerOnShutdown(false)
    i.metrics.shutdownMarker.Set(0)
}

// CheckReady is used to indicate to k8s when the ingesters are ready for
// the addition or removal of another ingester. The readiness endpoint returns
// 204 when the ingester is ready, 500 otherwise.
func (i *Ingester) CheckReady(ctx context.Context) error {
    // todo.
    if s := i.State(); s != services.Running && s != services.Stopping {
        return fmt.Errorf("ingester not ready: %v", s)
    }
    return i.lifecycler.CheckReady(ctx)
}

// Flush implements ring.FlushTransferer
// Flush triggers a flush of all the chunks and closes the flush queues.
// Called from the Lifecycler as part of the ingester shutdown.
func (i *Ingester) Flush() {
}

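// TransferOut implements ring.FlushTransferer. It is a no-op for the kafka ingester.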
func (i *Ingester) TransferOut(_ context.Context) error {
    return nil
}

pkg/kafka/ingester/ingester_test.go
@@ -1,126 +0,0 @@
package ingester

import (
    "context"
    "net/http"
    "net/http/httptest"
    "testing"
    "time"

    "github.com/go-kit/log"
    "github.com/grafana/dskit/flagext"
    "github.com/grafana/dskit/kv/consul"
    "github.com/grafana/dskit/ring"
    "github.com/grafana/dskit/services"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/stretchr/testify/require"
    "google.golang.org/grpc"

    "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
    "github.com/grafana/loki/v3/pkg/ingester-rf1/objstore"
    "github.com/grafana/loki/v3/pkg/util/test"
)

func TestPreparePartitionDownscaleHandler(t *testing.T) {
    cfg := defaultIngesterTestConfig(t)

    // start ingester.
    storage, err := objstore.NewTestStorage(t)
    require.NoError(t, err)
    ing, err := New(cfg,
        NewConsumerFactory(NewTestMetastore(), storage, cfg.FlushInterval, cfg.FlushSize, log.NewNopLogger(), prometheus.NewRegistry()),
        log.NewNopLogger(), "test", prometheus.NewRegistry())
    require.NoError(t, err)
    err = services.StartAndAwaitRunning(context.Background(), ing)
    require.NoError(t, err)

    t.Run("get state", func(t *testing.T) {
        w := httptest.NewRecorder()
        ing.PreparePartitionDownscaleHandler(w, httptest.NewRequest("GET", "/", nil))
        require.Equal(t, http.StatusOK, w.Code)
        require.Equal(t, "{\"timestamp\":0}", w.Body.String())
    })

    t.Run("prepare shutdown pending", func(t *testing.T) {
        w := httptest.NewRecorder()
        ing.PreparePartitionDownscaleHandler(w, httptest.NewRequest("POST", "/", nil))
        require.Equal(t, http.StatusConflict, w.Code)
    })

    t.Run("prepare shutdown and cancel", func(t *testing.T) {
        w := httptest.NewRecorder()
        test.Poll(t, 5*time.Second, ring.PartitionActive, func() interface{} {
            return getState(t, cfg)
        })
        ing.PreparePartitionDownscaleHandler(w, httptest.NewRequest("POST", "/", nil))
        require.Equal(t, http.StatusOK, w.Code)
        test.Poll(t, 5*time.Second, ring.PartitionInactive, func() interface{} {
            return getState(t, cfg)
        })
        w2 := httptest.NewRecorder()
        ing.PreparePartitionDownscaleHandler(w2, httptest.NewRequest("DELETE", "/", nil))
        require.Equal(t, http.StatusOK, w2.Code)
        test.Poll(t, 5*time.Second, ring.PartitionActive, func() interface{} {
            return getState(t, cfg)
        })
    })

    require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
}

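// getState reads the state of partition 0 directly from the mock KV store.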
func getState(t *testing.T, cfg Config) ring.PartitionState {
    get, err := cfg.PartitionRingConfig.KVStore.Mock.Get(context.Background(), PartitionRingName+"-key")
    require.NoError(t, err)

    ringDesc := ring.GetOrCreatePartitionRingDesc(get)
    return ringDesc.Partitions[0].State
}

// nolint
func defaultIngesterTestConfig(t testing.TB) Config {
    kvRing, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
    t.Cleanup(func() { require.NoError(t, closer.Close()) })

    kvPartitionRing, closerPartitionRing := consul.NewInMemoryClient(ring.GetPartitionRingCodec(), log.NewNopLogger(), nil)
    t.Cleanup(func() { require.NoError(t, closerPartitionRing.Close()) })

    cfg := Config{}
    flagext.DefaultValues(&cfg)

    cfg.LifecyclerConfig.RingConfig.KVStore.Mock = kvRing
    cfg.PartitionRingConfig.KVStore.Mock = kvPartitionRing
    cfg.PartitionRingConfig.MinOwnersCount = 1
    cfg.PartitionRingConfig.MinOwnersDuration = 0
    cfg.LifecyclerConfig.RingConfig.ReplicationFactor = 1
    cfg.LifecyclerConfig.NumTokens = 1
    cfg.LifecyclerConfig.ListenPort = 0
    cfg.LifecyclerConfig.Addr = "localhost"
    cfg.LifecyclerConfig.ID = "localhost"
    cfg.LifecyclerConfig.FinalSleep = 0
    cfg.LifecyclerConfig.MinReadyDuration = 0
    return cfg
}

// TestMetastore is a simple in-memory metastore for testing
type TestMetastore struct {
    blocks map[string][]*metastorepb.BlockMeta
}

func NewTestMetastore() *TestMetastore {
    return &TestMetastore{blocks: make(map[string][]*metastorepb.BlockMeta)}
}

func (m *TestMetastore) ListBlocksForQuery(_ context.Context, req *metastorepb.ListBlocksForQueryRequest, _ ...grpc.CallOption) (*metastorepb.ListBlocksForQueryResponse, error) {
    blocks := m.blocks[req.TenantId]
    var result []*metastorepb.BlockMeta
    for _, block := range blocks {
        if block.MinTime <= req.EndTime && block.MaxTime >= req.StartTime {
            result = append(result, block)
        }
    }
    return &metastorepb.ListBlocksForQueryResponse{Blocks: result}, nil
}

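// AddBlock indexes the block under every tenant that has streams in it.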
func (m *TestMetastore) AddBlock(_ context.Context, in *metastorepb.AddBlockRequest, _ ...grpc.CallOption) (*metastorepb.AddBlockResponse, error) {
    for _, stream := range in.Block.TenantStreams {
        m.blocks[stream.TenantId] = append(m.blocks[stream.TenantId], in.Block)
    }
    return &metastorepb.AddBlockResponse{}, nil
}

pkg/kafka/ingester/metrics.go
@@ -1,20 +0,0 @@
package ingester

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

type ingesterMetrics struct {
    // Shutdown marker for ingester scale down
    shutdownMarker prometheus.Gauge
}

func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
    return &ingesterMetrics{
        shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
            Name: "loki_ingester_prepare_shutdown_requested",
            Help: "1 if the ingester has been requested to prepare for shutdown via endpoint or marker file.",
        }),
    }
}

pkg/kafka/ingester/shutdownmarker/shutdown_marker.go
@@ -1,60 +0,0 @@
// SPDX-License-Identifier: AGPL-3.0-only

package shutdownmarker

import (
    "os"
    "path"
    "strings"
    "time"

    "github.com/grafana/dskit/multierror"

    "github.com/grafana/loki/v3/pkg/util/atomicfs"
)

const shutdownMarkerFilename = "shutdown-requested.txt"

// Create writes a marker file on the given path to indicate that a component is
// going to be scaled down in the future. The presence of this file means that a component
// should perform some operations specified by the component itself before being shut down.
func Create(p string) error {
    return atomicfs.CreateFile(p, strings.NewReader(time.Now().UTC().Format(time.RFC3339)))
}

// Remove removes the shutdown marker file on the given path if it exists.
func Remove(p string) error {
    err := os.Remove(p)
    if err != nil && !os.IsNotExist(err) {
        return err
    }

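    // Sync the parent directory so the removal is durable across a crash.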
    dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777)
    if err != nil {
        return err
    }

    merr := multierror.New()
    merr.Add(dir.Sync())
    merr.Add(dir.Close())
    return merr.Err()
}

// Exists returns true if the shutdown marker file exists on the given path, false otherwise
func Exists(p string) (bool, error) {
    s, err := os.Stat(p)
    if err != nil && os.IsNotExist(err) {
        return false, nil
    }
    if err != nil {
        return false, err
    }
    return s.Mode().IsRegular(), nil
}

// GetPath returns the absolute path of the shutdown marker file
func GetPath(dirPath string) string {
    return path.Join(dirPath, shutdownMarkerFilename)
}

pkg/kafka/ingester/shutdownmarker/shutdown_marker_test.go
@@ -1,49 +0,0 @@
// SPDX-License-Identifier: AGPL-3.0-only

package shutdownmarker

import (
    "path/filepath"
    "testing"

    "github.com/stretchr/testify/require"
)

func TestShutdownMarker_GetPath(t *testing.T) {
    dir := "/a/b/c"
    expectedPath := filepath.Join(dir, shutdownMarkerFilename)
    require.Equal(t, expectedPath, GetPath(dir))
}

func TestShutdownMarker_Create(t *testing.T) {
    dir := t.TempDir()
    shutdownMarkerPath := GetPath(dir)
    exists, err := Exists(shutdownMarkerPath)
    require.NoError(t, err)
    require.False(t, exists)

    err = Create(shutdownMarkerPath)
    require.NoError(t, err)

    exists, err = Exists(shutdownMarkerPath)
    require.NoError(t, err)
    require.True(t, exists)
}

func TestShutdownMarker_Remove(t *testing.T) {
    dir := t.TempDir()
    shutdownMarkerPath := GetPath(dir)
    exists, err := Exists(shutdownMarkerPath)
    require.NoError(t, err)
    require.False(t, exists)

    require.Nil(t, Create(shutdownMarkerPath))
    exists, err = Exists(shutdownMarkerPath)
    require.NoError(t, err)
    require.True(t, exists)

    require.Nil(t, Remove(shutdownMarkerPath))
    exists, err = Exists(shutdownMarkerPath)
    require.NoError(t, err)
    require.False(t, exists)
}

pkg/loki/loki.go
@@ -43,7 +43,6 @@ import (
     metastoreclient "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/client"
     ingester_client "github.com/grafana/loki/v3/pkg/ingester/client"
     "github.com/grafana/loki/v3/pkg/kafka"
-    ingester_kafka "github.com/grafana/loki/v3/pkg/kafka/ingester"
     "github.com/grafana/loki/v3/pkg/loghttp/push"
     "github.com/grafana/loki/v3/pkg/loki/common"
     "github.com/grafana/loki/v3/pkg/lokifrontend"
@@ -380,7 +379,6 @@ type Loki struct {
     MetastoreClient      *metastoreclient.Client
     partitionRingWatcher *ring.PartitionRingWatcher
     partitionRing        *ring.PartitionInstanceRing
-    kafkaIngester        *ingester_kafka.Ingester
     ClientMetrics        storage.ClientMetrics
     deleteClientMetrics  *deletion.DeleteRequestClientMetrics