fix(dataobj): Flush into multiple index objects when ErrBuilderFull (#19223)

pull/19988/head
Periklis Tsirakidis 1 month ago committed by GitHub
parent 3904c2b8f7
commit 32dbef99b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      pkg/dataobj/index/builder.go
  2. 3
      pkg/dataobj/index/builder_test.go
  3. 4
      pkg/dataobj/index/calculate.go
  4. 132
      pkg/dataobj/index/indexer.go
  5. 102
      pkg/dataobj/index/indexer_test.go
  6. 14
      pkg/dataobj/index/indexobj/builder.go
  7. 5
      pkg/dataobj/index/indexobj/builder_test.go

@ -72,6 +72,7 @@ type calculator interface {
Flush() (*dataobj.Object, io.Closer, error)
TimeRanges() []multitenancy.TimeRange
Reset()
IsFull() bool
}
// An interface for the methods needed from a kafka client. Useful for testing.

@ -161,6 +161,9 @@ func TestIndexBuilder(t *testing.T) {
})
}
_, err = p.indexer.(*serialIndexer).flushIndex(ctx, 0)
require.NoError(t, err)
indexes := readAllSectionPointers(t, bucket)
require.Equal(t, 30, len(indexes))
}

@ -45,6 +45,10 @@ func (c *Calculator) Flush() (*dataobj.Object, io.Closer, error) {
return c.indexobjBuilder.Flush()
}
// IsFull reports whether the underlying index object builder has reached
// its configured target size and should be flushed before appending more
// index data. It delegates directly to the indexobj builder's state.
func (c *Calculator) IsFull() bool {
	return c.indexobjBuilder.IsFull()
}
// Calculate reads the log data from the input logs object and appends the resulting indexes to calculator's builder.
// Calculate is not thread-safe.
func (c *Calculator) Calculate(ctx context.Context, logger log.Logger, reader *dataobj.Object, objectPath string) error {

@ -32,7 +32,7 @@ type buildRequest struct {
// buildResult represents the result of an index build operation
type buildResult struct {
indexPath string
indexPath string // Index path when ErrBuilderFull causes multiple flushes
records []*kgo.Record // Records to commit after successful build
err error
}
@ -65,6 +65,8 @@ type serialIndexer struct {
// Download pipeline
downloadQueue chan metastore.ObjectWrittenEvent
downloadedObjects chan downloadedObject
skipDownloads chan struct{} // Signal to skip downloading new objects
skipMode bool // Track whether we're in skip mode
// Worker management
buildRequestChan chan buildRequest
@ -96,6 +98,7 @@ func newSerialIndexer(
buildRequestChan: make(chan buildRequest, cfg.QueueSize),
downloadQueue: make(chan metastore.ObjectWrittenEvent, 32),
downloadedObjects: make(chan downloadedObject, 1),
skipDownloads: make(chan struct{}),
}
// Initialize dskit Service
@ -135,6 +138,9 @@ func (si *serialIndexer) stopping(_ error) error {
close(si.downloadQueue)
close(si.buildRequestChan)
// Close the skip downloads channel to signal any waiting download workers
close(si.skipDownloads)
// Wait for workers to finish
si.downloadWorkerWg.Wait()
si.buildWorkerWg.Wait()
@ -196,6 +202,8 @@ func (si *serialIndexer) downloadWorker(ctx context.Context) {
level.Debug(si.logger).Log("msg", "download worker started")
defer level.Debug(si.logger).Log("msg", "download worker stopped")
si.skipMode = false // Track whether we're in skip mode
for {
select {
case event, ok := <-si.downloadQueue:
@ -204,6 +212,12 @@ func (si *serialIndexer) downloadWorker(ctx context.Context) {
return
}
if si.skipMode {
// Skip downloading, just drain the queue
level.Debug(si.logger).Log("msg", "skipping download due to skip signal", "object_path", event.ObjectPath)
continue
}
objLogger := log.With(si.logger, "object_path", event.ObjectPath)
downloadStart := time.Now()
@ -247,6 +261,11 @@ func (si *serialIndexer) downloadWorker(ctx context.Context) {
return
}
case <-si.skipDownloads:
level.Debug(si.logger).Log("msg", "download worker received skip signal, entering skip mode")
si.skipMode = true
// Continue in the loop to drain the downloadQueue
case <-ctx.Done():
return
}
@ -302,7 +321,7 @@ func (si *serialIndexer) processBuildRequest(req buildRequest) buildResult {
}
// Build the index using internal method
indexPath, err := si.buildIndex(req.ctx, events, req.partition)
indexPath, processed, err := si.buildIndex(req.ctx, events, req.partition)
// Update metrics
buildTime := time.Since(start)
@ -310,18 +329,18 @@ func (si *serialIndexer) processBuildRequest(req buildRequest) buildResult {
if err != nil {
level.Error(si.logger).Log("msg", "failed to build index",
"partition", req.partition, "err", err, "duration", buildTime)
"partition", req.partition, "err", err, "duration", buildTime, "processed", processed)
return buildResult{err: err}
}
level.Debug(si.logger).Log("msg", "successfully built index",
"partition", req.partition, "index_path", indexPath, "duration", buildTime,
"events", len(events))
"events", len(events), "processed", processed)
// Extract records for committing
records := make([]*kgo.Record, len(req.events))
for i, buffered := range req.events {
records[i] = buffered.record
// Extract records for committing - only for processed events
records := make([]*kgo.Record, processed)
for i := range processed {
records[i] = req.events[i].record
}
return buildResult{
@ -331,8 +350,11 @@ func (si *serialIndexer) processBuildRequest(req buildRequest) buildResult {
}
}
// buildIndex is the core index building logic (moved from builder)
func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.ObjectWrittenEvent, partition int32) (string, error) {
// buildIndex is writing all metastore events to a single index object. It
// returns the index path and the number of events processed or an error if the index object is not created.
// The number of events processed can be less than the number of events if the builder becomes full
// when the trigger is triggerTypeAppend.
func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.ObjectWrittenEvent, partition int32) (string, int, error) {
level.Debug(si.logger).Log("msg", "building index", "events", len(events), "partition", partition)
start := time.Now()
@ -340,30 +362,32 @@ func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.Obje
writeTime, err := time.Parse(time.RFC3339, events[0].WriteTime)
if err != nil {
level.Error(si.logger).Log("msg", "failed to parse write time", "err", err)
return "", err
return "", 0, err
}
si.builderMetrics.setProcessingDelay(writeTime)
// Trigger the downloads
for _, event := range events {
select {
case si.downloadQueue <- event:
// Successfully sent event for download
case <-ctx.Done():
return "", ctx.Err()
return "", 0, ctx.Err()
}
}
// Process the results as they are downloaded
// Process downloaded objects, handling ErrBuilderFull
processingErrors := multierror.New()
for range len(events) {
processed := 0
for processed < len(events) {
var obj downloadedObject
select {
case obj = <-si.downloadedObjects:
case <-ctx.Done():
return "", ctx.Err()
return "", processed, ctx.Err()
}
processed++
objLogger := log.With(si.logger, "object_path", obj.event.ObjectPath)
level.Debug(objLogger).Log("msg", "processing object")
@ -372,26 +396,78 @@ func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.Obje
continue
}
reader, err := dataobj.FromReaderAt(bytes.NewReader(*obj.objectBytes), int64(len(*obj.objectBytes)))
if err != nil {
processingErrors.Add(fmt.Errorf("failed to read object: %w", err))
// Process this object
if err := si.processObject(ctx, objLogger, obj); err != nil {
processingErrors.Add(fmt.Errorf("failed to process object: %w", err))
continue
}
if err := si.calculator.Calculate(ctx, objLogger, reader, obj.event.ObjectPath); err != nil {
processingErrors.Add(fmt.Errorf("failed to calculate index: %w", err))
continue
// Check if builder became full during processing
if si.calculator.IsFull() {
// Signal download worker to skip downloading new objects
select {
case si.skipDownloads <- struct{}{}:
level.Debug(si.logger).Log("msg", "sent skip signal to download worker")
default:
// Channel might be full or already closed, continue anyway
level.Debug(si.logger).Log("msg", "failed to send skip signal to download worker, channel full or closed")
}
// Drain any remaining downloaded objects to prevent goroutine leaks
drainLoop:
for {
select {
case <-si.downloadedObjects:
// Drain the channel, continue draining
case <-ctx.Done():
break drainLoop
default:
break drainLoop // No more objects to drain
}
}
break
}
}
if processingErrors.Err() != nil {
return "", processingErrors.Err()
return "", processed, processingErrors.Err()
}
// Flush current index and start fresh for each trigger type means:
// - append: Either the calculator became full and the remaining events will be processed by the next trigger
// or all events have been processed, so we flush the current index and start fresh
// - max-idle: All events have been processed, so we flush the current index and start fresh
indexPath, flushErr := si.flushIndex(ctx, partition)
if flushErr != nil {
return "", processed, fmt.Errorf("failed to flush index after processing full object: %w", flushErr)
}
level.Debug(si.logger).Log("msg", "finished building index files", "partition", partition,
"events", len(events), "processed", processed, "index_path", indexPath, "duration", time.Since(start))
return indexPath, processed, nil
}
// processObject handles processing a single downloaded object
func (si *serialIndexer) processObject(ctx context.Context, objLogger log.Logger, obj downloadedObject) error {
reader, err := dataobj.FromReaderAt(bytes.NewReader(*obj.objectBytes), int64(len(*obj.objectBytes)))
if err != nil {
return fmt.Errorf("failed to read object: %w", err)
}
return si.calculator.Calculate(ctx, objLogger, reader, obj.event.ObjectPath)
}
// flushIndex flushes the current calculator state to an index object
func (si *serialIndexer) flushIndex(ctx context.Context, partition int32) (string, error) {
tenantTimeRanges := si.calculator.TimeRanges()
if len(tenantTimeRanges) == 0 {
return "", nil // Nothing to flush
}
obj, closer, err := si.calculator.Flush()
if err != nil {
return "", fmt.Errorf("failed to flush builder: %w", err)
return "", fmt.Errorf("failed to flush calculator: %w", err)
}
defer closer.Close()
@ -415,9 +491,11 @@ func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.Obje
return "", fmt.Errorf("failed to update metastore ToC file: %w", err)
}
level.Debug(si.logger).Log("msg", "finished building new index file", "partition", partition,
"events", len(events), "size", obj.Size(), "duration", time.Since(start),
"tenants", len(tenantTimeRanges), "path", key)
si.calculator.Reset()
si.skipMode = false // Reset skip mode
level.Debug(si.logger).Log("msg", "flushed index object", "partition", partition,
"path", key, "size", obj.Size(), "tenants", len(tenantTimeRanges))
return key, nil
}

@ -292,18 +292,27 @@ func TestSerialIndexer_ConcurrentBuilds(t *testing.T) {
// mockCalculator is a calculator that does nothing for use in tests
type mockCalculator struct {
count int
object *dataobj.Object
count int
object *dataobj.Object
flushCallCount int
resetCallCount int
errOnCallNumber int // Which Calculate call should set full flag (0 = never)
full bool // Track when builder becomes full
}
// Calculate records the invocation on the mock: it stores the object,
// bumps the call counter, and — when errOnCallNumber is configured — flips
// the mock into the "full" state on exactly that call ordinal so a later
// IsFull returns true. It never returns an error.
func (c *mockCalculator) Calculate(_ context.Context, _ log.Logger, object *dataobj.Object, _ string) error {
	c.object = object
	c.count++
	if c.errOnCallNumber > 0 && c.errOnCallNumber == c.count {
		c.full = true
	}
	return nil
}
func (c *mockCalculator) Flush() (*dataobj.Object, io.Closer, error) {
return c.object, io.NopCloser(bytes.NewReader([]byte{})), nil
c.flushCallCount++
return c.object, io.NopCloser(bytes.NewReader([]byte("test-data"))), nil
}
func (c *mockCalculator) TimeRanges() []multitenancy.TimeRange {
@ -316,4 +325,89 @@ func (c *mockCalculator) TimeRanges() []multitenancy.TimeRange {
}
}
func (c *mockCalculator) Reset() {}
func (c *mockCalculator) Reset() {
c.resetCallCount++
c.full = false
}
// IsFull reports whether a previous Calculate call put the mock into the
// "builder full" state (controlled by errOnCallNumber); Reset clears it.
func (c *mockCalculator) IsFull() bool {
	return c.full
}
// TestSerialIndexer_FlushOnBuilderFull verifies the ErrBuilderFull handling
// path: when the calculator reports IsFull mid-batch, the indexer stops
// processing further events, flushes exactly once, resets the calculator,
// and commits only the records for the events actually processed (2 of the
// 3 submitted), leaving the rest for a later trigger.
func TestSerialIndexer_FlushOnBuilderFull(t *testing.T) {
	t.Parallel()
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	// Set up test data - 3 log objects in the source bucket, one per tenant.
	bucket := objstore.NewInMemBucket()
	for i := 0; i < 3; i++ {
		buildLogObject(t, fmt.Sprintf("tenant-%d", i), fmt.Sprintf("test-path-%d", i), bucket)
	}
	// Build the matching ObjectWrittenEvent for each object, wrapped in a
	// kafka record so submitBuild can hand back committable records.
	events := []bufferedEvent{}
	for i := 0; i < 3; i++ {
		event := metastore.ObjectWrittenEvent{
			ObjectPath: fmt.Sprintf("test-path-%d", i),
			WriteTime:  time.Now().Format(time.RFC3339),
		}
		record := &kgo.Record{Partition: int32(0)}
		eventBytes, err := event.Marshal()
		require.NoError(t, err)
		record.Value = eventBytes
		events = append(events, bufferedEvent{
			event:  event,
			record: record,
		})
	}
	// Create mock calculator that sets full flag on second call
	mockCalc := &mockCalculator{
		errOnCallNumber: 2, // Set full flag on the 2nd Calculate call
	}
	indexStorageBucket := objstore.NewInMemBucket()
	// Create dedicated registry for this test
	reg := prometheus.NewRegistry()
	builderMetrics := newBuilderMetrics()
	require.NoError(t, builderMetrics.register(reg))
	indexerMetrics := newIndexerMetrics()
	require.NoError(t, indexerMetrics.register(reg))
	indexer := newSerialIndexer(
		mockCalc,
		bucket,
		indexStorageBucket,
		builderMetrics,
		indexerMetrics,
		log.NewLogfmtLogger(os.Stderr),
		indexerConfig{QueueSize: 10},
	)
	// Start indexer service
	require.NoError(t, indexer.StartAsync(ctx))
	require.NoError(t, indexer.AwaitRunning(ctx))
	defer func() {
		indexer.StopAsync()
		require.NoError(t, indexer.AwaitTerminated(context.Background()))
	}()
	// Submit build request with multiple events
	records, err := indexer.submitBuild(ctx, events, 0, triggerTypeAppend)
	require.NoError(t, err)
	// Only the two processed events are committed; the third stays pending
	// because the builder became full on the 2nd Calculate call.
	require.Len(t, records, 2)
	// Verify calculator behavior
	require.Equal(t, 2, mockCalc.count)          // 2 calls (no retries)
	require.Equal(t, 1, mockCalc.flushCallCount) // 1 flush after full only
	require.Equal(t, 1, mockCalc.resetCallCount) // 1 reset after full
	// flushIndex clears skip mode after the flush completes.
	require.False(t, indexer.skipMode)
	// Verify metrics - single request/build despite multiple flushes
	require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalRequests))
	require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalBuilds))
}

@ -124,6 +124,7 @@ type Builder struct {
labelCache *lru.Cache[string, labels.Labels]
currentSizeEstimate int
builderFull bool
builder *dataobj.Builder // Inner builder for accumulating sections.
streams map[string]*streams.Builder // The key is the TenantID.
@ -177,12 +178,16 @@ func (b *Builder) GetEstimatedSize() int {
return b.currentSizeEstimate
}
func (b *Builder) IsFull() bool {
return b.builderFull
}
func (b *Builder) AppendIndexPointer(tenantID string, path string, startTs time.Time, endTs time.Time) error {
b.metrics.appendsTotal.Inc()
newEntrySize := len(path) + 1 + 1 // path, startTs, endTs
if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
return ErrBuilderFull
b.builderFull = true
}
timer := prometheus.NewTimer(b.metrics.appendTime)
@ -216,7 +221,7 @@ func (b *Builder) AppendStream(tenantID string, stream streams.Stream) (int64, e
newEntrySize := labelsEstimate(stream.Labels) + 2
if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
return 0, ErrBuilderFull
b.builderFull = true
}
timer := prometheus.NewTimer(b.metrics.appendTime)
@ -273,7 +278,7 @@ func (b *Builder) ObserveLogLine(tenantID string, path string, section int64, st
newEntrySize := 4 // ints and times compress well so we just need to make an estimate.
if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
return ErrBuilderFull
b.builderFull = true
}
timer := prometheus.NewTimer(b.metrics.appendTime)
@ -309,7 +314,7 @@ func (b *Builder) AppendColumnIndex(tenantID string, path string, section int64,
newEntrySize := len(columnName) + 1 + 1 + len(valuesBloom) + 1
if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
return ErrBuilderFull
b.builderFull = true
}
timer := prometheus.NewTimer(b.metrics.appendTime)
@ -460,6 +465,7 @@ func (b *Builder) Reset() {
b.metrics.sizeEstimate.Set(0)
b.currentSizeEstimate = 0
b.builderFull = false
b.state = builderStateEmpty
}

@ -2,7 +2,6 @@ package indexobj
import (
"context"
"errors"
"fmt"
"testing"
"time"
@ -146,7 +145,7 @@ func TestBuilder_Append(t *testing.T) {
MinTimestamp: time.Unix(10, 0).UTC(),
MaxTimestamp: time.Unix(20, 0).UTC(),
})
if errors.Is(err, ErrBuilderFull) {
if builder.IsFull() {
break
}
require.NoError(t, err)
@ -166,7 +165,7 @@ func TestBuilder_AppendIndexPointer(t *testing.T) {
require.NoError(t, ctx.Err())
err := builder.AppendIndexPointer(testTenant, fmt.Sprintf("test/path-%d", i), time.Unix(10, 0).Add(time.Duration(i)*time.Second).UTC(), time.Unix(20, 0).Add(time.Duration(i)*time.Second).UTC())
if errors.Is(err, ErrBuilderFull) {
if builder.IsFull() {
break
}
require.NoError(t, err)

Loading…
Cancel
Save