diff --git a/pkg/dataobj/index/builder.go b/pkg/dataobj/index/builder.go
index ec69b1ef1c..b81bca354c 100644
--- a/pkg/dataobj/index/builder.go
+++ b/pkg/dataobj/index/builder.go
@@ -72,6 +72,7 @@ type calculator interface {
 	Flush() (*dataobj.Object, io.Closer, error)
 	TimeRanges() []multitenancy.TimeRange
 	Reset()
+	IsFull() bool
 }
 
 // An interface for the methods needed from a kafka client. Useful for testing.
diff --git a/pkg/dataobj/index/builder_test.go b/pkg/dataobj/index/builder_test.go
index f75936015a..21dddc40b6 100644
--- a/pkg/dataobj/index/builder_test.go
+++ b/pkg/dataobj/index/builder_test.go
@@ -161,6 +161,9 @@ func TestIndexBuilder(t *testing.T) {
 		})
 	}
 
+	_, err = p.indexer.(*serialIndexer).flushIndex(ctx, 0)
+	require.NoError(t, err)
+
 	indexes := readAllSectionPointers(t, bucket)
 	require.Equal(t, 30, len(indexes))
 }
diff --git a/pkg/dataobj/index/calculate.go b/pkg/dataobj/index/calculate.go
index 89809e341e..8740c832d4 100644
--- a/pkg/dataobj/index/calculate.go
+++ b/pkg/dataobj/index/calculate.go
@@ -45,6 +45,10 @@ func (c *Calculator) Flush() (*dataobj.Object, io.Closer, error) {
 	return c.indexobjBuilder.Flush()
 }
 
+func (c *Calculator) IsFull() bool {
+	return c.indexobjBuilder.IsFull()
+}
+
 // Calculate reads the log data from the input logs object and appends the resulting indexes to calculator's builder.
 // Calculate is not thread-safe.
 func (c *Calculator) Calculate(ctx context.Context, logger log.Logger, reader *dataobj.Object, objectPath string) error {
diff --git a/pkg/dataobj/index/indexer.go b/pkg/dataobj/index/indexer.go
index 768df4e38e..e4338fab85 100644
--- a/pkg/dataobj/index/indexer.go
+++ b/pkg/dataobj/index/indexer.go
@@ -32,7 +32,7 @@ type buildRequest struct {
 
 // buildResult represents the result of an index build operation
 type buildResult struct {
-	indexPath string
+	indexPath string        // Path of the flushed index object, if any
 	records   []*kgo.Record // Records to commit after successful build
 	err       error
 }
@@ -65,6 +65,8 @@ type serialIndexer struct {
 	// Download pipeline
 	downloadQueue     chan metastore.ObjectWrittenEvent
 	downloadedObjects chan downloadedObject
+	skipDownloads     chan struct{} // Signals the download worker to stop downloading queued objects
+	skipMode          bool          // True while the download worker drains the queue without downloading
 
 	// Worker management
 	buildRequestChan chan buildRequest
@@ -96,6 +98,7 @@ func newSerialIndexer(
 		buildRequestChan:  make(chan buildRequest, cfg.QueueSize),
 		downloadQueue:     make(chan metastore.ObjectWrittenEvent, 32),
 		downloadedObjects: make(chan downloadedObject, 1),
+		skipDownloads:     make(chan struct{}),
 	}
 
 	// Initialize dskit Service
@@ -135,6 +138,9 @@ func (si *serialIndexer) stopping(_ error) error {
 	close(si.downloadQueue)
 	close(si.buildRequestChan)
 
+	// Close the skip downloads channel to signal any waiting download workers
+	close(si.skipDownloads)
+
 	// Wait for workers to finish
 	si.downloadWorkerWg.Wait()
 	si.buildWorkerWg.Wait()
@@ -196,6 +202,8 @@ func (si *serialIndexer) downloadWorker(ctx context.Context) {
 	level.Debug(si.logger).Log("msg", "download worker started")
 	defer level.Debug(si.logger).Log("msg", "download worker stopped")
 
+	si.skipMode = false // Start in normal mode; the skip signal flips this to true
+
 	for {
 		select {
 		case event, ok := <-si.downloadQueue:
@@ -204,6 +212,12 @@
 				return
 			}
 
+			if si.skipMode {
+				// Skip downloading, just drain the queue
+				level.Debug(si.logger).Log("msg", "skipping download due to skip signal", "object_path", event.ObjectPath)
+				continue
+			}
+
 			objLogger := log.With(si.logger, "object_path", event.ObjectPath)
 
 			downloadStart := time.Now()
@@ -247,6 +261,11 @@ func (si *serialIndexer) downloadWorker(ctx context.Context) {
 			return
 		}
 
+		case <-si.skipDownloads:
+			level.Debug(si.logger).Log("msg", "download worker received skip signal, entering skip mode")
+			si.skipMode = true
+			// Continue in the loop to drain the downloadQueue
+
 		case <-ctx.Done():
 			return
 		}
@@ -302,7 +321,7 @@ func (si *serialIndexer) processBuildRequest(req buildRequest) buildResult {
 	}
 
 	// Build the index using internal method
-	indexPath, err := si.buildIndex(req.ctx, events, req.partition)
+	indexPath, processed, err := si.buildIndex(req.ctx, events, req.partition)
 
 	// Update metrics
 	buildTime := time.Since(start)
@@ -310,18 +329,18 @@
 	if err != nil {
 		level.Error(si.logger).Log("msg", "failed to build index",
-			"partition", req.partition, "err", err, "duration", buildTime)
+			"partition", req.partition, "err", err, "duration", buildTime, "processed", processed)
 		return buildResult{err: err}
 	}
 
 	level.Debug(si.logger).Log("msg", "successfully built index",
 		"partition", req.partition, "index_path", indexPath, "duration", buildTime,
-		"events", len(events))
+		"events", len(events), "processed", processed)
 
-	// Extract records for committing
-	records := make([]*kgo.Record, len(req.events))
-	for i, buffered := range req.events {
-		records[i] = buffered.record
+	// Extract records for committing - only for processed events
+	records := make([]*kgo.Record, processed)
+	for i := range processed {
+		records[i] = req.events[i].record
 	}
 
 	return buildResult{
@@ -331,8 +350,11 @@
 	}
 }
 
-// buildIndex is the core index building logic (moved from builder)
-func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.ObjectWrittenEvent, partition int32) (string, error) {
+// buildIndex writes all metastore events to a single index object. It returns
+// the index path and the number of events processed, or an error if the index
+// object could not be created. The number of processed events can be less than
+// len(events) if the builder becomes full while appending (triggerTypeAppend).
+func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.ObjectWrittenEvent, partition int32) (string, int, error) {
 	level.Debug(si.logger).Log("msg", "building index", "events", len(events), "partition", partition)
 
 	start := time.Now()
@@ -340,30 +362,32 @@
 	writeTime, err := time.Parse(time.RFC3339, events[0].WriteTime)
 	if err != nil {
 		level.Error(si.logger).Log("msg", "failed to parse write time", "err", err)
-		return "", err
+		return "", 0, err
 	}
 	si.builderMetrics.setProcessingDelay(writeTime)
 
-	// Trigger the downloads
 	for _, event := range events {
 		select {
 		case si.downloadQueue <- event:
 			// Successfully sent event for download
 		case <-ctx.Done():
-			return "", ctx.Err()
+			return "", 0, ctx.Err()
 		}
 	}
 
-	// Process the results as they are downloaded
+	// Process downloaded objects, stopping early if the builder becomes full
 	processingErrors := multierror.New()
-	for range len(events) {
+
+	processed := 0
+	for processed < len(events) {
 		var obj downloadedObject
 		select {
 		case obj = <-si.downloadedObjects:
 		case <-ctx.Done():
-			return "", ctx.Err()
+			return "", processed, ctx.Err()
 		}
+		processed++
 
 		objLogger := log.With(si.logger, "object_path", obj.event.ObjectPath)
 		level.Debug(objLogger).Log("msg", "processing object")
@@ -372,26 +396,78 @@ func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.Obje
 			continue
 		}
 
-		reader, err := dataobj.FromReaderAt(bytes.NewReader(*obj.objectBytes), int64(len(*obj.objectBytes)))
-		if err != nil {
-			processingErrors.Add(fmt.Errorf("failed to read object: %w", err))
+		// Process this object
+		if err := si.processObject(ctx, objLogger, obj); err != nil {
+			processingErrors.Add(fmt.Errorf("failed to process object: %w", err))
 			continue
 		}
 
-		if err := si.calculator.Calculate(ctx, objLogger, reader, obj.event.ObjectPath); err != nil {
-			processingErrors.Add(fmt.Errorf("failed to calculate index: %w", err))
-			continue
+		// Check if builder became full during processing
+		if si.calculator.IsFull() {
+			// Signal download worker to skip downloading new objects
+			select {
+			case si.skipDownloads <- struct{}{}:
+				level.Debug(si.logger).Log("msg", "sent skip signal to download worker")
+			default:
+				// No download worker was ready to receive; it may already be in skip mode
+				level.Debug(si.logger).Log("msg", "failed to send skip signal to download worker, channel full or closed")
+			}
+
+			// Drain any remaining downloaded objects to prevent goroutine leaks
+		drainLoop:
+			for {
+				select {
+				case <-si.downloadedObjects:
+					// Drain the channel, continue draining
+				case <-ctx.Done():
+					break drainLoop
+				default:
+					break drainLoop // No more objects to drain
+				}
+			}
+			break
 		}
 	}
 
 	if processingErrors.Err() != nil {
-		return "", processingErrors.Err()
+		return "", processed, processingErrors.Err()
+	}
+
+	// Flushing the current index and starting fresh means, per trigger type:
+	// - append: either the calculator became full and the remaining events will be processed by the next trigger,
+	//   or all events have been processed; either way we flush the current index and start fresh
+	// - max-idle: all events have been processed, so we flush the current index and start fresh
+	indexPath, flushErr := si.flushIndex(ctx, partition)
+	if flushErr != nil {
+		return "", processed, fmt.Errorf("failed to flush index: %w", flushErr)
 	}
 
+	level.Debug(si.logger).Log("msg", "finished building index files", "partition", partition,
+		"events", len(events), "processed", processed, "index_path", indexPath, "duration", time.Since(start))
+
+	return indexPath, processed, nil
+}
+
+// processObject decodes a single downloaded object and feeds it to the calculator
+func (si *serialIndexer) processObject(ctx context.Context, objLogger log.Logger, obj downloadedObject) error {
+	reader, err := dataobj.FromReaderAt(bytes.NewReader(*obj.objectBytes), int64(len(*obj.objectBytes)))
+	if err != nil {
+		return fmt.Errorf("failed to read object: %w", err)
+	}
+
+	return si.calculator.Calculate(ctx, objLogger, reader, obj.event.ObjectPath)
+}
+
+// flushIndex flushes the current calculator state to an index object
+func (si *serialIndexer) flushIndex(ctx context.Context, partition int32) (string, error) {
 	tenantTimeRanges := si.calculator.TimeRanges()
+	if len(tenantTimeRanges) == 0 {
+		return "", nil // Nothing to flush
+	}
+
 	obj, closer, err := si.calculator.Flush()
 	if err != nil {
-		return "", fmt.Errorf("failed to flush builder: %w", err)
+		return "", fmt.Errorf("failed to flush calculator: %w", err)
 	}
 	defer closer.Close()
@@ -415,9 +491,11 @@ func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.Obje
 		return "", fmt.Errorf("failed to update metastore ToC file: %w", err)
 	}
 
-	level.Debug(si.logger).Log("msg", "finished building new index file", "partition", partition,
-		"events", len(events), "size", obj.Size(), "duration", time.Since(start),
-		"tenants", len(tenantTimeRanges), "path", key)
+	si.calculator.Reset()
+	si.skipMode = false // Reset skip mode
+
+	level.Debug(si.logger).Log("msg", "flushed index object", "partition", partition,
+		"path", key, "size", obj.Size(), "tenants", len(tenantTimeRanges))
 
 	return key, nil
 }
diff --git a/pkg/dataobj/index/indexer_test.go b/pkg/dataobj/index/indexer_test.go
index a4281ec160..bcfe0dedd7 100644
--- a/pkg/dataobj/index/indexer_test.go
+++ b/pkg/dataobj/index/indexer_test.go
@@ -292,18 +292,27 @@ func TestSerialIndexer_ConcurrentBuilds(t *testing.T) {
 
 // mockCalculator is a calculator that does nothing for use in tests
 type mockCalculator struct {
-	count  int
-	object *dataobj.Object
+	count           int
+	object          *dataobj.Object
+	flushCallCount  int
+	resetCallCount  int
+	errOnCallNumber int  // Which Calculate call should mark the builder full (0 = never)
+	full            bool // Whether the builder is currently full
 }
 
 func (c *mockCalculator) Calculate(_ context.Context, _ log.Logger, object *dataobj.Object, _ string) error {
 	c.count++
 	c.object = object
+
+	if c.errOnCallNumber > 0 && c.count == c.errOnCallNumber {
+		c.full = true
+	}
 	return nil
 }
 
 func (c *mockCalculator) Flush() (*dataobj.Object, io.Closer, error) {
-	return c.object, io.NopCloser(bytes.NewReader([]byte{})), nil
+	c.flushCallCount++
+	return c.object, io.NopCloser(bytes.NewReader([]byte("test-data"))), nil
 }
 
 func (c *mockCalculator) TimeRanges() []multitenancy.TimeRange {
@@ -316,4 +325,89 @@
 	}
 }
 
-func (c *mockCalculator) Reset() {}
+func (c *mockCalculator) Reset() {
+	c.resetCallCount++
+	c.full = false
+}
+
+func (c *mockCalculator) IsFull() bool {
+	return c.full
+}
+
+func TestSerialIndexer_FlushOnBuilderFull(t *testing.T) {
+	t.Parallel()
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel()
+
+	// Set up test data - 3 events to process
+	bucket := objstore.NewInMemBucket()
+	for i := 0; i < 3; i++ {
+		buildLogObject(t, fmt.Sprintf("tenant-%d", i), fmt.Sprintf("test-path-%d", i), bucket)
+	}
+
+	events := []bufferedEvent{}
+	for i := 0; i < 3; i++ {
+		event := metastore.ObjectWrittenEvent{
+			ObjectPath: fmt.Sprintf("test-path-%d", i),
+			WriteTime:  time.Now().Format(time.RFC3339),
+		}
+
+		record := &kgo.Record{Partition: int32(0)}
+		eventBytes, err := event.Marshal()
+		require.NoError(t, err)
+		record.Value = eventBytes
+
+		events = append(events, bufferedEvent{
+			event:  event,
+			record: record,
+		})
+	}
+
+	// Create mock calculator that sets the full flag on its second call
+	mockCalc := &mockCalculator{
+		errOnCallNumber: 2, // Set full flag on the 2nd Calculate call
+	}
+	indexStorageBucket := objstore.NewInMemBucket()
+
+	// Create dedicated registry for this test
+	reg := prometheus.NewRegistry()
+
+	builderMetrics := newBuilderMetrics()
+	require.NoError(t, builderMetrics.register(reg))
+
+	indexerMetrics := newIndexerMetrics()
+	require.NoError(t, indexerMetrics.register(reg))
+
+	indexer := newSerialIndexer(
+		mockCalc,
+		bucket,
+		indexStorageBucket,
+		builderMetrics,
+		indexerMetrics,
+		log.NewLogfmtLogger(os.Stderr),
+		indexerConfig{QueueSize: 10},
+	)
+
+	// Start indexer service
+	require.NoError(t, indexer.StartAsync(ctx))
+	require.NoError(t, indexer.AwaitRunning(ctx))
+	defer func() {
+		indexer.StopAsync()
+		require.NoError(t, indexer.AwaitTerminated(context.Background()))
+	}()
+
+	// Submit build request with multiple events
+	records, err := indexer.submitBuild(ctx, events, 0, triggerTypeAppend)
+	require.NoError(t, err)
+	require.Len(t, records, 2) // Only the 2 processed records are returned; the 3rd is left for the next trigger
+
+	// Verify calculator behavior
+	require.Equal(t, 2, mockCalc.count)          // Calculate called twice; the 3rd event was never processed
+	require.Equal(t, 1, mockCalc.flushCallCount) // 1 flush after full only
+	require.Equal(t, 1, mockCalc.resetCallCount) // 1 reset after full
+	require.False(t, indexer.skipMode)
+
+	// Verify metrics - a single request and a single build despite the early flush
+	require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalRequests))
+	require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalBuilds))
+}
diff --git a/pkg/dataobj/index/indexobj/builder.go b/pkg/dataobj/index/indexobj/builder.go
index 87579164c2..09daefbb6c 100644
--- a/pkg/dataobj/index/indexobj/builder.go
+++ b/pkg/dataobj/index/indexobj/builder.go
@@ -124,6 +124,7 @@ type Builder struct {
 	labelCache *lru.Cache[string, labels.Labels]
 
 	currentSizeEstimate int
+	builderFull         bool
 
 	builder *dataobj.Builder            // Inner builder for accumulating sections.
 	streams map[string]*streams.Builder // The key is the TenantID.
@@ -177,12 +178,16 @@ func (b *Builder) GetEstimatedSize() int {
 	return b.currentSizeEstimate
 }
 
+func (b *Builder) IsFull() bool {
+	return b.builderFull
+}
+
 func (b *Builder) AppendIndexPointer(tenantID string, path string, startTs time.Time, endTs time.Time) error {
 	b.metrics.appendsTotal.Inc()
 
 	newEntrySize := len(path) + 1 + 1 // path, startTs, endTs
 	if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
-		return ErrBuilderFull
+		b.builderFull = true
 	}
 
 	timer := prometheus.NewTimer(b.metrics.appendTime)
@@ -216,7 +221,7 @@ func (b *Builder) AppendStream(tenantID string, stream streams.Stream) (int64, e
 
 	newEntrySize := labelsEstimate(stream.Labels) + 2
 	if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
-		return 0, ErrBuilderFull
+		b.builderFull = true
 	}
 
 	timer := prometheus.NewTimer(b.metrics.appendTime)
@@ -273,7 +278,7 @@ func (b *Builder) ObserveLogLine(tenantID string, path string, section int64, st
 
 	newEntrySize := 4 // ints and times compress well so we just need to make an estimate.
 	if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
-		return ErrBuilderFull
+		b.builderFull = true
 	}
 
 	timer := prometheus.NewTimer(b.metrics.appendTime)
@@ -309,7 +314,7 @@ func (b *Builder) AppendColumnIndex(tenantID string, path string, section int64,
 
 	newEntrySize := len(columnName) + 1 + 1 + len(valuesBloom) + 1
 	if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
-		return ErrBuilderFull
+		b.builderFull = true
 	}
 
 	timer := prometheus.NewTimer(b.metrics.appendTime)
@@ -460,6 +465,7 @@ func (b *Builder) Reset() {
 	b.metrics.sizeEstimate.Set(0)
 
 	b.currentSizeEstimate = 0
+	b.builderFull = false
 	b.state = builderStateEmpty
 }
diff --git a/pkg/dataobj/index/indexobj/builder_test.go b/pkg/dataobj/index/indexobj/builder_test.go
index 77c698fd08..8271171e2d 100644
--- a/pkg/dataobj/index/indexobj/builder_test.go
+++ b/pkg/dataobj/index/indexobj/builder_test.go
@@ -2,7 +2,6 @@ package indexobj
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"testing"
 	"time"
@@ -146,7 +145,7 @@ func TestBuilder_Append(t *testing.T) {
 			MinTimestamp: time.Unix(10, 0).UTC(),
 			MaxTimestamp: time.Unix(20, 0).UTC(),
 		})
-		if errors.Is(err, ErrBuilderFull) {
+		if builder.IsFull() {
 			break
 		}
 		require.NoError(t, err)
@@ -166,7 +165,7 @@ func TestBuilder_AppendIndexPointer(t *testing.T) {
 		require.NoError(t, ctx.Err())
 
 		err := builder.AppendIndexPointer(testTenant, fmt.Sprintf("test/path-%d", i), time.Unix(10, 0).Add(time.Duration(i)*time.Second).UTC(), time.Unix(20, 0).Add(time.Duration(i)*time.Second).UTC())
-		if errors.Is(err, ErrBuilderFull) {
+		if builder.IsFull() {
 			break
 		}
 		require.NoError(t, err)
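
Reviewer note: the change replaces the hard ErrBuilderFull error with a latched full flag that callers poll via IsFull(), so the append that crosses the size target still lands in the current object, and only the records of processed events are committed. Below is a minimal, self-contained sketch of that control flow, for illustration only — toyBuilder and its size accounting are hypothetical stand-ins for indexobj.Builder and the calculator, not code from this change:

package main

import "fmt"

// toyBuilder stands in for indexobj.Builder: instead of returning an error,
// it latches a "full" flag once an append crosses the size target.
type toyBuilder struct {
	size, target int
	full         bool
}

func (b *toyBuilder) Append(n int) {
	if b.size+n > b.target {
		b.full = true // latch; the caller decides when to flush
	}
	b.size += n // the append that crossed the threshold still lands in this object
}

func (b *toyBuilder) IsFull() bool { return b.full }

// Flush returns the accumulated size and clears the builder, mirroring the
// Flush-then-Reset sequence in flushIndex.
func (b *toyBuilder) Flush() int {
	flushed := b.size
	b.size, b.full = 0, false
	return flushed
}

// buildIndex mirrors the serial indexer's loop: stop appending once the
// builder reports full, flush, and report how many events were consumed so
// that only their Kafka records get committed; the remainder is retried by
// the next trigger.
func buildIndex(b *toyBuilder, events []int) (flushed, processed int) {
	for _, e := range events {
		b.Append(e)
		processed++
		if b.IsFull() {
			break
		}
	}
	return b.Flush(), processed
}

func main() {
	b := &toyBuilder{target: 10}
	flushed, processed := buildIndex(b, []int{4, 4, 4, 4})
	fmt.Println(flushed, processed) // 12 3: the third append latched the full flag
}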