From be7ff4b294b44ade4666a67ece559d9d9b0abeaf Mon Sep 17 00:00:00 2001
From: George Robinson
Date: Thu, 14 Aug 2025 14:24:34 +0100
Subject: [PATCH] chore: add flushAndCommit to make it harder to forget to
 commit offsets in future (#18851)

---
 pkg/dataobj/consumer/mock_test.go             |  23 ++++
 pkg/dataobj/consumer/partition_processor.go   | 101 ++++++++++--------
 .../consumer/partition_processor_test.go      |  65 +++++++++++
 3 files changed, 145 insertions(+), 44 deletions(-)

diff --git a/pkg/dataobj/consumer/mock_test.go b/pkg/dataobj/consumer/mock_test.go
index 6b50219c5e..2e48a1040d 100644
--- a/pkg/dataobj/consumer/mock_test.go
+++ b/pkg/dataobj/consumer/mock_test.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/grafana/loki/v3/pkg/dataobj"
 	"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
 	"github.com/grafana/loki/v3/pkg/logproto"
 )
 
@@ -122,3 +123,25 @@ func (m *mockCommitter) CommitRecords(_ context.Context, records ...*kgo.Record)
 	m.records = append(m.records, records...)
 	return nil
 }
+
+type recordedTocEntry struct {
+	DataObjectPath string
+	MinTimestamp   time.Time
+	MaxTimestamp   time.Time
+}
+
+// A recordingTocWriter wraps a [metastore.TableOfContentsWriter] and records
+// all entries written to it.
+type recordingTocWriter struct {
+	entries []recordedTocEntry
+	*metastore.TableOfContentsWriter
+}
+
+func (m *recordingTocWriter) WriteEntry(ctx context.Context, dataobjPath string, minTimestamp, maxTimestamp time.Time) error {
+	m.entries = append(m.entries, recordedTocEntry{
+		DataObjectPath: dataobjPath,
+		MinTimestamp:   minTimestamp,
+		MaxTimestamp:   maxTimestamp,
+	})
+	return m.TableOfContentsWriter.WriteEntry(ctx, dataobjPath, minTimestamp, maxTimestamp)
+}
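The recordingTocWriter above relies on Go struct embedding: the wrapper promotes every method of the embedded *metastore.TableOfContentsWriter and overrides only WriteEntry, recording the arguments before delegating to the real implementation. Below is a minimal standalone sketch of the same intercept-by-embedding pattern; the store and auditStore names are illustrative and not part of the Loki codebase.

    package main

    import (
    	"context"
    	"fmt"
    )

    // store stands in for the real dependency (e.g. the metastore TOC writer).
    type store struct{}

    func (s *store) Write(_ context.Context, key string) error {
    	fmt.Println("wrote", key)
    	return nil
    }

    // auditStore embeds *store, so every store method is promoted onto it;
    // only Write is overridden to record arguments before delegating.
    type auditStore struct {
    	keys []string
    	*store
    }

    func (a *auditStore) Write(ctx context.Context, key string) error {
    	a.keys = append(a.keys, key) // record, then delegate
    	return a.store.Write(ctx, key)
    }

    func main() {
    	a := &auditStore{store: &store{}}
    	_ = a.Write(context.Background(), "obj/0001")
    	fmt.Println("recorded:", a.keys)
    }

Because the wrapper delegates rather than replaces, the test still exercises the real write path while gaining visibility into what was written.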
diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go
index 883dfd93b4..055b1e2da9 100644
--- a/pkg/dataobj/consumer/partition_processor.go
+++ b/pkg/dataobj/consumer/partition_processor.go
@@ -40,6 +40,10 @@ type committer interface {
 	CommitRecords(ctx context.Context, records ...*kgo.Record) error
 }
 
+type tocWriter interface {
+	WriteEntry(ctx context.Context, dataobjPath string, minTimestamp, maxTimestamp time.Time) error
+}
+
 type partitionProcessor struct {
 	// Kafka client and topic/partition info
 	committer committer
@@ -54,7 +58,7 @@ type partitionProcessor struct {
 	builder            builder
 	decoder            *kafka.Decoder
 	uploader           *uploader.Uploader
-	metastoreTocWriter *metastore.TableOfContentsWriter
+	metastoreTocWriter tocWriter
 
 	// Builder initialization
 	builderOnce sync.Once
@@ -224,37 +228,6 @@ func (p *partitionProcessor) initBuilder() error {
 	return initErr
 }
 
-func (p *partitionProcessor) flush() error {
-	minTime, maxTime := p.builder.TimeRange()
-
-	obj, closer, err := p.builder.Flush()
-	if err != nil {
-		level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
-		return err
-	}
-	defer closer.Close()
-
-	objectPath, err := p.uploader.Upload(p.ctx, obj)
-	if err != nil {
-		level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
-		return err
-	}
-
-	if err := p.metastoreTocWriter.WriteEntry(p.ctx, objectPath, minTime, maxTime); err != nil {
-		level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
-		return err
-	}
-
-	if err := p.emitObjectWrittenEvent(objectPath); err != nil {
-		level.Error(p.logger).Log("msg", "failed to emit event", "err", err)
-		return err
-	}
-
-	p.lastFlush = p.clock.Now()
-
-	return nil
-}
-
 func (p *partitionProcessor) emitObjectWrittenEvent(objectPath string) error {
 	if p.eventsProducerClient == nil {
 		return nil
@@ -317,13 +290,8 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
 		return
 	}
 
-	if err := p.flush(); err != nil {
-		level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
-		return
-	}
-
-	if err := p.commit(); err != nil {
-		level.Error(p.logger).Log("msg", "failed to commit offset", "err", err)
+	if err := p.flushAndCommit(); err != nil {
+		level.Error(p.logger).Log("msg", "failed to flush and commit", "err", err)
 		return
 	}
 
@@ -338,6 +306,54 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
 	p.lastModified = p.clock.Now()
 }
 
+// flushAndCommit flushes the builder and, if successful, commits the offset
+// of the last record processed. It expects that the last record processed
+// was also the last record appended to the builder. If not, data loss can
+// occur should the consumer restart or a partition rebalance occur.
+func (p *partitionProcessor) flushAndCommit() error {
+	if err := p.flush(); err != nil {
+		return fmt.Errorf("failed to flush: %w", err)
+	}
+	if err := p.commit(); err != nil {
+		return fmt.Errorf("failed to commit offset: %w", err)
+	}
+	return nil
+}
+
+// flush builds a complete data object from the builder, uploads it, records
+// it in the metastore, and emits an object written event to the events topic.
+func (p *partitionProcessor) flush() error {
+	// The time range must be read before the flush as the builder is reset
+	// at the end of each flush, resetting the time range.
+	minTime, maxTime := p.builder.TimeRange()
+	obj, closer, err := p.builder.Flush()
+	if err != nil {
+		level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
+		return err
+	}
+	defer closer.Close()
+
+	objectPath, err := p.uploader.Upload(p.ctx, obj)
+	if err != nil {
+		level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
+		return err
+	}
+
+	if err := p.metastoreTocWriter.WriteEntry(p.ctx, objectPath, minTime, maxTime); err != nil {
+		level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
+		return err
+	}
+
+	if err := p.emitObjectWrittenEvent(objectPath); err != nil {
+		level.Error(p.logger).Log("msg", "failed to emit event", "err", err)
+		return err
+	}
+
+	p.lastFlush = p.clock.Now()
+
+	return nil
+}
+
 // commits the offset of the last record processed. It should be called after
 // each successful flush to avoid duplicate data in consecutive data objects.
 func (p *partitionProcessor) commit() error {
@@ -373,11 +389,8 @@ func (p *partitionProcessor) idleFlush() (bool, error) {
 	if !p.needsIdleFlush() {
 		return false, nil
 	}
-	if err := p.flush(); err != nil {
-		return false, fmt.Errorf("failed to flush: %w", err)
-	}
-	if err := p.commit(); err != nil {
-		return false, fmt.Errorf("failed to commit offset: %w", err)
+	if err := p.flushAndCommit(); err != nil {
+		return false, err
 	}
 	return true, nil
 }
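The ordering inside flushAndCommit is what preserves the consumer's at-least-once guarantee: the offset only advances after the flushed object is durably stored, so a crash between the two steps re-delivers the uncommitted records (possible duplicates) instead of dropping them. A minimal sketch of that invariant, using hypothetical flusher and offsetCommitter interfaces standing in for the builder pipeline and the Kafka committer (none of these names are Loki types):

    package main

    import (
    	"context"
    	"fmt"
    )

    // flusher and offsetCommitter are hypothetical stand-ins for the
    // builder pipeline and the Kafka committer.
    type flusher interface {
    	Flush(ctx context.Context) error
    }

    type offsetCommitter interface {
    	Commit(ctx context.Context, offset int64) error
    }

    // flushThenCommit advances the offset only after the data it covers
    // has been durably flushed. A crash between the two calls re-delivers
    // the uncommitted records on restart: duplicates are possible, data
    // loss is not. Committing before flushing would invert that trade-off.
    func flushThenCommit(ctx context.Context, f flusher, c offsetCommitter, offset int64) error {
    	if err := f.Flush(ctx); err != nil {
    		return fmt.Errorf("failed to flush: %w", err)
    	}
    	if err := c.Commit(ctx, offset); err != nil {
    		return fmt.Errorf("failed to commit offset: %w", err)
    	}
    	return nil
    }

    type memFlusher struct{ flushes int }

    func (m *memFlusher) Flush(context.Context) error { m.flushes++; return nil }

    type memCommitter struct{ offset int64 }

    func (m *memCommitter) Commit(_ context.Context, offset int64) error { m.offset = offset; return nil }

    func main() {
    	f, c := &memFlusher{}, &memCommitter{}
    	if err := flushThenCommit(context.Background(), f, c, 42); err != nil {
    		panic(err)
    	}
    	fmt.Println("flushes:", f.flushes, "committed offset:", c.offset)
    }

Bundling the two steps into one helper, as this patch does, removes the chance of a future call site performing the flush but forgetting the commit.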
diff --git a/pkg/dataobj/consumer/partition_processor_test.go b/pkg/dataobj/consumer/partition_processor_test.go
index 51ab12cc51..a0b5c6235a 100644
--- a/pkg/dataobj/consumer/partition_processor_test.go
+++ b/pkg/dataobj/consumer/partition_processor_test.go
@@ -31,6 +31,71 @@ var testBuilderConfig = logsobj.BuilderConfig{
 	SectionStripeMergeLimit: 2,
 }
 
+func TestPartitionProcessor_Flush(t *testing.T) {
+	clock := quartz.NewMock(t)
+	p := newTestPartitionProcessor(t, clock)
+
+	// Wrap the TOC writer to record all of the entries.
+	tocWriter, ok := p.metastoreTocWriter.(*metastore.TableOfContentsWriter)
+	require.True(t, ok)
+	recordingTocWriter := &recordingTocWriter{TableOfContentsWriter: tocWriter}
+	p.metastoreTocWriter = recordingTocWriter
+
+	// Push two streams, one minute apart.
+	now1 := clock.Now()
+	s1 := logproto.Stream{
+		Labels: `{service="test"}`,
+		Entries: []push.Entry{{
+			Timestamp: now1,
+			Line:      "abc",
+		}},
+	}
+	b1, err := s1.Marshal()
+	require.NoError(t, err)
+	p.processRecord(&kgo.Record{
+		Key:       []byte("test-tenant"),
+		Value:     b1,
+		Timestamp: now1,
+	})
+
+	// Push the second stream, one minute later.
+	clock.Advance(time.Minute)
+	now2 := clock.Now()
+	s2 := s1
+	s2.Entries[0].Timestamp = now2
+	b2, err := s2.Marshal()
+	require.NoError(t, err)
+	p.processRecord(&kgo.Record{
+		Key:       []byte("test-tenant"),
+		Value:     b2,
+		Timestamp: now2,
+	})
+
+	// No flush should have occurred; we will flush ourselves instead.
+	require.True(t, p.lastFlush.IsZero())
+
+	// Get the time range. We will use this to check that the metastore has
+	// the correct time range.
+	minTime, maxTime := p.builder.TimeRange()
+	require.Equal(t, now1, minTime)
+	require.Equal(t, now2, maxTime)
+
+	// Flush the data object.
+	require.NoError(t, p.flush())
+	require.Equal(t, now2, p.lastFlush)
+
+	// Flush should produce two uploads: the data object and the metastore
+	// object.
+	bucket, ok := p.bucket.(*mockBucket)
+	require.True(t, ok)
+	require.Len(t, bucket.uploads, 2)
+
+	// Check that the expected entries were written to the metastore.
+	require.Len(t, recordingTocWriter.entries, 1)
+	actual := recordingTocWriter.entries[0]
+	require.Equal(t, minTime, actual.MinTimestamp)
+	require.Equal(t, maxTime, actual.MaxTimestamp)
+}
+
 func TestPartitionProcessor_IdleFlush(t *testing.T) {
 	clock := quartz.NewMock(t)
 	p := newTestPartitionProcessor(t, clock)
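TestPartitionProcessor_Flush drives time through a mock clock, so the one-minute gap between the two records is deterministic and no real sleeping is involved. A compact sketch of the same technique, assuming the github.com/coder/quartz clock package used in the test above; the tracker type is illustrative, not a Loki type:

    package consumer_test

    import (
    	"testing"
    	"time"

    	"github.com/coder/quartz"
    	"github.com/stretchr/testify/require"
    )

    // tracker is an illustrative stand-in for the processor's lastFlush
    // bookkeeping; it is not a Loki type.
    type tracker struct {
    	clock     quartz.Clock
    	lastFlush time.Time
    }

    func (tr *tracker) flush() { tr.lastFlush = tr.clock.Now() }

    func (tr *tracker) idle(threshold time.Duration) bool {
    	return tr.clock.Now().Sub(tr.lastFlush) >= threshold
    }

    func TestIdleWithMockClock(t *testing.T) {
    	clock := quartz.NewMock(t) // frozen; only Advance moves time
    	tr := &tracker{clock: clock}
    	tr.flush()

    	require.False(t, tr.idle(time.Minute))

    	// Advance the mock clock deterministically instead of sleeping.
    	clock.Advance(time.Minute)
    	require.True(t, tr.idle(time.Minute))
    }

Injecting the clock rather than calling time.Now directly is what lets the production code and the test share the same flush-age logic.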