diff --git a/pkg/dataobj/consumer/mock_test.go b/pkg/dataobj/consumer/mock_test.go index cb54072bd0..f6870358dd 100644 --- a/pkg/dataobj/consumer/mock_test.go +++ b/pkg/dataobj/consumer/mock_test.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" "github.com/twmb/franz-go/pkg/kgo" @@ -131,10 +130,6 @@ func (m *mockBuilder) TimeRanges() []multitenancy.TimeRange { return m.builder.TimeRanges() } -func (m *mockBuilder) UnregisterMetrics(r prometheus.Registerer) { - m.builder.UnregisterMetrics(r) -} - // A mockCommitter implements the committer interface for tests. type mockCommitter struct { offsets []int64 diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/processor.go similarity index 70% rename from pkg/dataobj/consumer/partition_processor.go rename to pkg/dataobj/consumer/processor.go index 9af9f1d74c..5c273f993d 100644 --- a/pkg/dataobj/consumer/partition_processor.go +++ b/pkg/dataobj/consumer/processor.go @@ -22,64 +22,71 @@ import ( "github.com/grafana/loki/v3/pkg/scratch" ) -// builder allows mocking of [logsobj.Builder] in tests. +// A builder allows mocking of [logsobj.Builder] in tests. type builder interface { Append(tenant string, stream logproto.Stream) error GetEstimatedSize() int Flush() (*dataobj.Object, io.Closer, error) TimeRanges() []multitenancy.TimeRange - UnregisterMetrics(prometheus.Registerer) CopyAndSort(ctx context.Context, obj *dataobj.Object) (*dataobj.Object, io.Closer, error) } -// flusher allows mocking of flushes in tests. +// A flusher allows mocking of flushes in tests. type flusher interface { FlushAsync(ctx context.Context, builder builder, startTime time.Time, offset int64, done func(error)) } -type partitionProcessor struct { +// A processor receives records and builds data objects from them. +type processor struct { *services.BasicService + builder builder + decoder *kafka.Decoder + recordsChan chan *kgo.Record + flusher flusher - // Kafka client and topic/partition info - topic string - partition int32 - // lastRecord contains the last record appended to the builder. It is used - // to commit the correct offset after a flush. - lastRecord *kgo.Record - builder builder - decoder *kafka.Decoder + // offset contains the offset of the last record appended to the data object + // builder. It is used to commit the correct offset after a flush. + offset int64 - // Builder initialization - builderOnce sync.Once - builderCfg logsobj.BuilderConfig - scratchStore scratch.Store - - // Idle stream handling + // idleFlushTimeout is the maximum amount of time to wait for more data. If no + // records are received within this timeout, the current data object builder + // is flushed. Most of the time, this timeout occurs when a partition is + // marked as inactive in preparation for a scale down. idleFlushTimeout time.Duration - // Handling flushing dataobjs even if they are not full or idle for too long. + + // maxBuilderAge is the maximum age a data object builder can reach before it + // must be flushed. This happens if a partition does not receive enough data + // to reach the target size within the allocated time. We would rather flush + // a small data object and compact it later then wait for more data to arrive + // and delay querying. maxBuilderAge time.Duration - // lastModified is used to know when the idle is exceeded. - // The initial value is zero and must be reset to zero after each flush. - lastModified time.Time + // firstAppend tracks the wall clock time of the first append to the data + // object builder. It is used to know if the builder has exceeded the + // maximum age. It must be reset after each flush. + firstAppend time.Time - // earliestRecordTime tracks the earliest timestamp all the records Appended to the builder for each object. + // lastAppend tracks the wall clock time of the last append to the data + // object builder. It is used to know if the builder has exceeded the + // idle timeout. It must be reset after each flush. + lastAppend time.Time + + // earliestRecordTime tracks the timestamp of the earliest record appended + // to the data object builder. It is required for the metastore index. earliestRecordTime time.Time - // firstAppendTime tracks the time of the first append to the builder, as `earliestRecordTime` is highly influenced by scenarios of severe lagging. - firstAppendTime time.Time // Metrics metrics *partitionOffsetMetrics + logger log.Logger + reg prometheus.Registerer - // Control and coordination - reg prometheus.Registerer - logger log.Logger - - recordsChan chan *kgo.Record - flusher flusher + // TODO(grobinson): Will replace with a builder factory. + builderCfg logsobj.BuilderConfig + scratchStore scratch.Store + builderOnce sync.Once } -func newPartitionProcessor( +func newProcessor( builderCfg logsobj.BuilderConfig, scratchStore scratch.Store, idleFlushTimeout time.Duration, @@ -90,7 +97,7 @@ func newPartitionProcessor( flusher flusher, logger log.Logger, reg prometheus.Registerer, -) *partitionProcessor { +) *processor { decoder, err := kafka.NewDecoder() if err != nil { panic(err) @@ -106,12 +113,9 @@ func newPartitionProcessor( level.Error(logger).Log("msg", "failed to register partition metrics", "err", err) } - p := &partitionProcessor{ - topic: topic, - partition: partition, + p := &processor{ logger: logger, decoder: decoder, - reg: reg, builderCfg: builderCfg, scratchStore: scratchStore, metrics: metrics, @@ -119,27 +123,28 @@ func newPartitionProcessor( maxBuilderAge: maxBuilderAge, recordsChan: recordsChan, flusher: flusher, + reg: reg, } p.BasicService = services.NewBasicService(p.starting, p.running, p.stopping) return p } // starting implements [services.StartingFn]. -func (p *partitionProcessor) starting(_ context.Context) error { +func (p *processor) starting(_ context.Context) error { return nil } // running implements [services.RunningFn]. -func (p *partitionProcessor) running(ctx context.Context) error { +func (p *processor) running(ctx context.Context) error { return p.Run(ctx) } // stopping implements [services.StoppingFn]. -func (p *partitionProcessor) stopping(_ error) error { +func (p *processor) stopping(_ error) error { return nil } -func (p *partitionProcessor) Run(ctx context.Context) error { +func (p *processor) Run(ctx context.Context) error { defer func() { level.Info(p.logger).Log("msg", "stopped partition processor") }() @@ -166,7 +171,7 @@ func (p *partitionProcessor) Run(ctx context.Context) error { } } -func (p *partitionProcessor) initBuilder() error { +func (p *processor) initBuilder() error { var initErr error p.builderOnce.Do(func() { // Dataobj builder @@ -184,7 +189,8 @@ func (p *partitionProcessor) initBuilder() error { return initErr } -func (p *partitionProcessor) processRecord(ctx context.Context, record *kgo.Record) { +func (p *processor) processRecord(ctx context.Context, record *kgo.Record) { + now := time.Now() p.metrics.processedRecords.Inc() // Update offset metric at the end of processing @@ -241,26 +247,25 @@ func (p *partitionProcessor) processRecord(ctx context.Context, record *kgo.Reco } } - if p.firstAppendTime.IsZero() { - p.firstAppendTime = time.Now() + if p.firstAppend.IsZero() { + p.firstAppend = now } - - p.lastRecord = record - p.lastModified = time.Now() + p.lastAppend = now + p.offset = record.Offset } -func (p *partitionProcessor) shouldFlushDueToMaxAge() bool { +func (p *processor) shouldFlushDueToMaxAge() bool { return p.maxBuilderAge > 0 && p.builder.GetEstimatedSize() > 0 && - !p.firstAppendTime.IsZero() && - time.Since(p.firstAppendTime) > p.maxBuilderAge + !p.firstAppend.IsZero() && + time.Since(p.firstAppend) > p.maxBuilderAge } // idleFlush flushes the partition if it has exceeded the idle flush timeout. // It returns true if the partition was flushed, false with a non-nil error // if the partition could not be flushed, and false with a nil error if // the partition has not exceeded the timeout. -func (p *partitionProcessor) idleFlush(ctx context.Context) (bool, error) { +func (p *processor) idleFlush(ctx context.Context) (bool, error) { if !p.needsIdleFlush() { return false, nil } @@ -274,20 +279,20 @@ func (p *partitionProcessor) idleFlush(ctx context.Context) (bool, error) { // needsIdleFlush returns true if the partition has exceeded the idle timeout // and the builder has some data buffered. -func (p *partitionProcessor) needsIdleFlush() bool { +func (p *processor) needsIdleFlush() bool { // This is a safety check to make sure we never flush empty data objects. // It should never happen that lastModified is non-zero while the builder // is either uninitialized or empty. if p.builder == nil || p.builder.GetEstimatedSize() == 0 { return false } - if p.lastModified.IsZero() { + if p.lastAppend.IsZero() { return false } - return time.Since(p.lastModified) > p.idleFlushTimeout + return time.Since(p.lastAppend) > p.idleFlushTimeout } -func (p *partitionProcessor) flush(ctx context.Context) error { +func (p *processor) flush(ctx context.Context) error { var ( err error done = make(chan struct{}) @@ -295,11 +300,10 @@ func (p *partitionProcessor) flush(ctx context.Context) error { defer func() { // Reset the state to prepare for building the next data object. p.earliestRecordTime = time.Time{} - p.firstAppendTime = time.Time{} - p.lastModified = time.Time{} - p.lastRecord = nil + p.firstAppend = time.Time{} + p.lastAppend = time.Time{} }() - p.flusher.FlushAsync(ctx, p.builder, p.earliestRecordTime, p.lastRecord.Offset, func(flushErr error) { + p.flusher.FlushAsync(ctx, p.builder, p.earliestRecordTime, p.offset, func(flushErr error) { err = flushErr close(done) }) diff --git a/pkg/dataobj/consumer/partition_processor_test.go b/pkg/dataobj/consumer/processor_test.go similarity index 81% rename from pkg/dataobj/consumer/partition_processor_test.go rename to pkg/dataobj/consumer/processor_test.go index 717d52c69d..4eefc51f2a 100644 --- a/pkg/dataobj/consumer/partition_processor_test.go +++ b/pkg/dataobj/consumer/processor_test.go @@ -41,13 +41,13 @@ func TestPartitionProcessor_BuilderMaxAge(t *testing.T) { ctx = t.Context() reg = prometheus.NewRegistry() flusher = &mockFlusher{} - proc *partitionProcessor + proc *processor ) - proc = newPartitionProcessor(testBuilderCfg, scratch.NewMemory(), 5*time.Minute, 30*time.Minute, "topic", 1, nil, flusher, log.NewNopLogger(), reg) + proc = newProcessor(testBuilderCfg, scratch.NewMemory(), 5*time.Minute, 30*time.Minute, "topic", 1, nil, flusher, log.NewNopLogger(), reg) // Since no records have been pushed, the first append time should be zero, // and no flush should have occurred. - require.True(t, proc.firstAppendTime.IsZero()) + require.True(t, proc.firstAppend.IsZero()) require.Equal(t, 0, flusher.flushes) // Process a record containing some log lines. No flush should occur because @@ -56,8 +56,8 @@ func TestPartitionProcessor_BuilderMaxAge(t *testing.T) { // The first append time should be set to the current time, but no flush // should have occurred. - require.Equal(t, time.Now(), proc.firstAppendTime) - require.Equal(t, time.Now(), proc.lastModified) + require.Equal(t, time.Now(), proc.firstAppend) + require.Equal(t, time.Now(), proc.lastAppend) require.Equal(t, 0, flusher.flushes) // Advance time past the maximum age. A flush should occur, and the @@ -68,8 +68,8 @@ func TestPartitionProcessor_BuilderMaxAge(t *testing.T) { // The last flushed time should be updated to the current time, and so should // the first append time to reflect the start of the new data object. - require.Equal(t, time.Now(), proc.firstAppendTime) - require.Equal(t, time.Now(), proc.lastModified) + require.Equal(t, time.Now(), proc.firstAppend) + require.Equal(t, time.Now(), proc.lastAppend) require.NotEqual(t, proc.builder.GetEstimatedSize(), 0) require.Equal(t, 1, flusher.flushes) @@ -78,8 +78,8 @@ func TestPartitionProcessor_BuilderMaxAge(t *testing.T) { expectedLastFlushed := time.Now() time.Sleep(time.Minute) proc.processRecord(ctx, newTestRecord(t, "tenant1", time.Now())) - require.Equal(t, expectedLastFlushed, proc.firstAppendTime) - require.Equal(t, time.Now(), proc.lastModified) + require.Equal(t, expectedLastFlushed, proc.firstAppend) + require.Equal(t, time.Now(), proc.lastAppend) require.Equal(t, 1, flusher.flushes) // Check the metrics. @@ -97,9 +97,9 @@ func TestPartitionProcessor_IdleFlush(t *testing.T) { ctx = t.Context() reg = prometheus.NewRegistry() flusher = &mockFlusher{} - proc *partitionProcessor + proc *processor ) - proc = newPartitionProcessor(testBuilderCfg, scratch.NewMemory(), 5*time.Minute, 30*time.Minute, "topic", 1, nil, flusher, log.NewNopLogger(), reg) + proc = newProcessor(testBuilderCfg, scratch.NewMemory(), 5*time.Minute, 30*time.Minute, "topic", 1, nil, flusher, log.NewNopLogger(), reg) // The builder is uninitialized, which means its size is also zero. No flush // should occur. @@ -127,7 +127,7 @@ func TestPartitionProcessor_IdleFlush(t *testing.T) { // Process a record containing some log lines. No flush should occur because // when log lines are appended to the builder it resets the idle timeout. proc.processRecord(ctx, newTestRecord(t, "tenant1", time.Now())) - require.False(t, proc.lastModified.IsZero()) + require.False(t, proc.lastAppend.IsZero()) flushed, err = proc.idleFlush(ctx) require.NoError(t, err) require.False(t, flushed) @@ -163,46 +163,44 @@ func TestPartitionProcessor_Flush(t *testing.T) { reg = prometheus.NewRegistry() mockFlusher = &mockFlusher{} _ = &failureFlusher{} - proc *partitionProcessor + proc *processor ) - proc = newPartitionProcessor(testBuilderCfg, scratch.NewMemory(), 5*time.Minute, 30*time.Minute, "topic", 1, nil, mockFlusher, log.NewNopLogger(), reg) + proc = newProcessor(testBuilderCfg, scratch.NewMemory(), 5*time.Minute, 30*time.Minute, "topic", 1, nil, mockFlusher, log.NewNopLogger(), reg) // No flush should have occurred. require.Equal(t, 0, mockFlusher.flushes) // Process a record containing some log lines. No flush should occur. rec1 := newTestRecord(t, "tenant", time.Now()) proc.processRecord(ctx, rec1) - require.Equal(t, time.Now(), proc.firstAppendTime) - require.Equal(t, time.Now(), proc.lastModified) - require.Equal(t, rec1, proc.lastRecord) + require.Equal(t, time.Now(), proc.firstAppend) + require.Equal(t, time.Now(), proc.lastAppend) + require.Equal(t, rec1.Offset, proc.offset) // Advance time and force a flush. time.Sleep(time.Second) require.NoError(t, proc.flush(ctx)) require.Equal(t, 1, mockFlusher.flushes) // The following fields should be reset at the end of every flush. - require.True(t, proc.firstAppendTime.IsZero()) + require.True(t, proc.firstAppend.IsZero()) require.True(t, proc.earliestRecordTime.IsZero()) - require.True(t, proc.lastModified.IsZero()) - require.Nil(t, proc.lastRecord) + require.True(t, proc.lastAppend.IsZero()) // Process another record containing some log lines. No flush should occur. proc.flusher = &failureFlusher{} rec2 := newTestRecord(t, "tenant", time.Now()) proc.processRecord(ctx, rec2) - require.Equal(t, time.Now(), proc.firstAppendTime) - require.Equal(t, time.Now(), proc.lastModified) - require.Equal(t, rec2, proc.lastRecord) + require.Equal(t, time.Now(), proc.firstAppend) + require.Equal(t, time.Now(), proc.lastAppend) + require.Equal(t, rec2.Offset, proc.offset) // Advance time and force a flush. This flush should fail. time.Sleep(time.Second) require.EqualError(t, proc.flush(ctx), "failed to flush") require.Equal(t, 1, mockFlusher.flushes) // Despite the failure, the following fields should still be reset. - require.True(t, proc.firstAppendTime.IsZero()) + require.True(t, proc.firstAppend.IsZero()) require.True(t, proc.earliestRecordTime.IsZero()) - require.True(t, proc.lastModified.IsZero()) - require.Nil(t, proc.lastRecord) + require.True(t, proc.lastAppend.IsZero()) }) } diff --git a/pkg/dataobj/consumer/service.go b/pkg/dataobj/consumer/service.go index 434faaf6f2..a794a633fd 100644 --- a/pkg/dataobj/consumer/service.go +++ b/pkg/dataobj/consumer/service.go @@ -38,7 +38,7 @@ type Service struct { lifecycler *ring.Lifecycler partitionInstanceLifecycler *ring.PartitionInstanceLifecycler consumer *kafkav2.SinglePartitionConsumer - processor *partitionProcessor + processor *processor flusher *flusherImpl downscalePermitted downscalePermittedFunc watcher *services.FailureWatcher @@ -161,7 +161,7 @@ func New(kafkaCfg kafka.Config, cfg Config, mCfg metastore.Config, bucket objsto logger, reg, ) - s.processor = newPartitionProcessor( + s.processor = newProcessor( cfg.BuilderConfig, scratchStore, cfg.IdleFlushTimeout,