diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d05da61963..fe362fb814 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1056,6 +1056,42 @@ dataobj: # CLI flag: -dataobj-consumer.idle-flush-timeout [idle_flush_timeout: | default = 1h] + index: + # The size of the target page to use for the data object builder. + # CLI flag: -dataobj-index-builder.target-page-size + [target_page_size: | default = 128KiB] + + # The size of the target object to use for the data object builder. + # CLI flag: -dataobj-index-builder.target-object-size + [target_object_size: | default = 64MiB] + + # Configures a maximum size for sections, for sections that support it. + # CLI flag: -dataobj-index-builder.target-section-size + [target_section_size: | default = 16MiB] + + # The size of the buffer to use for sorting logs. + # CLI flag: -dataobj-index-builder.buffer-size + [buffer_size: | default = 2MiB] + + # The maximum number of stripes to merge into a section at once. Must be + # greater than 1. + # CLI flag: -dataobj-index-builder.section-stripe-merge-limit + [section_stripe_merge_limit: | default = 2] + + # Experimental: The number of events to batch before building an index + # CLI flag: -dataobj-index-builder.events-per-index + [events_per_index: | default = 32] + + # Experimental: A prefix to use for storing indexes in object storage. Used + # to separate the metastore & index files during initial testing. + # CLI flag: -dataobj-index-builder.storage-prefix + [index_storage_prefix: | default = "index/v0/"] + + # Experimental: A list of tenant IDs to enable index building for. If empty, + # all tenants will be enabled. + # CLI flag: -dataobj-index-builder.enabled-tenant-ids + [enabled_tenant_ids: | default = ""] + querier: # Enable the dataobj querier. 
# CLI flag: -dataobj-querier-enabled diff --git a/pkg/dataobj/config/config.go b/pkg/dataobj/config/config.go index ea28e57139..956cd389c2 100644 --- a/pkg/dataobj/config/config.go +++ b/pkg/dataobj/config/config.go @@ -4,11 +4,13 @@ import ( "flag" "github.com/grafana/loki/v3/pkg/dataobj/consumer" + "github.com/grafana/loki/v3/pkg/dataobj/index" "github.com/grafana/loki/v3/pkg/dataobj/querier" ) type Config struct { Consumer consumer.Config `yaml:"consumer"` + Index index.Config `yaml:"index"` Querier querier.Config `yaml:"querier"` // StorageBucketPrefix is the prefix to use for the storage bucket. StorageBucketPrefix string `yaml:"storage_bucket_prefix"` @@ -16,6 +18,7 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.Consumer.RegisterFlags(f) + cfg.Index.RegisterFlags(f) cfg.Querier.RegisterFlags(f) f.StringVar(&cfg.StorageBucketPrefix, "dataobj-storage-bucket-prefix", "dataobj/", "The prefix to use for the storage bucket.") } diff --git a/pkg/dataobj/consumer/service.go b/pkg/dataobj/consumer/service.go index 3201ee46c4..48d414f433 100644 --- a/pkg/dataobj/consumer/service.go +++ b/pkg/dataobj/consumer/service.go @@ -34,6 +34,7 @@ type Service struct { client *consumer.Client eventsProducerClient *kgo.Client + eventConsumerClient *kgo.Client cfg Config bucket objstore.Bucket @@ -82,14 +83,15 @@ func New(kafkaCfg kafka.Config, cfg Config, topicPrefix string, bucket objstore. 
eventsKafkaCfg := kafkaCfg eventsKafkaCfg.Topic = "loki.metastore-events" + eventsKafkaCfg.AutoCreateTopicDefaultPartitions = 1 eventsProducerClient, err := client.NewWriterClient("loki.metastore-events", eventsKafkaCfg, 50, logger, reg) if err != nil { level.Error(logger).Log("msg", "failed to create producer", "err", err) return nil } - s.client = consumerClient s.eventsProducerClient = eventsProducerClient + s.Service = services.NewBasicService(nil, s.run, s.stopping) return s } diff --git a/pkg/dataobj/index/builder.go b/pkg/dataobj/index/builder.go new file mode 100644 index 0000000000..1e7713cccf --- /dev/null +++ b/pkg/dataobj/index/builder.go @@ -0,0 +1,511 @@ +package index + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "flag" + "fmt" + "io" + "runtime" + "slices" + "sync" + "time" + + "github.com/bits-and-blooms/bloom/v3" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/multierror" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + "github.com/twmb/franz-go/pkg/kgo" + "golang.org/x/sync/errgroup" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" + "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" + "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/client" +) + +type Config struct { + indexobj.BuilderConfig `yaml:",inline"` + EventsPerIndex int `yaml:"events_per_index" experimental:"true"` + IndexStoragePrefix string `yaml:"index_storage_prefix" experimental:"true"` + EnabledTenantIDs flagext.StringSliceCSV `yaml:"enabled_tenant_ids" experimental:"true"` +} + +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + 
cfg.RegisterFlagsWithPrefix("dataobj-index-builder.", f) +} + +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f) + f.IntVar(&cfg.EventsPerIndex, prefix+"events-per-index", 32, "Experimental: The number of events to batch before building an index") + f.StringVar(&cfg.IndexStoragePrefix, prefix+"storage-prefix", "index/v0/", "Experimental: A prefix to use for storing indexes in object storage. Used to separate the metastore & index files during initial testing.") + f.Var(&cfg.EnabledTenantIDs, prefix+"enabled-tenant-ids", "Experimental: A list of tenant IDs to enable index building for. If empty, all tenants will be enabled.") +} + +type downloadedObject struct { + event metastore.ObjectWrittenEvent + objectBytes *[]byte + err error +} + +const ( + indexEventTopic = "loki.metastore-events" + indexConsumerGroup = "metastore-event-reader" +) + +type Builder struct { + services.Service + + cfg Config + + // Kafka client and topic/partition info + client *kgo.Client + topic string + + // Processing pipeline + downloadQueue chan metastore.ObjectWrittenEvent + downloadedObjects chan downloadedObject + builder *indexobj.Builder + + bufferedEvents map[string][]metastore.ObjectWrittenEvent + + // Builder initialization + builderCfg indexobj.BuilderConfig + bucket objstore.Bucket + flushBuffer *bytes.Buffer + + // Metrics + metrics *indexBuilderMetrics + + // Control and coordination + ctx context.Context + cancel context.CancelCauseFunc + wg sync.WaitGroup + logger log.Logger + builderMtx sync.Mutex +} + +func NewIndexBuilder( + cfg Config, + kafkaCfg kafka.Config, + logger log.Logger, + instanceID string, + bucket objstore.Bucket, + reg prometheus.Registerer, +) (*Builder, error) { + kafkaCfg.AutoCreateTopicEnabled = true + kafkaCfg.AutoCreateTopicDefaultPartitions = 64 + eventConsumerClient, err := client.NewReaderClient( + "index_builder", + kafkaCfg, + logger, + reg, + 
kgo.ConsumeTopics(indexEventTopic), + kgo.InstanceID(instanceID), + kgo.SessionTimeout(3*time.Minute), + kgo.ConsumerGroup(indexConsumerGroup), + kgo.RebalanceTimeout(5*time.Minute), + kgo.DisableAutoCommit(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create kafka consumer client: %w", err) + } + + reg = prometheus.WrapRegistererWith(prometheus.Labels{ + "topic": indexEventTopic, + "component": "index_builder", + }, reg) + + metrics := newIndexBuilderMetrics() + if err := metrics.register(reg); err != nil { + return nil, fmt.Errorf("failed to register metrics for index builder: %w", err) + } + + builder, err := indexobj.NewBuilder(cfg.BuilderConfig) + if err != nil { + return nil, fmt.Errorf("failed to create index builder: %w", err) + } + + if err := builder.RegisterMetrics(reg); err != nil { + return nil, fmt.Errorf("failed to register metrics for index builder: %w", err) + } + + // Allocate a single buffer + flushBuffer := bytes.NewBuffer(make([]byte, int(float64(cfg.BuilderConfig.TargetObjectSize)*1.2))) + + // Set up queues to download the next object (I/O bound) while processing the current one (CPU bound) in order to maximize throughput. + // Setting the channel buffer sizes caps the total memory usage by only keeping up to 3 objects in memory at a time: One being processed, one fully downloaded and one being downloaded from the queue. 
+ downloadQueue := make(chan metastore.ObjectWrittenEvent, cfg.EventsPerIndex) + downloadedObjects := make(chan downloadedObject, 1) + + s := &Builder{ + cfg: cfg, + client: eventConsumerClient, + logger: logger, + builder: builder, + bucket: bucket, + flushBuffer: flushBuffer, + builderMtx: sync.Mutex{}, + downloadedObjects: downloadedObjects, + downloadQueue: downloadQueue, + metrics: metrics, + + bufferedEvents: make(map[string][]metastore.ObjectWrittenEvent), + } + + s.Service = services.NewBasicService(nil, s.run, s.stopping) + + return s, nil +} + +func (p *Builder) run(ctx context.Context) error { + p.ctx, p.cancel = context.WithCancelCause(ctx) + + p.wg.Add(1) + go func() { + // Download worker + defer p.wg.Done() + for event := range p.downloadQueue { + objLogger := log.With(p.logger, "object_path", event.ObjectPath) + downloadStart := time.Now() + + objectReader, err := p.bucket.Get(p.ctx, event.ObjectPath) + if err != nil { + p.downloadedObjects <- downloadedObject{ + event: event, + err: fmt.Errorf("failed to fetch object from storage: %w", err), + } + continue + } + + object, err := io.ReadAll(objectReader) + _ = objectReader.Close() + if err != nil { + p.downloadedObjects <- downloadedObject{ + event: event, + err: fmt.Errorf("failed to read object: %w", err), + } + continue + } + level.Info(objLogger).Log("msg", "downloaded object", "duration", time.Since(downloadStart), "size_mb", float64(len(object))/1024/1024, "avg_speed_mbps", float64(len(object))/time.Since(downloadStart).Seconds()/1024/1024) + p.downloadedObjects <- downloadedObject{ + event: event, + objectBytes: &object, + } + } + }() + + level.Info(p.logger).Log("msg", "started index builder service") + for { + fetches := p.client.PollRecords(ctx, -1) + if fetches.IsClientClosed() || ctx.Err() != nil { + return ctx.Err() + } + if errs := fetches.Errors(); len(errs) > 0 { + level.Error(p.logger).Log("msg", "error fetching records", "err", errs) + continue + } + if fetches.Empty() { + continue 
+ } + fetches.EachPartition(func(ftp kgo.FetchTopicPartition) { + // TODO(benclive): Verify if we need to return re-poll ASAP or if sequential processing is good enough. + for _, record := range ftp.Records { + p.processRecord(record) + } + }) + } +} + +func (p *Builder) stopping(failureCase error) error { + close(p.downloadQueue) + p.cancel(failureCase) + p.wg.Wait() + close(p.downloadedObjects) + p.client.Close() + return nil +} + +func (p *Builder) processRecord(record *kgo.Record) { + event := &metastore.ObjectWrittenEvent{} + if err := event.Unmarshal(record.Value); err != nil { + level.Error(p.logger).Log("msg", "failed to unmarshal metastore event", "err", err) + return + } + p.bufferedEvents[event.Tenant] = append(p.bufferedEvents[event.Tenant], *event) + level.Info(p.logger).Log("msg", "buffered new event for tenant", "count", len(p.bufferedEvents[event.Tenant]), "tenant", event.Tenant) + + if len(p.bufferedEvents[event.Tenant]) >= p.cfg.EventsPerIndex { + if !slices.Contains(p.cfg.EnabledTenantIDs, event.Tenant) { + // TODO(benclive): Remove this check once builders handle multi-tenancy when building indexes. + level.Info(p.logger).Log("msg", "skipping index build for disabled tenant", "tenant", event.Tenant) + p.bufferedEvents[event.Tenant] = p.bufferedEvents[event.Tenant][:0] + return + } + err := p.buildIndex(p.bufferedEvents[event.Tenant][:len(p.bufferedEvents[event.Tenant])]) + if err != nil { + // TODO(benclive): Improve error handling for failed index builds. 
+ panic(err) + } + + if err := p.commitRecords(record); err != nil { + level.Warn(p.logger).Log("msg", "failed to commit records", "err", err) + return + } + p.bufferedEvents[event.Tenant] = p.bufferedEvents[event.Tenant][:0] + } +} + +func (p *Builder) buildIndex(events []metastore.ObjectWrittenEvent) error { + indexStorageBucket := objstore.NewPrefixedBucket(p.bucket, p.cfg.IndexStoragePrefix) + level.Info(p.logger).Log("msg", "building index", "events", len(events), "tenant", events[0].Tenant) + start := time.Now() + + // Observe processing delay + writeTime, err := time.Parse(time.RFC3339, events[0].WriteTime) + if err != nil { + level.Error(p.logger).Log("msg", "failed to parse write time", "err", err) + return err + } + p.metrics.observeProcessingDelay(writeTime) + + // Trigger the downloads + for _, event := range events { + p.downloadQueue <- event + } + + // Process the results as they are downloaded + processingErrors := multierror.New() + for i := 0; i < len(events); i++ { + obj := <-p.downloadedObjects + objLogger := log.With(p.logger, "object_path", obj.event.ObjectPath) + level.Info(objLogger).Log("msg", "processing object") + + if obj.err != nil { + processingErrors.Add(fmt.Errorf("failed to download object: %w", obj.err)) + continue + } + + reader, err := dataobj.FromReaderAt(bytes.NewReader(*obj.objectBytes), int64(len(*obj.objectBytes))) + if err != nil { + processingErrors.Add(fmt.Errorf("failed to read object: %w", err)) + continue + } + + // Streams Section: process this section first to ensure all streams have been added to the builder and are given new IDs. 
+ for i, section := range reader.Sections().Filter(streams.CheckSection) { + level.Debug(objLogger).Log("msg", "processing streams section", "index", i) + if err := p.processStreamsSection(section, obj.event.ObjectPath); err != nil { + processingErrors.Add(fmt.Errorf("failed to process stream section: %w", err)) + continue + } + } + + // Logs Section: these can be processed in parallel once we have the stream IDs. This work is heavily CPU bound so is limited to GOMAXPROCS parallelism. + g, ctx := errgroup.WithContext(p.ctx) + g.SetLimit(runtime.GOMAXPROCS(0)) + for i, section := range reader.Sections().Filter(logs.CheckSection) { + g.Go(func() error { + sectionLogger := log.With(objLogger, "section", i) + level.Debug(sectionLogger).Log("msg", "processing logs section") + // 1. A bloom filter for each column in the logs section. + // 2. A per-section stream time-range index using min/max of each stream in the logs section. StreamIDs will reference the aggregate stream section. + if err := p.processLogsSection(ctx, sectionLogger, obj.event.ObjectPath, section, int64(i)); err != nil { + return fmt.Errorf("failed to process logs section path=%s section=%d: %w", obj.event.ObjectPath, i, err) + } + return nil + }) + } + if err := g.Wait(); err != nil { + processingErrors.Add(fmt.Errorf("failed to process logs sections: %w", err)) + continue + } + } + + if processingErrors.Err() != nil { + return processingErrors.Err() + } + + p.flushBuffer.Reset() + stats, err := p.builder.Flush(p.flushBuffer) + if err != nil { + return fmt.Errorf("failed to flush builder: %w", err) + } + + size := p.flushBuffer.Len() + + key := p.getKey(events[0].Tenant, p.flushBuffer) + if err := indexStorageBucket.Upload(p.ctx, key, p.flushBuffer); err != nil { + return fmt.Errorf("failed to upload index: %w", err) + } + + metastoreUpdater := metastore.NewUpdater(indexStorageBucket, events[0].Tenant, p.logger) + if stats.MinTimestamp.IsZero() || stats.MaxTimestamp.IsZero() { + return 
errors.New("failed to get min/max timestamps") + } + if err := metastoreUpdater.Update(p.ctx, key, stats.MinTimestamp, stats.MaxTimestamp); err != nil { + return fmt.Errorf("failed to update metastore: %w", err) + } + + level.Info(p.logger).Log("msg", "finished building index", "tenant", events[0].Tenant, "events", len(events), "size", size, "duration", time.Since(start)) + return nil +} + +// getKey determines the key in object storage to upload the object to, based on our path scheme. +func (p *Builder) getKey(tenantID string, object *bytes.Buffer) string { + sum := sha256.Sum224(object.Bytes()) + sumStr := hex.EncodeToString(sum[:]) + + return fmt.Sprintf("tenant-%s/indexes/%s/%s", tenantID, sumStr[:2], sumStr[2:]) +} + +func (p *Builder) processStreamsSection(section *dataobj.Section, objectPath string) /*map[int64]streams.Stream, map[uint64]int64,*/ error { + streamSection, err := streams.Open(p.ctx, section) + if err != nil { + return fmt.Errorf("failed to open stream section: %w", err) + } + + streamBuf := make([]streams.Stream, 2048) + rowReader := streams.NewRowReader(streamSection) + for { + n, err := rowReader.Read(p.ctx, streamBuf) + if err != nil && err != io.EOF { + return fmt.Errorf("failed to read stream section: %w", err) + } + if n == 0 && err == io.EOF { + break + } + for _, stream := range streamBuf[:n] { + newStreamID, err := p.builder.AppendStream(stream) + if err != nil { + return fmt.Errorf("failed to append to stream: %w", err) + } + p.builder.RecordStreamRef(objectPath, stream.ID, newStreamID) + } + } + return nil +} + +// processLogsSection reads information from the logs section in order to build index information in a new object. 
+func (p *Builder) processLogsSection(ctx context.Context, sectionLogger log.Logger, objectPath string, section *dataobj.Section, sectionIdx int64) error { + logsBuf := make([]logs.Record, 1024) + type logInfo struct { + objectPath string + sectionIdx int64 + streamID int64 + timestamp time.Time + length int64 + } + logsInfo := make([]logInfo, len(logsBuf)) + + logsSection, err := logs.Open(ctx, section) + if err != nil { + return fmt.Errorf("failed to open logs section: %w", err) + } + + // Fetch the column statistics in order to init the bloom filters for each column + stats, err := logs.ReadStats(ctx, logsSection) + if err != nil { + return fmt.Errorf("failed to read log section stats: %w", err) + } + + columnBloomBuilders := make(map[string]*bloom.BloomFilter) + columnIndexes := make(map[string]int64) + for _, column := range stats.Columns { + if !logs.IsMetadataColumn(column.Type) { + continue + } + columnBloomBuilders[column.Name] = bloom.NewWithEstimates(uint(column.Cardinality), 1.0/128.0) + columnIndexes[column.Name] = column.ColumnIndex + } + + // Read the whole logs section to extract all the column values. + cnt := 0 + // TODO(benclive): Switch to a columnar reader instead of row based + // This is also likely to be more performant, especially if we don't need to read the whole log line. + // Note: the source object would need a new column storing just the length to avoid reading the log line itself. 
+ rowReader := logs.NewRowReader(logsSection) + for { + n, err := rowReader.Read(p.ctx, logsBuf) + if err != nil && err != io.EOF { + return fmt.Errorf("failed to read logs section: %w", err) + } + if n == 0 && err == io.EOF { + break + } + + for i, log := range logsBuf[:n] { + cnt++ + for _, md := range log.Metadata { + columnBloomBuilders[md.Name].Add([]byte(md.Value)) + } + logsInfo[i].objectPath = objectPath + logsInfo[i].sectionIdx = sectionIdx + logsInfo[i].streamID = log.StreamID + logsInfo[i].timestamp = log.Timestamp + logsInfo[i].length = int64(len(log.Line)) + } + + // Lock the mutex once per read for perf reasons. + p.builderMtx.Lock() + for _, log := range logsInfo[:n] { + err = p.builder.ObserveLogLine(log.objectPath, log.sectionIdx, log.streamID, log.timestamp, log.length) + if err != nil { + p.builderMtx.Unlock() + return fmt.Errorf("failed to observe log line: %w", err) + } + } + p.builderMtx.Unlock() + } + + // Write the indexes (bloom filters) to the new index object. 
+ for columnName, bloom := range columnBloomBuilders { + bloomBytes, err := bloom.MarshalBinary() + if err != nil { + return fmt.Errorf("failed to marshal bloom filter: %w", err) + } + p.builderMtx.Lock() + err = p.builder.AppendColumnIndex(objectPath, sectionIdx, columnName, columnIndexes[columnName], bloomBytes) + p.builderMtx.Unlock() + if err != nil { + return fmt.Errorf("failed to append column index: %w", err) + } + } + + level.Info(sectionLogger).Log("msg", "finished processing logs section", "rowsProcessed", cnt) + return nil +} + +func (p *Builder) commitRecords(record *kgo.Record) error { + backoff := backoff.New(p.ctx, backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + MaxRetries: 20, + }) + + var lastErr error + backoff.Reset() + for backoff.Ongoing() { + p.metrics.incCommitsTotal() + err := p.client.CommitRecords(p.ctx, record) + if err == nil { + return nil + } + level.Error(p.logger).Log("msg", "failed to commit records", "err", err) + p.metrics.incCommitFailures() + lastErr = err + backoff.Wait() + } + return lastErr +} diff --git a/pkg/dataobj/index/builder_test.go b/pkg/dataobj/index/builder_test.go new file mode 100644 index 0000000000..688e81a113 --- /dev/null +++ b/pkg/dataobj/index/builder_test.go @@ -0,0 +1,184 @@ +package index + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" + "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" + "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/testkafka" + "github.com/grafana/loki/v3/pkg/logproto" +) + +var 
testBuilderConfig = indexobj.BuilderConfig{ + TargetPageSize: 128 * 1024, + TargetObjectSize: 4 * 1024 * 1024, + TargetSectionSize: 2 * 1024 * 1024, + + BufferSize: 4 * 1024 * 1024, + + SectionStripeMergeLimit: 2, +} + +func buildLogObject(t *testing.T, app string, path string, bucket objstore.Bucket) { + candidate, err := logsobj.NewBuilder(logsobj.BuilderConfig{ + TargetPageSize: 128 * 1024, + TargetObjectSize: 4 * 1024 * 1024, + TargetSectionSize: 2 * 1024 * 1024, + + BufferSize: 4 * 1024 * 1024, + SectionStripeMergeLimit: 2, + }) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + stream := logproto.Stream{ + Labels: fmt.Sprintf("{app=\"%s\",stream=\"%d\"}", app, i), + Entries: []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}, + } + err = candidate.Append(stream) + require.NoError(t, err) + } + + buf := bytes.NewBuffer(nil) + _, err = candidate.Flush(buf) + require.NoError(t, err) + + err = bucket.Upload(context.Background(), path, buf) + require.NoError(t, err) +} + +func TestIndexBuilder(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // Setup test dependencies + bucket := objstore.NewInMemBucket() + + cluster, configString := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "loki.metastore-events") + defer cluster.Close() + + client, err := kgo.NewClient(kgo.ConsumerGroup("test-consumer-group"), kgo.ConsumeTopics("loki.metastore-events"), kgo.SeedBrokers(configString)) + require.NoError(t, err) + + indexPrefix := "test-prefix" + tenant := "test-tenant" + p, err := NewIndexBuilder( + Config{ + BuilderConfig: indexobj.BuilderConfig{ + TargetPageSize: 128 * 1024, + TargetObjectSize: 4 * 1024 * 1024, + TargetSectionSize: 2 * 1024 * 1024, + + BufferSize: 4 * 1024 * 1024, + SectionStripeMergeLimit: 2, + }, + EventsPerIndex: 3, + IndexStoragePrefix: indexPrefix, + EnabledTenantIDs: []string{tenant}, + }, + kafka.Config{}, + log.NewNopLogger(), 
+ "instance-id", + bucket, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + p.client = client + require.NoError(t, p.StartAsync(ctx)) + + buildLogObject(t, "loki", "test-path-0", bucket) + buildLogObject(t, "testing", "test-path-1", bucket) + buildLogObject(t, "three", "test-path-2", bucket) + + for i := 0; i < 3; i++ { + event := metastore.ObjectWrittenEvent{ + ObjectPath: fmt.Sprintf("test-path-%d", i), + Tenant: tenant, + WriteTime: time.Now().Format(time.RFC3339), + } + eventBytes, err := event.Marshal() + require.NoError(t, err) + + p.processRecord(&kgo.Record{ + Key: []byte(tenant), + Value: eventBytes, + }) + } + + indexes := readAllSectionPointers(t, bucket, indexPrefix) + require.Equal(t, 30, len(indexes)) +} + +func readAllSectionPointers(t *testing.T, bucket objstore.Bucket, indexPrefix string) []pointers.SectionPointer { + var out []pointers.SectionPointer + + directories := []string{} + err := bucket.Iter(context.Background(), fmt.Sprintf("%s/tenant-test-tenant/indexes/", indexPrefix), func(name string) error { + directories = append(directories, name) + return nil + }) + require.NoError(t, err) + + for _, directory := range directories { + err := bucket.Iter(context.Background(), directory, func(name string) error { + + objReader, err := bucket.Get(context.Background(), name) + require.NoError(t, err) + defer objReader.Close() + + objectBytes, err := io.ReadAll(objReader) + require.NoError(t, err) + + object, err := dataobj.FromReaderAt(bytes.NewReader(objectBytes), int64(len(objectBytes))) + require.NoError(t, err) + + var reader pointers.RowReader + defer reader.Close() + + buf := make([]pointers.SectionPointer, 64) + + for _, section := range object.Sections() { + if !pointers.CheckSection(section) { + continue + } + + sec, err := pointers.Open(context.Background(), section) + if err != nil { + return fmt.Errorf("opening section: %w", err) + } + + reader.Reset(sec) + for { + num, err := reader.Read(context.Background(), buf) + if err != 
nil && err != io.EOF { + return fmt.Errorf("reading section: %w", err) + } + if num == 0 && err == io.EOF { + break + } + out = append(out, buf[:num]...) + } + } + return nil + }) + require.NoError(t, err) + } + require.NoError(t, err) + + return out +} diff --git a/pkg/dataobj/index/indexobj/builder.go b/pkg/dataobj/index/indexobj/builder.go new file mode 100644 index 0000000000..5afbd60da7 --- /dev/null +++ b/pkg/dataobj/index/indexobj/builder.go @@ -0,0 +1,420 @@ +// Package indexobj provides tooling for creating index-oriented data objects. +package indexobj + +import ( + "bytes" + "context" + "errors" + "flag" + "fmt" + "sort" + "time" + + "github.com/grafana/dskit/flagext" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" + "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" +) + +// ErrBuilderFull is returned by [Builder.Append] when the buffer is +// full and needs to flush; call [Builder.Flush] to flush it. +var ( + ErrBuilderFull = errors.New("builder full") + ErrBuilderEmpty = errors.New("builder empty") +) + +// BuilderConfig configures a [Builder]. +type BuilderConfig struct { + // TargetPageSize configures a target size for encoded pages within the data + // object. TargetPageSize accounts for encoding, but not for compression. + TargetPageSize flagext.Bytes `yaml:"target_page_size"` + + // TODO(rfratto): We need an additional parameter for TargetMetadataSize, as + // metadata payloads can't be split and must be downloaded in a single + // request. + // + // At the moment, we don't have a good mechanism for implementing a metadata + // size limit (we need to support some form of section splitting or column + // combinations), so the option is omitted for now. + + // TargetObjectSize configures a target size for data objects. 
+ TargetObjectSize flagext.Bytes `yaml:"target_object_size"` + + // TargetSectionSize configures the maximum size of data in a section. Sections + // which support this parameter will place overflow data into new sections of + // the same type. + TargetSectionSize flagext.Bytes `yaml:"target_section_size"` + + // BufferSize configures the size of the buffer used to accumulate + // uncompressed logs in memory prior to sorting. + BufferSize flagext.Bytes `yaml:"buffer_size"` + + // SectionStripeMergeLimit configures the number of stripes to merge at once when + // flushing stripes into a section. MergeSize must be larger than 1. Lower + // values of MergeSize trade off lower memory overhead for higher time spent + // merging. + SectionStripeMergeLimit int `yaml:"section_stripe_merge_limit"` +} + +// RegisterFlagsWithPrefix registers flags with the given prefix. +func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + _ = cfg.TargetPageSize.Set("128KB") + _ = cfg.TargetObjectSize.Set("64MB") + _ = cfg.BufferSize.Set("2MB") + _ = cfg.TargetSectionSize.Set("16MB") + + f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The size of the target page to use for the data object builder.") + f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.") + f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.") + f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.") + f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of stripes to merge into a section at once. Must be greater than 1.") +} + +// Validate validates the BuilderConfig. 
+func (cfg *BuilderConfig) Validate() error { + var errs []error + + if cfg.TargetPageSize <= 0 { + errs = append(errs, errors.New("TargetPageSize must be greater than 0")) + } else if cfg.TargetPageSize >= cfg.TargetObjectSize { + errs = append(errs, errors.New("TargetPageSize must be less than TargetObjectSize")) + } + + if cfg.TargetObjectSize <= 0 { + errs = append(errs, errors.New("TargetObjectSize must be greater than 0")) + } + + if cfg.BufferSize <= 0 { + errs = append(errs, errors.New("BufferSize must be greater than 0")) + } + + if cfg.TargetSectionSize <= 0 || cfg.TargetSectionSize > cfg.TargetObjectSize { + errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize")) + } + + if cfg.SectionStripeMergeLimit < 2 { + errs = append(errs, errors.New("LogsMergeStripesMax must be greater than 1")) + } + + return errors.Join(errs...) +} + +// A Builder constructs a logs-oriented data object from a set of incoming +// log data. Log data is appended by calling [LogBuilder.Append]. A complete +// data object is constructed by by calling [LogBuilder.Flush]. +// +// Methods on Builder are not goroutine-safe; callers are responsible for +// synchronization. +type Builder struct { + cfg BuilderConfig + metrics *builderMetrics + + labelCache *lru.Cache[string, labels.Labels] + + currentSizeEstimate int + + builder *dataobj.Builder // Inner builder for accumulating sections. + streams *streams.Builder + pointers *pointers.Builder + + state builderState +} + +type builderState int + +const ( + // builderStateEmpty indicates the builder is empty and ready to accept new data. + builderStateEmpty builderState = iota + + // builderStateDirty indicates the builder has been modified since the last flush. + builderStateDirty +) + +// NewBuilder creates a new [Builder] which stores log-oriented data objects. +// +// NewBuilder returns an error if the provided config is invalid. 
+func NewBuilder(cfg BuilderConfig) (*Builder, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + + labelCache, err := lru.New[string, labels.Labels](5000) + if err != nil { + return nil, fmt.Errorf("failed to create LRU cache: %w", err) + } + + metrics := newBuilderMetrics() + metrics.ObserveConfig(cfg) + + return &Builder{ + cfg: cfg, + metrics: metrics, + + labelCache: labelCache, + + builder: dataobj.NewBuilder(), + streams: streams.NewBuilder(metrics.streams, int(cfg.TargetPageSize)), + pointers: pointers.NewBuilder(metrics.pointers, int(cfg.TargetPageSize)), + }, nil +} + +func (b *Builder) GetEstimatedSize() int { + return b.currentSizeEstimate +} + +// AppendStream appends a stream to the object's stream section, returning the stream ID within this object. +func (b *Builder) AppendStream(stream streams.Stream) (int64, error) { + b.metrics.appendsTotal.Inc() + + newEntrySize := labelsEstimate(stream.Labels) + 2 + + if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) { + return 0, ErrBuilderFull + } + + timer := prometheus.NewTimer(b.metrics.appendTime) + defer timer.ObserveDuration() + + // Record the stream in the stream section. + // Once to capture the min timestamp and uncompressed size, again to record the max timestamp. + sort.Sort(stream.Labels) + streamID := b.streams.Record(stream.Labels, stream.MinTimestamp, stream.UncompressedSize) + _ = b.streams.Record(stream.Labels, stream.MaxTimestamp, 0) + + // If our logs section has gotten big enough, we want to flush it to the + // encoder and start a new section. + if b.pointers.EstimatedSize() > int(b.cfg.TargetSectionSize) { + if err := b.builder.Append(b.pointers); err != nil { + b.metrics.appendFailures.Inc() + return 0, err + } + } + + b.currentSizeEstimate = b.estimatedSize() + b.state = builderStateDirty + + return streamID, nil +} + +// labelsEstimate estimates the size of a set of labels in bytes. 
+func labelsEstimate(ls labels.Labels) int {
+	var (
+		keysSize   int
+		valuesSize int
+	)
+
+	for _, l := range ls {
+		keysSize += len(l.Name)
+		valuesSize += len(l.Value)
+	}
+
+	// Keys are stored as columns directly, while values get compressed. We'll
+	// underestimate a 2x compression ratio.
+	return keysSize + valuesSize/2
+}
+
+// RecordStreamRef records a reference to a stream from another object, as the stream IDs will be different between objects.
+func (b *Builder) RecordStreamRef(path string, streamIDInObject int64, streamID int64) {
+	b.pointers.RecordStreamRef(path, streamIDInObject, streamID)
+}
+
+// ObserveLogLine records one log line observation for the stream
+// streamIDInObject found in the given section of the object at path, passing
+// its timestamp and uncompressed size to the pointers section. It returns
+// [ErrBuilderFull] if the builder is full, or the error from handing a full
+// pointers section to the inner builder.
+//
+// Once a Builder is full, call [Builder.Flush] to flush the buffered data,
+// then call ObserveLogLine again with the same entry.
+func (b *Builder) ObserveLogLine(path string, section int64, streamIDInObject int64, ts time.Time, uncompressedSize int64) error {
+	// Check whether the builder is full before recording; the estimate tends
+	// to overestimate, but we may still go over our target size.
+	//
+	// The check is skipped while the builder is empty, so the first entry is
+	// always accepted. Afterwards b.currentSizeEstimate reflects the size
+	// following the previous append.
+
+	newEntrySize := 4 // ints and times compress well so we just need to make an estimate.
+
+	if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
+		return ErrBuilderFull
+	}
+
+	timer := prometheus.NewTimer(b.metrics.appendTime)
+	defer timer.ObserveDuration()
+
+	b.pointers.ObserveStream(path, section, streamIDInObject, ts, uncompressedSize)
+
+	// If the pointers section has gotten big enough, flush it to the inner
+	// builder and start a new section.
+	if b.pointers.EstimatedSize() > int(b.cfg.TargetSectionSize) {
+		if err := b.builder.Append(b.pointers); err != nil {
+			// Count the failure the same way AppendStream does for this path.
+			b.metrics.appendFailures.Inc()
+			return err
+		}
+	}
+
+	b.currentSizeEstimate = b.estimatedSize()
+	b.state = builderStateDirty
+	return nil
+}
+
+// AppendColumnIndex records a column index pointer for columnName (at
+// columnIndex) in the given section of the object at path, together with the
+// bloom filter of that column's values. It returns [ErrBuilderFull] if the
+// builder is full, or the error from handing a full pointers section to the
+// inner builder.
+//
+// Once a Builder is full, call [Builder.Flush] to flush the buffered data,
+// then call AppendColumnIndex again with the same entry.
+func (b *Builder) AppendColumnIndex(path string, section int64, columnName string, columnIndex int64, valuesBloom []byte) error {
+	// Check whether the builder is full before recording; the estimate tends
+	// to overestimate, but we may still go over our target size.
+	//
+	// The check is skipped while the builder is empty, so the first entry is
+	// always accepted. Afterwards b.currentSizeEstimate reflects the size
+	// following the previous append.
+
+	// Rough estimate: the variable-length name and bloom filter plus three
+	// single-byte allowances (presumably for the remaining fields; TODO confirm).
+	newEntrySize := len(columnName) + 1 + 1 + len(valuesBloom) + 1
+
+	if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
+		return ErrBuilderFull
+	}
+
+	timer := prometheus.NewTimer(b.metrics.appendTime)
+	defer timer.ObserveDuration()
+
+	b.pointers.RecordColumnIndex(path, section, columnName, columnIndex, valuesBloom)
+
+	// If the pointers section has gotten big enough, flush it to the inner
+	// builder and start a new section.
+	if b.pointers.EstimatedSize() > int(b.cfg.TargetSectionSize) {
+		if err := b.builder.Append(b.pointers); err != nil {
+			// Count the failure the same way AppendStream does for this path.
+			b.metrics.appendFailures.Inc()
+			return err
+		}
+	}
+
+	b.currentSizeEstimate = b.estimatedSize()
+	b.state = builderStateDirty
+	return nil
+}
+
+// estimatedSize sums the estimated sizes of the streams and pointers sections
+// and the bytes already held by the inner builder, publishes the total on the
+// size-estimate gauge and returns it.
+func (b *Builder) estimatedSize() int {
+	var size int
+	size += b.streams.EstimatedSize()
+	size += b.pointers.EstimatedSize()
+	size += b.builder.Bytes()
+	b.metrics.sizeEstimate.Set(float64(size))
+	return size
+}
+
+// FlushStats reports the time range returned by the streams section for the
+// data written by a call to [Builder.Flush].
+type FlushStats struct {
+	MinTimestamp time.Time
+	MaxTimestamp time.Time
+}
+
+// Flush flushes all buffered data to the buffer provided and returns the time
+// range reported by the streams section. Flush returns [ErrBuilderEmpty] if
+// there is no buffered data to flush.
+//
+// [Builder.Reset] is called after a successful Flush to discard any pending
+// data and allow new data to be appended. If observing the built object for
+// metrics fails, that error is returned together with valid FlushStats; the
+// object has already been written to output and the builder has been reset.
+func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) {
+	if b.state == builderStateEmpty {
+		return FlushStats{}, ErrBuilderEmpty
+	}
+
+	b.metrics.flushTotal.Inc()
+	timer := prometheus.NewTimer(b.metrics.buildTime)
+	defer timer.ObserveDuration()
+
+	// Appending sections resets them, so we need to load the time range before
+	// appending.
+	minTime, maxTime := b.streams.TimeRange()
+
+	// Flush sections one more time in case they have data.
+	var flushErrors []error
+
+	flushErrors = append(flushErrors, b.builder.Append(b.streams))
+	flushErrors = append(flushErrors, b.builder.Append(b.pointers))
+
+	if err := errors.Join(flushErrors...); err != nil {
+		b.metrics.flushFailures.Inc()
+		return FlushStats{}, fmt.Errorf("building object: %w", err)
+	}
+
+	sz, err := b.builder.Flush(output)
+	if err != nil {
+		b.metrics.flushFailures.Inc()
+		return FlushStats{}, fmt.Errorf("building object: %w", err)
+	}
+
+	b.metrics.builtSize.Observe(float64(sz))
+
+	var (
+		// We don't know if output was empty before calling Flush, so we only start
+		// reading from where we know writing began.
+
+		objReader = bytes.NewReader(output.Bytes()[output.Len()-int(sz):])
+		objLength = sz
+	)
+	obj, err := dataobj.FromReaderAt(objReader, objLength)
+	if err != nil {
+		b.metrics.flushFailures.Inc()
+		return FlushStats{}, fmt.Errorf("failed to create readable object: %w", err)
+	}
+
+	err = b.observeObject(context.Background(), obj)
+
+	b.Reset()
+	return FlushStats{MinTimestamp: minTime, MaxTimestamp: maxTime}, err
+}
+
+// observeObject feeds the built object and each of its pointers and streams
+// sections to the corresponding metrics. Errors are collected rather than
+// stopping at the first one, and returned joined.
+func (b *Builder) observeObject(ctx context.Context, obj *dataobj.Object) error {
+	var errs []error
+
+	errs = append(errs, b.metrics.dataobj.Observe(obj))
+
+	for _, sec := range obj.Sections() {
+		switch {
+		case pointers.CheckSection(sec):
+			// Use the caller's ctx so opening a section honours the same
+			// cancellation as observing it.
+			pointerSection, err := pointers.Open(ctx, sec)
+			if err != nil {
+				errs = append(errs, err)
+				continue
+			}
+			errs = append(errs, b.metrics.pointers.Observe(ctx, pointerSection))
+		case streams.CheckSection(sec):
+			streamSection, err := streams.Open(ctx, sec)
+			if err != nil {
+				errs = append(errs, err)
+				continue
+			}
+			errs = append(errs, b.metrics.streams.Observe(ctx, streamSection))
+		}
+	}
+
+	return errors.Join(errs...)
+}
+
+// Reset discards pending data and resets the builder to an empty state.
+func (b *Builder) Reset() {
+	b.builder.Reset()
+	b.streams.Reset()
+	b.pointers.Reset()
+
+	// NOTE(review): the size-estimate gauge is not zeroed here; it is next
+	// updated by estimatedSize on the following append. Confirm this is intended.
+	//b.metrics.sizeEstimate.Set(0)
+	b.currentSizeEstimate = 0
+	b.state = builderStateEmpty
+}
+
+// RegisterMetrics registers metrics about builder to report to reg.
+//
+// The builder-level metrics are created without labels, so if multiple
+// Builders are running in the same process, reg must contain additional
+// labels to differentiate between them.
+func (b *Builder) RegisterMetrics(reg prometheus.Registerer) error {
+	return b.metrics.Register(reg)
+}
+
+// UnregisterMetrics unregisters metrics about builder from reg.
+func (b *Builder) UnregisterMetrics(reg prometheus.Registerer) {
+	b.metrics.Unregister(reg)
+}
diff --git a/pkg/dataobj/index/indexobj/builder_metrics.go b/pkg/dataobj/index/indexobj/builder_metrics.go
new file mode 100644
index 0000000000..cf9a42994d
--- /dev/null
+++ b/pkg/dataobj/index/indexobj/builder_metrics.go
@@ -0,0 +1,165 @@
+package indexobj
+
+import (
+	"errors"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/grafana/loki/v3/pkg/dataobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
+)
+
+// builderMetrics provides instrumentation for a [Builder].
+type builderMetrics struct {
+	pointers *pointers.Metrics // Metrics of the pointers section builder.
+	streams  *streams.Metrics  // Metrics of the streams section builder.
+	dataobj  *dataobj.Metrics  // Metrics observed from built objects.
+
+	targetPageSize   prometheus.Gauge // Configured target page size in bytes.
+	targetObjectSize prometheus.Gauge // Configured target object size in bytes.
+
+	appendTime     prometheus.Histogram // Duration of append calls.
+	appendFailures prometheus.Counter   // Number of append failures.
+	appendsTotal   prometheus.Counter   // Number of appends.
+
+	buildTime     prometheus.Histogram // Duration of building an object in Flush.
+	flushFailures prometheus.Counter   // Number of flush failures.
+	flushTotal    prometheus.Counter   // Number of flushes.
+
+	sizeEstimate prometheus.Gauge     // Current estimated object size in bytes.
+	builtSize    prometheus.Histogram // Sizes of constructed objects in bytes.
+}
+
+// newBuilderMetrics creates a new set of [builderMetrics] for instrumenting
+// logs objects.
+func newBuilderMetrics() *builderMetrics {
+	return &builderMetrics{
+		pointers: pointers.NewMetrics(),
+		streams:  streams.NewMetrics(),
+		dataobj:  dataobj.NewMetrics(),
+		targetPageSize: prometheus.NewGauge(prometheus.GaugeOpts{
+			Name: "loki_indexobj_config_target_page_size_bytes",
+
+			Help: "Configured target page size in bytes.",
+		}),
+
+		targetObjectSize: prometheus.NewGauge(prometheus.GaugeOpts{
+			Name: "loki_indexobj_config_target_object_size_bytes",
+
+			Help: "Configured target object size in bytes.",
+		}),
+
+		appendTime: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Name: "loki_indexobj_append_time_seconds",
+
+			Help: "Time taken appending a set of log lines in a stream to a data object.",
+
+			Buckets:                         prometheus.DefBuckets,
+			NativeHistogramBucketFactor:     1.1,
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: 0,
+		}),
+
+		appendFailures: prometheus.NewCounter(prometheus.CounterOpts{
+			Name: "loki_indexobj_append_failures_total",
+			Help: "Total number of append failures",
+		}),
+
+		appendsTotal: prometheus.NewCounter(prometheus.CounterOpts{
+			Name: "loki_indexobj_appends_total",
+			Help: "Total number of appends",
+		}),
+
+		buildTime: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Name: "loki_indexobj_build_time_seconds",
+
+			Help: "Time taken building a data object to flush.",
+
+			Buckets:                         prometheus.DefBuckets,
+			NativeHistogramBucketFactor:     1.1,
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: 0,
+		}),
+
+		sizeEstimate: prometheus.NewGauge(prometheus.GaugeOpts{
+			Name: "loki_indexobj_size_estimate_bytes",
+
+			Help: "Current estimated size of the data object in bytes.",
+		}),
+
+		builtSize: prometheus.NewHistogram(prometheus.HistogramOpts{ // Unlike appendTime and buildTime, no Buckets field is set here.
+			Name: "loki_indexobj_built_size_bytes",
+
+			Help: "Distribution of constructed data object sizes in bytes.",
+
+			NativeHistogramBucketFactor:     1.1,
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: 0,
+		}),
+
+		flushFailures: prometheus.NewCounter(prometheus.CounterOpts{
+			Name: "loki_indexobj_flush_failures_total",
+
+			Help: "Total number of flush failures.",
+		}),
+
+		flushTotal: prometheus.NewCounter(prometheus.CounterOpts{
+			Name: "loki_indexobj_flush_total",
+
+			Help: "Total number of flushes.",
+		}),
+	}
+}
+
+// ObserveConfig publishes the target page and object sizes from the provided
+// [BuilderConfig] on the config gauges.
+func (m *builderMetrics) ObserveConfig(cfg BuilderConfig) {
+	m.targetPageSize.Set(float64(cfg.TargetPageSize))
+	m.targetObjectSize.Set(float64(cfg.TargetObjectSize))
+}
+
+// Register registers the section, object and builder metrics with reg. Every
+// registration is attempted; the errors are joined and returned together.
+func (m *builderMetrics) Register(reg prometheus.Registerer) error {
+	var errs []error // nil entries are dropped by errors.Join below.
+
+	errs = append(errs, m.pointers.Register(reg))
+	errs = append(errs, m.streams.Register(reg))
+	errs = append(errs, m.dataobj.Register(reg))
+
+	errs = append(errs, reg.Register(m.targetPageSize))
+	errs = append(errs, reg.Register(m.targetObjectSize))
+
+	errs = append(errs, reg.Register(m.appendTime))
+	errs = append(errs, reg.Register(m.appendFailures))
+	errs = append(errs, reg.Register(m.appendsTotal))
+
+	errs = append(errs, reg.Register(m.buildTime))
+
+	errs = append(errs, reg.Register(m.sizeEstimate))
+	errs = append(errs, reg.Register(m.builtSize))
+	errs = append(errs, reg.Register(m.flushFailures))
+	errs = append(errs, reg.Register(m.flushTotal))
+
+	return errors.Join(errs...)
+}
+
+// Unregister unregisters metrics from the provided Registerer.
+func (m *builderMetrics) Unregister(reg prometheus.Registerer) {
+	m.pointers.Unregister(reg) // Section and object metrics first, then the builder's own collectors.
+	m.streams.Unregister(reg)
+	m.dataobj.Unregister(reg)
+
+	reg.Unregister(m.targetPageSize)
+	reg.Unregister(m.targetObjectSize)
+
+	reg.Unregister(m.appendTime)
+	reg.Unregister(m.appendFailures)
+	reg.Unregister(m.appendsTotal)
+
+	reg.Unregister(m.buildTime)
+
+	reg.Unregister(m.sizeEstimate)
+	reg.Unregister(m.builtSize)
+	reg.Unregister(m.flushFailures)
+	reg.Unregister(m.flushTotal)
+}
diff --git a/pkg/dataobj/index/indexobj/builder_test.go b/pkg/dataobj/index/indexobj/builder_test.go
new file mode 100644
index 0000000000..838a4d18cf
--- /dev/null
+++ b/pkg/dataobj/index/indexobj/builder_test.go
@@ -0,0 +1,151 @@
+package indexobj
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/dataobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
+)
+
+var testBuilderConfig = BuilderConfig{ // Small sizes shared by the builder tests in this file.
+	TargetPageSize:    2048,
+	TargetObjectSize:  1 << 22, // 4 MiB
+	TargetSectionSize: 1 << 21, // 2 MiB
+
+	BufferSize: 2048 * 8,
+
+	SectionStripeMergeLimit: 2,
+}
+
+func TestBuilder(t *testing.T) {
+	buf := bytes.NewBuffer(nil)
+	dirtyBuf := bytes.NewBuffer([]byte("dirty")) // Pre-filled with 5 bytes; Flush must write after them.
+
+	testStreams := []streams.Stream{
+		{
+			ID: 1,
+			Labels: labels.Labels{
+				{Name: "cluster", Value: "test"},
+				{Name: "app", Value: "foo"},
+			},
+			Rows:             2,
+			MinTimestamp:     time.Unix(10, 0).UTC(),
+			MaxTimestamp:     time.Unix(20, 0).UTC(),
+			UncompressedSize: 200,
+		},
+		{
+			ID: 2,
+			Labels: labels.Labels{
+				{Name: "cluster", Value: "test"},
+				{Name: "app", Value: "bar"},
+			},
+			Rows:             3,
+			MinTimestamp:     time.Unix(15, 0).UTC(),
+			MaxTimestamp:     time.Unix(25, 0).UTC(),
+			UncompressedSize: 100,
+		},
+	}
+
+	testPointers := []pointers.SectionPointer{
+		{
+			Path:              "test/path",
+			Section:           1,
+			ColumnName:        "foo",
+			ColumnIndex:       1,
+			ValuesBloomFilter: []byte{1, 2, 3},
+		},
+	}
+
+	t.Run("Build", func(t *testing.T) { // Writes an object into the empty buf; the "Read" subtest depends on it.
+		builder, err := NewBuilder(testBuilderConfig)
+		require.NoError(t, err)
+
+		for _, stream := range testStreams {
+			_, err := builder.AppendStream(stream)
+			require.NoError(t, err)
+		}
+		for _, pointer := range testPointers {
+			err := builder.AppendColumnIndex(pointer.Path, pointer.Section, pointer.ColumnName, pointer.ColumnIndex, pointer.ValuesBloomFilter)
+			require.NoError(t, err)
+		}
+		_, err = builder.Flush(buf)
+		require.NoError(t, err)
+	})
+
+	t.Run("Read", func(t *testing.T) { // Expects one streams section, one pointers section and no logs sections.
+		obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
+		require.NoError(t, err)
+		require.Equal(t, 1, obj.Sections().Count(streams.CheckSection))
+		require.Equal(t, 1, obj.Sections().Count(pointers.CheckSection))
+		require.Equal(t, 0, obj.Sections().Count(logs.CheckSection))
+	})
+
+	t.Run("BuildWithDirtyBuffer", func(t *testing.T) { // Same input as "Build", flushed into the pre-filled buffer.
+		builder, err := NewBuilder(testBuilderConfig)
+		require.NoError(t, err)
+
+		for _, stream := range testStreams {
+			_, err := builder.AppendStream(stream)
+			require.NoError(t, err)
+		}
+		for _, pointer := range testPointers {
+			err := builder.AppendColumnIndex(pointer.Path, pointer.Section, pointer.ColumnName, pointer.ColumnIndex, pointer.ValuesBloomFilter)
+			require.NoError(t, err)
+		}
+
+		_, err = builder.Flush(dirtyBuf)
+		require.NoError(t, err)
+
+		require.Equal(t, buf.Len(), dirtyBuf.Len()-5) // Object size must not depend on the 5-byte prefix.
+	})
+
+	t.Run("ReadFromDirtyBuffer", func(t *testing.T) { // Reads the object that follows the 5-byte prefix.
+		obj, err := dataobj.FromReaderAt(bytes.NewReader(dirtyBuf.Bytes()[5:]), int64(dirtyBuf.Len()-5))
+		require.NoError(t, err)
+		require.Equal(t, 1, obj.Sections().Count(streams.CheckSection))
+		require.Equal(t, 1, obj.Sections().Count(pointers.CheckSection))
+		require.Equal(t, 0, obj.Sections().Count(logs.CheckSection))
+	})
+}
+
+// TestBuilder_Append ensures that appending to the buffer eventually reports
+//
that the buffer is full. +func TestBuilder_Append(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + builder, err := NewBuilder(testBuilderConfig) + require.NoError(t, err) + + i := 0 + for { + require.NoError(t, ctx.Err()) + + _, err := builder.AppendStream(streams.Stream{ + ID: 1, + Labels: labels.Labels{ + {Name: "cluster", Value: "test"}, + {Name: "app", Value: "foo"}, + {Name: "i", Value: fmt.Sprintf("%d", i)}, + }, + Rows: 2, + MinTimestamp: time.Unix(10, 0).UTC(), + MaxTimestamp: time.Unix(20, 0).UTC(), + }) + if errors.Is(err, ErrBuilderFull) { + break + } + require.NoError(t, err) + i++ + } +} diff --git a/pkg/dataobj/index/metrics.go b/pkg/dataobj/index/metrics.go new file mode 100644 index 0000000000..6ea7680711 --- /dev/null +++ b/pkg/dataobj/index/metrics.go @@ -0,0 +1,85 @@ +package index + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type indexBuilderMetrics struct { + // Error counters + commitFailures prometheus.Counter + + // Request counters + commitsTotal prometheus.Counter + + // Processing delay histogram + processingDelay prometheus.Histogram +} + +func newIndexBuilderMetrics() *indexBuilderMetrics { + p := &indexBuilderMetrics{ + commitFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_index_builder_commit_failures_total", + Help: "Total number of commit failures", + }), + commitsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_index_builder_commits_total", + Help: "Total number of commits", + }), + processingDelay: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "loki_index_builder_processing_delay_seconds", + Help: "Time difference between record timestamp and processing time in seconds", + Buckets: prometheus.DefBuckets, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 0, + }), + } + + return p +} + +func (p *indexBuilderMetrics) register(reg 
prometheus.Registerer) error { + collectors := []prometheus.Collector{ + p.commitFailures, + p.commitsTotal, + p.processingDelay, + } + + for _, collector := range collectors { + if err := reg.Register(collector); err != nil { + if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { + return err + } + } + } + return nil +} + +func (p *indexBuilderMetrics) unregister(reg prometheus.Registerer) { + collectors := []prometheus.Collector{ + p.commitFailures, + p.commitsTotal, + p.processingDelay, + } + + for _, collector := range collectors { + reg.Unregister(collector) + } +} + +func (p *indexBuilderMetrics) incCommitFailures() { + p.commitFailures.Inc() +} + +func (p *indexBuilderMetrics) incCommitsTotal() { + p.commitsTotal.Inc() +} + +func (p *indexBuilderMetrics) observeProcessingDelay(recordTimestamp time.Time) { + // Convert milliseconds to seconds and calculate delay + if !recordTimestamp.IsZero() { // Only observe if timestamp is valid + p.processingDelay.Observe(time.Since(recordTimestamp).Seconds()) + } +} diff --git a/pkg/dataobj/internal/dataset/reader.go b/pkg/dataobj/internal/dataset/reader.go index abe5ed3aca..7e8f8a6caf 100644 --- a/pkg/dataobj/internal/dataset/reader.go +++ b/pkg/dataobj/internal/dataset/reader.go @@ -69,6 +69,13 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { if len(s) == 0 { return 0, nil } + // Init stats object and use the context, otherwise we create a new one every time we increment a stat. 
+ var statistics *stats.Context + if stats.IsPresent(ctx) { + statistics = stats.FromContext(ctx) + } else { + statistics, ctx = stats.NewContext(ctx) + } if !r.ready { err := r.init(ctx) @@ -127,9 +134,8 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { r.dl.SetReadRange(readRange) var ( - rowsRead int // tracks max rows accessed to move the [r.row] cursor - passCount int // tracks how many rows passed the predicate - statistics = stats.FromContext(ctx) + rowsRead int // tracks max rows accessed to move the [r.row] cursor + passCount int // tracks how many rows passed the predicate ) // If there are no predicates, read all columns in the dataset diff --git a/pkg/dataobj/sections/logs/logs.go b/pkg/dataobj/sections/logs/logs.go index f140e580f0..4e454884f2 100644 --- a/pkg/dataobj/sections/logs/logs.go +++ b/pkg/dataobj/sections/logs/logs.go @@ -118,6 +118,10 @@ func convertColumnType(protoType logsmd.ColumnType) (ColumnType, bool) { return ColumnTypeInvalid, false } +func IsMetadataColumn(colType string) bool { + return colType == logsmd.COLUMN_TYPE_METADATA.String() +} + var columnTypeNames = map[ColumnType]string{ ColumnTypeInvalid: "invalid", ColumnTypeStreamID: "stream_id", diff --git a/pkg/dataobj/sections/logs/stats.go b/pkg/dataobj/sections/logs/stats.go index 136b028314..f23bbd1245 100644 --- a/pkg/dataobj/sections/logs/stats.go +++ b/pkg/dataobj/sections/logs/stats.go @@ -29,6 +29,7 @@ type ( MetadataSize uint64 ValuesCount uint64 Cardinality uint64 + ColumnIndex int64 Pages []PageStats } @@ -79,6 +80,7 @@ func ReadStats(ctx context.Context, section *Section) (Stats, error) { MetadataSize: col.Info.MetadataSize, ValuesCount: col.Info.ValuesCount, Cardinality: col.Info.Statistics.GetCardinalityCount(), + ColumnIndex: int64(i), } for _, pages := range pageSets[i] { diff --git a/pkg/dataobj/sections/pointers/builder.go b/pkg/dataobj/sections/pointers/builder.go index 4cdeacc173..3158f0c47b 100644 --- 
a/pkg/dataobj/sections/pointers/builder.go +++ b/pkg/dataobj/sections/pointers/builder.go @@ -52,17 +52,25 @@ const ( PointerKindColumnIndex // PointerKindColumnIndex is a pointer for a column index. ) +type streamKey struct { + objectPath string + section int64 + streamID int64 +} + // Builder builds a pointers section. type Builder struct { metrics *Metrics pageSize int // streamLookup is a map of the stream ID in this index object to the pointer. - streamLookup map[string]*SectionPointer + streamLookup map[streamKey]*SectionPointer // streamObjectRefs is a map of the stream ID in the referenced logs object to the stream ID in this index object. streamObjectRefs map[string]map[int64]int64 // pointers is the list of pointers to encode. pointers []*SectionPointer + + key streamKey } // NewBuilder creates a new pointers section builder. The pageSize argument @@ -75,7 +83,7 @@ func NewBuilder(metrics *Metrics, pageSize int) *Builder { metrics: metrics, pageSize: pageSize, - streamLookup: make(map[string]*SectionPointer), + streamLookup: make(map[streamKey]*SectionPointer), streamObjectRefs: make(map[string]map[int64]int64), pointers: make([]*SectionPointer, 0, 1024), } @@ -97,8 +105,12 @@ func (b *Builder) RecordStreamRef(path string, idInObject int64, idInIndex int64 // ObserveStream observes a stream in the index by recording the start & end timestamps, line count, and uncompressed size per-section. 
func (b *Builder) ObserveStream(path string, section int64, idInObject int64, ts time.Time, uncompressedSize int64) { indexStreamID := b.streamObjectRefs[path][idInObject] - key := fmt.Sprintf("%s:%d:%d", path, section, indexStreamID) - pointer, ok := b.streamLookup[key] + + b.key.objectPath = path + b.key.section = section + b.key.streamID = indexStreamID + + pointer, ok := b.streamLookup[b.key] if ok { // Update the existing pointer if ts.Before(pointer.StartTs) { @@ -124,7 +136,7 @@ func (b *Builder) ObserveStream(path string, section int64, idInObject int64, ts UncompressedSize: uncompressedSize, } b.pointers = append(b.pointers, newPointer) - b.streamLookup[key] = newPointer + b.streamLookup[b.key] = newPointer } func (b *Builder) RecordColumnIndex(path string, section int64, columnName string, columnIndex int64, valuesBloomFilter []byte) { diff --git a/pkg/dataobj/sections/pointers/decoder.go b/pkg/dataobj/sections/pointers/decoder.go index 6b546702d1..6fbab2af4f 100644 --- a/pkg/dataobj/sections/pointers/decoder.go +++ b/pkg/dataobj/sections/pointers/decoder.go @@ -19,7 +19,7 @@ func newDecoder(reader dataobj.SectionReader) *decoder { return &decoder{sr: reader} } -// decoder supports decoding the raw underlying data for a streams section. +// decoder supports decoding the raw underlying data for a pointers section. 
type decoder struct { sr dataobj.SectionReader } @@ -28,7 +28,7 @@ type decoder struct { func (rd *decoder) Columns(ctx context.Context) ([]*pointersmd.ColumnDesc, error) { rc, err := rd.sr.Metadata(ctx) if err != nil { - return nil, fmt.Errorf("reading streams section metadata: %w", err) + return nil, fmt.Errorf("reading pointers section metadata: %w", err) } defer rc.Close() diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index 7eeb7fd03e..4956609858 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -78,6 +78,11 @@ func NewContext(ctx context.Context) (*Context, context.Context) { return contextData, ctx } +func IsPresent(ctx context.Context) bool { + _, ok := ctx.Value(statsKey).(*Context) + return ok +} + // FromContext returns the statistics context. func FromContext(ctx context.Context) *Context { v, ok := ctx.Value(statsKey).(*Context) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 98e2e4922a..95d2ad5f68 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -39,6 +39,7 @@ import ( "github.com/grafana/loki/v3/pkg/compactor/deletion" dataobjconfig "github.com/grafana/loki/v3/pkg/dataobj/config" "github.com/grafana/loki/v3/pkg/dataobj/consumer" + dataobjindex "github.com/grafana/loki/v3/pkg/dataobj/index" "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/indexgateway" "github.com/grafana/loki/v3/pkg/ingester" @@ -427,6 +428,7 @@ type Loki struct { blockBuilder *blockbuilder.BlockBuilder blockScheduler *blockscheduler.BlockScheduler dataObjConsumer *consumer.Service + dataObjIndexBuilder *dataobjindex.Builder ClientMetrics storage.ClientMetrics deleteClientMetrics *deletion.DeleteRequestClientMetrics @@ -768,6 +770,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(DataObjExplorer, t.initDataObjExplorer) mm.RegisterModule(UI, t.initUI) mm.RegisterModule(DataObjConsumer, t.initDataObjConsumer) + mm.RegisterModule(DataObjIndexBuilder, 
t.initDataObjIndexBuilder) mm.RegisterModule(All, nil) mm.RegisterModule(Read, nil) @@ -814,6 +817,7 @@ func (t *Loki) setupModuleManager() error { BlockScheduler: {Server, UI}, DataObjExplorer: {Server, UI}, DataObjConsumer: {PartitionRing, Server, UI}, + DataObjIndexBuilder: {Server, UI}, Read: {QueryFrontend, Querier}, Write: {Ingester, Distributor, PatternIngester}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 19bdf91a9e..66fd84ead3 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -54,6 +54,7 @@ import ( "github.com/grafana/loki/v3/pkg/compactor/generationnumber" "github.com/grafana/loki/v3/pkg/dataobj/consumer" "github.com/grafana/loki/v3/pkg/dataobj/explorer" + dataobjindex "github.com/grafana/loki/v3/pkg/dataobj/index" "github.com/grafana/loki/v3/pkg/dataobj/metastore" dataobjquerier "github.com/grafana/loki/v3/pkg/dataobj/querier" "github.com/grafana/loki/v3/pkg/distributor" @@ -158,6 +159,7 @@ const ( BlockScheduler = "block-scheduler" DataObjExplorer = "dataobj-explorer" DataObjConsumer = "dataobj-consumer" + DataObjIndexBuilder = "dataobj-index-builder" UI = "ui" All = "all" Read = "read" @@ -2164,6 +2166,28 @@ func (t *Loki) initDataObjConsumer() (services.Service, error) { return t.dataObjConsumer, nil } +func (t *Loki) initDataObjIndexBuilder() (services.Service, error) { + if !t.Cfg.Ingester.KafkaIngestion.Enabled { + return nil, nil + } + store, err := t.createDataObjBucket("dataobj-index-builder") + if err != nil { + return nil, err + } + + level.Info(util_log.Logger).Log("msg", "initializing dataobj index builder", "instance", t.Cfg.Ingester.LifecyclerConfig.ID) + t.dataObjIndexBuilder, err = dataobjindex.NewIndexBuilder( + t.Cfg.DataObj.Index, + t.Cfg.KafkaConfig, + util_log.Logger, + t.Cfg.Ingester.LifecyclerConfig.ID, + store, + prometheus.DefaultRegisterer, + ) + + return t.dataObjIndexBuilder, err +} + func (t *Loki) createDataObjBucket(clientName string) (objstore.Bucket, error) { schema, err := 
t.Cfg.SchemaConfig.SchemaForTime(model.Now()) if err != nil {