chore: add flushAndCommit to make it harder to forget to commit offsets in future (#18851)

pull/18858/head
George Robinson 5 months ago committed by GitHub
parent ee3d36892a
commit be7ff4b294
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 23
      pkg/dataobj/consumer/mock_test.go
  2. 101
      pkg/dataobj/consumer/partition_processor.go
  3. 65
      pkg/dataobj/consumer/partition_processor_test.go

@ -14,6 +14,7 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/logproto"
)
@ -122,3 +123,25 @@ func (m *mockCommitter) CommitRecords(_ context.Context, records ...*kgo.Record)
m.records = append(m.records, records...)
return nil
}
// recordedTocEntry captures the arguments of a single call to
// [recordingTocWriter.WriteEntry].
type recordedTocEntry struct {
	DataObjectPath string
	MinTimestamp   time.Time
	MaxTimestamp   time.Time
}
// A recordingTocWriter wraps a [metastore.TableOfContentsWriter] and records
// all entries written to it.
type recordingTocWriter struct {
	// entries holds one recordedTocEntry per WriteEntry call, in call order.
	entries []recordedTocEntry
	*metastore.TableOfContentsWriter
}
// WriteEntry records the call's arguments and then delegates to the wrapped
// [metastore.TableOfContentsWriter].
func (m *recordingTocWriter) WriteEntry(ctx context.Context, dataobjPath string, minTimestamp, maxTimestamp time.Time) error {
	entry := recordedTocEntry{
		DataObjectPath: dataobjPath,
		MinTimestamp:   minTimestamp,
		MaxTimestamp:   maxTimestamp,
	}
	m.entries = append(m.entries, entry)
	return m.TableOfContentsWriter.WriteEntry(ctx, dataobjPath, minTimestamp, maxTimestamp)
}

@ -40,6 +40,10 @@ type committer interface {
CommitRecords(ctx context.Context, records ...*kgo.Record) error
}
// tocWriter is the subset of [metastore.TableOfContentsWriter] used by the
// partition processor; an interface so tests can substitute a recording
// implementation.
type tocWriter interface {
	WriteEntry(ctx context.Context, dataobjPath string, minTimestamp, maxTimestamp time.Time) error
}
type partitionProcessor struct {
// Kafka client and topic/partition info
committer committer
@ -54,7 +58,7 @@ type partitionProcessor struct {
builder builder
decoder *kafka.Decoder
uploader *uploader.Uploader
metastoreTocWriter *metastore.TableOfContentsWriter
metastoreTocWriter tocWriter
// Builder initialization
builderOnce sync.Once
@ -224,37 +228,6 @@ func (p *partitionProcessor) initBuilder() error {
return initErr
}
// flush builds a data object from the builder, uploads it, records it in the
// metastore, and emits an object written event.
func (p *partitionProcessor) flush() error {
	// Capture the time range before flushing: Flush resets the builder,
	// which would discard it.
	minTS, maxTS := p.builder.TimeRange()

	object, closer, err := p.builder.Flush()
	if err != nil {
		level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
		return err
	}
	defer closer.Close()

	path, err := p.uploader.Upload(p.ctx, object)
	if err != nil {
		level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
		return err
	}

	if err = p.metastoreTocWriter.WriteEntry(p.ctx, path, minTS, maxTS); err != nil {
		level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
		return err
	}

	if err = p.emitObjectWrittenEvent(path); err != nil {
		level.Error(p.logger).Log("msg", "failed to emit event", "err", err)
		return err
	}

	p.lastFlush = p.clock.Now()
	return nil
}
func (p *partitionProcessor) emitObjectWrittenEvent(objectPath string) error {
if p.eventsProducerClient == nil {
return nil
@ -317,13 +290,8 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
return
}
if err := p.flush(); err != nil {
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
return
}
if err := p.commit(); err != nil {
level.Error(p.logger).Log("msg", "failed to commit offset", "err", err)
if err := p.flushAndCommit(); err != nil {
level.Error(p.logger).Log("msg", "failed to flush and commit", "err", err)
return
}
@ -338,6 +306,54 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
p.lastModified = p.clock.Now()
}
// flushAndCommit flushes the builder and, on success, commits the offset of
// the last processed record. It assumes the last record processed was also
// the last record appended to the builder; otherwise data loss can occur if
// the consumer restarts or a partition rebalance occurs.
func (p *partitionProcessor) flushAndCommit() error {
	err := p.flush()
	if err != nil {
		return fmt.Errorf("failed to flush: %w", err)
	}
	if err = p.commit(); err != nil {
		return fmt.Errorf("failed to commit offset: %w", err)
	}
	return nil
}
// flush builds a complete data object from the builder, uploads it, records
// it in the metastore, and emits an object written event to the events topic.
func (p *partitionProcessor) flush() error {
	// Read the time range up front; flushing resets the builder and with it
	// the recorded range.
	lo, hi := p.builder.TimeRange()
	flushed, closer, flushErr := p.builder.Flush()
	if flushErr != nil {
		level.Error(p.logger).Log("msg", "failed to flush builder", "err", flushErr)
		return flushErr
	}
	defer closer.Close()
	objPath, uploadErr := p.uploader.Upload(p.ctx, flushed)
	if uploadErr != nil {
		level.Error(p.logger).Log("msg", "failed to upload object", "err", uploadErr)
		return uploadErr
	}
	if writeErr := p.metastoreTocWriter.WriteEntry(p.ctx, objPath, lo, hi); writeErr != nil {
		level.Error(p.logger).Log("msg", "failed to update metastore", "err", writeErr)
		return writeErr
	}
	if emitErr := p.emitObjectWrittenEvent(objPath); emitErr != nil {
		level.Error(p.logger).Log("msg", "failed to emit event", "err", emitErr)
		return emitErr
	}
	p.lastFlush = p.clock.Now()
	return nil
}
// commits the offset of the last record processed. It should be called after
// each successful flush to avoid duplicate data in consecutive data objects.
func (p *partitionProcessor) commit() error {
@ -373,11 +389,8 @@ func (p *partitionProcessor) idleFlush() (bool, error) {
if !p.needsIdleFlush() {
return false, nil
}
if err := p.flush(); err != nil {
return false, fmt.Errorf("failed to flush: %w", err)
}
if err := p.commit(); err != nil {
return false, fmt.Errorf("failed to commit offset: %w", err)
if err := p.flushAndCommit(); err != nil {
return false, err
}
return true, nil
}

@ -31,6 +31,71 @@ var testBuilderConfig = logsobj.BuilderConfig{
SectionStripeMergeLimit: 2,
}
func TestPartitionProcessor_Flush(t *testing.T) {
	clock := quartz.NewMock(t)
	proc := newTestPartitionProcessor(t, clock)

	// Swap in a recording TOC writer so we can assert on the entries written.
	underlying, ok := proc.metastoreTocWriter.(*metastore.TableOfContentsWriter)
	require.True(t, ok)
	recorder := &recordingTocWriter{TableOfContentsWriter: underlying}
	proc.metastoreTocWriter = recorder

	// First stream at the current mock time.
	firstTS := clock.Now()
	firstStream := logproto.Stream{
		Labels: `{service="test"}`,
		Entries: []push.Entry{{
			Timestamp: firstTS,
			Line:      "abc",
		}},
	}
	firstPayload, err := firstStream.Marshal()
	require.NoError(t, err)
	proc.processRecord(&kgo.Record{
		Key:       []byte("test-tenant"),
		Value:     firstPayload,
		Timestamp: firstTS,
	})

	// Second stream, one minute later. Note this aliases the first stream's
	// Entries slice; the first record has already been marshaled so the
	// mutation is safe.
	clock.Advance(time.Minute)
	secondTS := clock.Now()
	secondStream := firstStream
	secondStream.Entries[0].Timestamp = secondTS
	secondPayload, err := secondStream.Marshal()
	require.NoError(t, err)
	proc.processRecord(&kgo.Record{
		Key:       []byte("test-tenant"),
		Value:     secondPayload,
		Timestamp: secondTS,
	})

	// The processor must not have flushed on its own; we flush explicitly.
	require.True(t, proc.lastFlush.IsZero())

	// Capture the builder's time range so we can check it against the
	// metastore entry after the flush.
	minTime, maxTime := proc.builder.TimeRange()
	require.Equal(t, firstTS, minTime)
	require.Equal(t, secondTS, maxTime)

	// Flush the data object and verify the flush timestamp was updated.
	require.NoError(t, proc.flush())
	require.Equal(t, secondTS, proc.lastFlush)

	// A flush should produce two uploads: the data object and the metastore
	// object.
	bucket, ok := proc.bucket.(*mockBucket)
	require.True(t, ok)
	require.Len(t, bucket.uploads, 2)

	// Exactly one TOC entry with the builder's time range should be recorded.
	require.Len(t, recorder.entries, 1)
	entry := recorder.entries[0]
	require.Equal(t, minTime, entry.MinTimestamp)
	require.Equal(t, maxTime, entry.MaxTimestamp)
}
func TestPartitionProcessor_IdleFlush(t *testing.T) {
clock := quartz.NewMock(t)
p := newTestPartitionProcessor(t, clock)

Loading…
Cancel
Save