feat: Initial index-builder implementation (#18297)

pull/18378/head
benclive 11 months ago committed by GitHub
parent d5168ebd7b
commit 1f0edcd4ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 36
      docs/sources/shared/configuration.md
  2. 3
      pkg/dataobj/config/config.go
  3. 4
      pkg/dataobj/consumer/service.go
  4. 511
      pkg/dataobj/index/builder.go
  5. 184
      pkg/dataobj/index/builder_test.go
  6. 420
      pkg/dataobj/index/indexobj/builder.go
  7. 165
      pkg/dataobj/index/indexobj/builder_metrics.go
  8. 151
      pkg/dataobj/index/indexobj/builder_test.go
  9. 85
      pkg/dataobj/index/metrics.go
  10. 12
      pkg/dataobj/internal/dataset/reader.go
  11. 4
      pkg/dataobj/sections/logs/logs.go
  12. 2
      pkg/dataobj/sections/logs/stats.go
  13. 22
      pkg/dataobj/sections/pointers/builder.go
  14. 4
      pkg/dataobj/sections/pointers/decoder.go
  15. 5
      pkg/logqlmodel/stats/context.go
  16. 4
      pkg/loki/loki.go
  17. 24
      pkg/loki/modules.go

@ -1056,6 +1056,42 @@ dataobj:
# CLI flag: -dataobj-consumer.idle-flush-timeout
[idle_flush_timeout: <duration> | default = 1h]
index:
# The size of the target page to use for the data object builder.
# CLI flag: -dataobj-index-builder.target-page-size
[target_page_size: <int> | default = 128KiB]
# The size of the target object to use for the data object builder.
# CLI flag: -dataobj-index-builder.target-object-size
[target_object_size: <int> | default = 64MiB]
# Configures a maximum size for sections, for sections that support it.
# CLI flag: -dataobj-index-builder.target-section-size
[target_section_size: <int> | default = 16MiB]
# The size of the buffer to use for sorting logs.
# CLI flag: -dataobj-index-builder.buffer-size
[buffer_size: <int> | default = 2MiB]
# The maximum number of stripes to merge into a section at once. Must be
# greater than 1.
# CLI flag: -dataobj-index-builder.section-stripe-merge-limit
[section_stripe_merge_limit: <int> | default = 2]
# Experimental: The number of events to batch before building an index
# CLI flag: -dataobj-index-builder.events-per-index
[events_per_index: <int> | default = 32]
# Experimental: A prefix to use for storing indexes in object storage. Used
# to separate the metastore & index files during initial testing.
# CLI flag: -dataobj-index-builder.storage-prefix
[index_storage_prefix: <string> | default = "index/v0/"]
# Experimental: A list of tenant IDs to enable index building for. If empty,
# all tenants will be enabled.
# CLI flag: -dataobj-index-builder.enabled-tenant-ids
[enabled_tenant_ids: <string> | default = ""]
querier:
# Enable the dataobj querier.
# CLI flag: -dataobj-querier-enabled

@ -4,11 +4,13 @@ import (
"flag"
"github.com/grafana/loki/v3/pkg/dataobj/consumer"
"github.com/grafana/loki/v3/pkg/dataobj/index"
"github.com/grafana/loki/v3/pkg/dataobj/querier"
)
type Config struct {
Consumer consumer.Config `yaml:"consumer"`
Index index.Config `yaml:"index"`
Querier querier.Config `yaml:"querier"`
// StorageBucketPrefix is the prefix to use for the storage bucket.
StorageBucketPrefix string `yaml:"storage_bucket_prefix"`
@ -16,6 +18,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Consumer.RegisterFlags(f)
cfg.Index.RegisterFlags(f)
cfg.Querier.RegisterFlags(f)
f.StringVar(&cfg.StorageBucketPrefix, "dataobj-storage-bucket-prefix", "dataobj/", "The prefix to use for the storage bucket.")
}

@ -34,6 +34,7 @@ type Service struct {
client *consumer.Client
eventsProducerClient *kgo.Client
eventConsumerClient *kgo.Client
cfg Config
bucket objstore.Bucket
@ -82,14 +83,15 @@ func New(kafkaCfg kafka.Config, cfg Config, topicPrefix string, bucket objstore.
eventsKafkaCfg := kafkaCfg
eventsKafkaCfg.Topic = "loki.metastore-events"
eventsKafkaCfg.AutoCreateTopicDefaultPartitions = 1
eventsProducerClient, err := client.NewWriterClient("loki.metastore-events", eventsKafkaCfg, 50, logger, reg)
if err != nil {
level.Error(logger).Log("msg", "failed to create producer", "err", err)
return nil
}
s.client = consumerClient
s.eventsProducerClient = eventsProducerClient
s.Service = services.NewBasicService(nil, s.run, s.stopping)
return s
}

@ -0,0 +1,511 @@
package index
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"flag"
"fmt"
"io"
"runtime"
"slices"
"sync"
"time"
"github.com/bits-and-blooms/bloom/v3"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kgo"
"golang.org/x/sync/errgroup"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/client"
)
type Config struct {
indexobj.BuilderConfig `yaml:",inline"`
EventsPerIndex int `yaml:"events_per_index" experimental:"true"`
IndexStoragePrefix string `yaml:"index_storage_prefix" experimental:"true"`
EnabledTenantIDs flagext.StringSliceCSV `yaml:"enabled_tenant_ids" experimental:"true"`
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("dataobj-index-builder.", f)
}
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
f.IntVar(&cfg.EventsPerIndex, prefix+"events-per-index", 32, "Experimental: The number of events to batch before building an index")
f.StringVar(&cfg.IndexStoragePrefix, prefix+"storage-prefix", "index/v0/", "Experimental: A prefix to use for storing indexes in object storage. Used to separate the metastore & index files during initial testing.")
f.Var(&cfg.EnabledTenantIDs, prefix+"enabled-tenant-ids", "Experimental: A list of tenant IDs to enable index building for. If empty, all tenants will be enabled.")
}
type downloadedObject struct {
event metastore.ObjectWrittenEvent
objectBytes *[]byte
err error
}
const (
indexEventTopic = "loki.metastore-events"
indexConsumerGroup = "metastore-event-reader"
)
type Builder struct {
services.Service
cfg Config
// Kafka client and topic/partition info
client *kgo.Client
topic string
// Processing pipeline
downloadQueue chan metastore.ObjectWrittenEvent
downloadedObjects chan downloadedObject
builder *indexobj.Builder
bufferedEvents map[string][]metastore.ObjectWrittenEvent
// Builder initialization
builderCfg indexobj.BuilderConfig
bucket objstore.Bucket
flushBuffer *bytes.Buffer
// Metrics
metrics *indexBuilderMetrics
// Control and coordination
ctx context.Context
cancel context.CancelCauseFunc
wg sync.WaitGroup
logger log.Logger
builderMtx sync.Mutex
}
func NewIndexBuilder(
cfg Config,
kafkaCfg kafka.Config,
logger log.Logger,
instanceID string,
bucket objstore.Bucket,
reg prometheus.Registerer,
) (*Builder, error) {
kafkaCfg.AutoCreateTopicEnabled = true
kafkaCfg.AutoCreateTopicDefaultPartitions = 64
eventConsumerClient, err := client.NewReaderClient(
"index_builder",
kafkaCfg,
logger,
reg,
kgo.ConsumeTopics(indexEventTopic),
kgo.InstanceID(instanceID),
kgo.SessionTimeout(3*time.Minute),
kgo.ConsumerGroup(indexConsumerGroup),
kgo.RebalanceTimeout(5*time.Minute),
kgo.DisableAutoCommit(),
)
if err != nil {
return nil, fmt.Errorf("failed to create kafka consumer client: %w", err)
}
reg = prometheus.WrapRegistererWith(prometheus.Labels{
"topic": indexEventTopic,
"component": "index_builder",
}, reg)
metrics := newIndexBuilderMetrics()
if err := metrics.register(reg); err != nil {
return nil, fmt.Errorf("failed to register metrics for index builder: %w", err)
}
builder, err := indexobj.NewBuilder(cfg.BuilderConfig)
if err != nil {
return nil, fmt.Errorf("failed to create index builder: %w", err)
}
if err := builder.RegisterMetrics(reg); err != nil {
return nil, fmt.Errorf("failed to register metrics for index builder: %w", err)
}
// Allocate a single buffer
flushBuffer := bytes.NewBuffer(make([]byte, int(float64(cfg.BuilderConfig.TargetObjectSize)*1.2)))
// Set up queues to download the next object (I/O bound) while processing the current one (CPU bound) in order to maximize throughput.
// Setting the channel buffer sizes caps the total memory usage by only keeping up to 3 objects in memory at a time: One being processed, one fully downloaded and one being downloaded from the queue.
downloadQueue := make(chan metastore.ObjectWrittenEvent, cfg.EventsPerIndex)
downloadedObjects := make(chan downloadedObject, 1)
s := &Builder{
cfg: cfg,
client: eventConsumerClient,
logger: logger,
builder: builder,
bucket: bucket,
flushBuffer: flushBuffer,
builderMtx: sync.Mutex{},
downloadedObjects: downloadedObjects,
downloadQueue: downloadQueue,
metrics: metrics,
bufferedEvents: make(map[string][]metastore.ObjectWrittenEvent),
}
s.Service = services.NewBasicService(nil, s.run, s.stopping)
return s, nil
}
func (p *Builder) run(ctx context.Context) error {
p.ctx, p.cancel = context.WithCancelCause(ctx)
p.wg.Add(1)
go func() {
// Download worker
defer p.wg.Done()
for event := range p.downloadQueue {
objLogger := log.With(p.logger, "object_path", event.ObjectPath)
downloadStart := time.Now()
objectReader, err := p.bucket.Get(p.ctx, event.ObjectPath)
if err != nil {
p.downloadedObjects <- downloadedObject{
event: event,
err: fmt.Errorf("failed to fetch object from storage: %w", err),
}
continue
}
object, err := io.ReadAll(objectReader)
_ = objectReader.Close()
if err != nil {
p.downloadedObjects <- downloadedObject{
event: event,
err: fmt.Errorf("failed to read object: %w", err),
}
continue
}
level.Info(objLogger).Log("msg", "downloaded object", "duration", time.Since(downloadStart), "size_mb", float64(len(object))/1024/1024, "avg_speed_mbps", float64(len(object))/time.Since(downloadStart).Seconds()/1024/1024)
p.downloadedObjects <- downloadedObject{
event: event,
objectBytes: &object,
}
}
}()
level.Info(p.logger).Log("msg", "started index builder service")
for {
fetches := p.client.PollRecords(ctx, -1)
if fetches.IsClientClosed() || ctx.Err() != nil {
return ctx.Err()
}
if errs := fetches.Errors(); len(errs) > 0 {
level.Error(p.logger).Log("msg", "error fetching records", "err", errs)
continue
}
if fetches.Empty() {
continue
}
fetches.EachPartition(func(ftp kgo.FetchTopicPartition) {
// TODO(benclive): Verify if we need to return re-poll ASAP or if sequential processing is good enough.
for _, record := range ftp.Records {
p.processRecord(record)
}
})
}
}
func (p *Builder) stopping(failureCase error) error {
close(p.downloadQueue)
p.cancel(failureCase)
p.wg.Wait()
close(p.downloadedObjects)
p.client.Close()
return nil
}
func (p *Builder) processRecord(record *kgo.Record) {
event := &metastore.ObjectWrittenEvent{}
if err := event.Unmarshal(record.Value); err != nil {
level.Error(p.logger).Log("msg", "failed to unmarshal metastore event", "err", err)
return
}
p.bufferedEvents[event.Tenant] = append(p.bufferedEvents[event.Tenant], *event)
level.Info(p.logger).Log("msg", "buffered new event for tenant", "count", len(p.bufferedEvents[event.Tenant]), "tenant", event.Tenant)
if len(p.bufferedEvents[event.Tenant]) >= p.cfg.EventsPerIndex {
if !slices.Contains(p.cfg.EnabledTenantIDs, event.Tenant) {
// TODO(benclive): Remove this check once builders handle multi-tenancy when building indexes.
level.Info(p.logger).Log("msg", "skipping index build for disabled tenant", "tenant", event.Tenant)
p.bufferedEvents[event.Tenant] = p.bufferedEvents[event.Tenant][:0]
return
}
err := p.buildIndex(p.bufferedEvents[event.Tenant][:len(p.bufferedEvents[event.Tenant])])
if err != nil {
// TODO(benclive): Improve error handling for failed index builds.
panic(err)
}
if err := p.commitRecords(record); err != nil {
level.Warn(p.logger).Log("msg", "failed to commit records", "err", err)
return
}
p.bufferedEvents[event.Tenant] = p.bufferedEvents[event.Tenant][:0]
}
}
func (p *Builder) buildIndex(events []metastore.ObjectWrittenEvent) error {
indexStorageBucket := objstore.NewPrefixedBucket(p.bucket, p.cfg.IndexStoragePrefix)
level.Info(p.logger).Log("msg", "building index", "events", len(events), "tenant", events[0].Tenant)
start := time.Now()
// Observe processing delay
writeTime, err := time.Parse(time.RFC3339, events[0].WriteTime)
if err != nil {
level.Error(p.logger).Log("msg", "failed to parse write time", "err", err)
return err
}
p.metrics.observeProcessingDelay(writeTime)
// Trigger the downloads
for _, event := range events {
p.downloadQueue <- event
}
// Process the results as they are downloaded
processingErrors := multierror.New()
for i := 0; i < len(events); i++ {
obj := <-p.downloadedObjects
objLogger := log.With(p.logger, "object_path", obj.event.ObjectPath)
level.Info(objLogger).Log("msg", "processing object")
if obj.err != nil {
processingErrors.Add(fmt.Errorf("failed to download object: %w", obj.err))
continue
}
reader, err := dataobj.FromReaderAt(bytes.NewReader(*obj.objectBytes), int64(len(*obj.objectBytes)))
if err != nil {
processingErrors.Add(fmt.Errorf("failed to read object: %w", err))
continue
}
// Streams Section: process this section first to ensure all streams have been added to the builder and are given new IDs.
for i, section := range reader.Sections().Filter(streams.CheckSection) {
level.Debug(objLogger).Log("msg", "processing streams section", "index", i)
if err := p.processStreamsSection(section, obj.event.ObjectPath); err != nil {
processingErrors.Add(fmt.Errorf("failed to process stream section: %w", err))
continue
}
}
// Logs Section: these can be processed in parallel once we have the stream IDs. This work is heavily CPU bound so is limited to GOMAXPROCS parallelism.
g, ctx := errgroup.WithContext(p.ctx)
g.SetLimit(runtime.GOMAXPROCS(0))
for i, section := range reader.Sections().Filter(logs.CheckSection) {
g.Go(func() error {
sectionLogger := log.With(objLogger, "section", i)
level.Debug(sectionLogger).Log("msg", "processing logs section")
// 1. A bloom filter for each column in the logs section.
// 2. A per-section stream time-range index using min/max of each stream in the logs section. StreamIDs will reference the aggregate stream section.
if err := p.processLogsSection(ctx, sectionLogger, obj.event.ObjectPath, section, int64(i)); err != nil {
return fmt.Errorf("failed to process logs section path=%s section=%d: %w", obj.event.ObjectPath, i, err)
}
return nil
})
}
if err := g.Wait(); err != nil {
processingErrors.Add(fmt.Errorf("failed to process logs sections: %w", err))
continue
}
}
if processingErrors.Err() != nil {
return processingErrors.Err()
}
p.flushBuffer.Reset()
stats, err := p.builder.Flush(p.flushBuffer)
if err != nil {
return fmt.Errorf("failed to flush builder: %w", err)
}
size := p.flushBuffer.Len()
key := p.getKey(events[0].Tenant, p.flushBuffer)
if err := indexStorageBucket.Upload(p.ctx, key, p.flushBuffer); err != nil {
return fmt.Errorf("failed to upload index: %w", err)
}
metastoreUpdater := metastore.NewUpdater(indexStorageBucket, events[0].Tenant, p.logger)
if stats.MinTimestamp.IsZero() || stats.MaxTimestamp.IsZero() {
return errors.New("failed to get min/max timestamps")
}
if err := metastoreUpdater.Update(p.ctx, key, stats.MinTimestamp, stats.MaxTimestamp); err != nil {
return fmt.Errorf("failed to update metastore: %w", err)
}
level.Info(p.logger).Log("msg", "finished building index", "tenant", events[0].Tenant, "events", len(events), "size", size, "duration", time.Since(start))
return nil
}
// getKey determines the key in object storage to upload the object to, based on our path scheme.
func (p *Builder) getKey(tenantID string, object *bytes.Buffer) string {
sum := sha256.Sum224(object.Bytes())
sumStr := hex.EncodeToString(sum[:])
return fmt.Sprintf("tenant-%s/indexes/%s/%s", tenantID, sumStr[:2], sumStr[2:])
}
func (p *Builder) processStreamsSection(section *dataobj.Section, objectPath string) /*map[int64]streams.Stream, map[uint64]int64,*/ error {
streamSection, err := streams.Open(p.ctx, section)
if err != nil {
return fmt.Errorf("failed to open stream section: %w", err)
}
streamBuf := make([]streams.Stream, 2048)
rowReader := streams.NewRowReader(streamSection)
for {
n, err := rowReader.Read(p.ctx, streamBuf)
if err != nil && err != io.EOF {
return fmt.Errorf("failed to read stream section: %w", err)
}
if n == 0 && err == io.EOF {
break
}
for _, stream := range streamBuf[:n] {
newStreamID, err := p.builder.AppendStream(stream)
if err != nil {
return fmt.Errorf("failed to append to stream: %w", err)
}
p.builder.RecordStreamRef(objectPath, stream.ID, newStreamID)
}
}
return nil
}
// processLogsSection reads information from the logs section in order to build index information in a new object.
func (p *Builder) processLogsSection(ctx context.Context, sectionLogger log.Logger, objectPath string, section *dataobj.Section, sectionIdx int64) error {
logsBuf := make([]logs.Record, 1024)
type logInfo struct {
objectPath string
sectionIdx int64
streamID int64
timestamp time.Time
length int64
}
logsInfo := make([]logInfo, len(logsBuf))
logsSection, err := logs.Open(ctx, section)
if err != nil {
return fmt.Errorf("failed to open logs section: %w", err)
}
// Fetch the column statistics in order to init the bloom filters for each column
stats, err := logs.ReadStats(ctx, logsSection)
if err != nil {
return fmt.Errorf("failed to read log section stats: %w", err)
}
columnBloomBuilders := make(map[string]*bloom.BloomFilter)
columnIndexes := make(map[string]int64)
for _, column := range stats.Columns {
if !logs.IsMetadataColumn(column.Type) {
continue
}
columnBloomBuilders[column.Name] = bloom.NewWithEstimates(uint(column.Cardinality), 1.0/128.0)
columnIndexes[column.Name] = column.ColumnIndex
}
// Read the whole logs section to extract all the column values.
cnt := 0
// TODO(benclive): Switch to a columnar reader instead of row based
// This is also likely to be more performant, especially if we don't need to read the whole log line.
// Note: the source object would need a new column storing just the length to avoid reading the log line itself.
rowReader := logs.NewRowReader(logsSection)
for {
n, err := rowReader.Read(p.ctx, logsBuf)
if err != nil && err != io.EOF {
return fmt.Errorf("failed to read logs section: %w", err)
}
if n == 0 && err == io.EOF {
break
}
for i, log := range logsBuf[:n] {
cnt++
for _, md := range log.Metadata {
columnBloomBuilders[md.Name].Add([]byte(md.Value))
}
logsInfo[i].objectPath = objectPath
logsInfo[i].sectionIdx = sectionIdx
logsInfo[i].streamID = log.StreamID
logsInfo[i].timestamp = log.Timestamp
logsInfo[i].length = int64(len(log.Line))
}
// Lock the mutex once per read for perf reasons.
p.builderMtx.Lock()
for _, log := range logsInfo[:n] {
err = p.builder.ObserveLogLine(log.objectPath, log.sectionIdx, log.streamID, log.timestamp, log.length)
if err != nil {
p.builderMtx.Unlock()
return fmt.Errorf("failed to observe log line: %w", err)
}
}
p.builderMtx.Unlock()
}
// Write the indexes (bloom filters) to the new index object.
for columnName, bloom := range columnBloomBuilders {
bloomBytes, err := bloom.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal bloom filter: %w", err)
}
p.builderMtx.Lock()
err = p.builder.AppendColumnIndex(objectPath, sectionIdx, columnName, columnIndexes[columnName], bloomBytes)
p.builderMtx.Unlock()
if err != nil {
return fmt.Errorf("failed to append column index: %w", err)
}
}
level.Info(sectionLogger).Log("msg", "finished processing logs section", "rowsProcessed", cnt)
return nil
}
func (p *Builder) commitRecords(record *kgo.Record) error {
backoff := backoff.New(p.ctx, backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 20,
})
var lastErr error
backoff.Reset()
for backoff.Ongoing() {
p.metrics.incCommitsTotal()
err := p.client.CommitRecords(p.ctx, record)
if err == nil {
return nil
}
level.Error(p.logger).Log("msg", "failed to commit records", "err", err)
p.metrics.incCommitFailures()
lastErr = err
backoff.Wait()
}
return lastErr
}

@ -0,0 +1,184 @@
package index
import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/testkafka"
"github.com/grafana/loki/v3/pkg/logproto"
)
var testBuilderConfig = indexobj.BuilderConfig{
TargetPageSize: 128 * 1024,
TargetObjectSize: 4 * 1024 * 1024,
TargetSectionSize: 2 * 1024 * 1024,
BufferSize: 4 * 1024 * 1024,
SectionStripeMergeLimit: 2,
}
func buildLogObject(t *testing.T, app string, path string, bucket objstore.Bucket) {
candidate, err := logsobj.NewBuilder(logsobj.BuilderConfig{
TargetPageSize: 128 * 1024,
TargetObjectSize: 4 * 1024 * 1024,
TargetSectionSize: 2 * 1024 * 1024,
BufferSize: 4 * 1024 * 1024,
SectionStripeMergeLimit: 2,
})
require.NoError(t, err)
for i := 0; i < 10; i++ {
stream := logproto.Stream{
Labels: fmt.Sprintf("{app=\"%s\",stream=\"%d\"}", app, i),
Entries: []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}},
}
err = candidate.Append(stream)
require.NoError(t, err)
}
buf := bytes.NewBuffer(nil)
_, err = candidate.Flush(buf)
require.NoError(t, err)
err = bucket.Upload(context.Background(), path, buf)
require.NoError(t, err)
}
func TestIndexBuilder(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Setup test dependencies
bucket := objstore.NewInMemBucket()
cluster, configString := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, "loki.metastore-events")
defer cluster.Close()
client, err := kgo.NewClient(kgo.ConsumerGroup("test-consumer-group"), kgo.ConsumeTopics("loki.metastore-events"), kgo.SeedBrokers(configString))
require.NoError(t, err)
indexPrefix := "test-prefix"
tenant := "test-tenant"
p, err := NewIndexBuilder(
Config{
BuilderConfig: indexobj.BuilderConfig{
TargetPageSize: 128 * 1024,
TargetObjectSize: 4 * 1024 * 1024,
TargetSectionSize: 2 * 1024 * 1024,
BufferSize: 4 * 1024 * 1024,
SectionStripeMergeLimit: 2,
},
EventsPerIndex: 3,
IndexStoragePrefix: indexPrefix,
EnabledTenantIDs: []string{tenant},
},
kafka.Config{},
log.NewNopLogger(),
"instance-id",
bucket,
prometheus.NewRegistry(),
)
require.NoError(t, err)
p.client = client
require.NoError(t, p.StartAsync(ctx))
buildLogObject(t, "loki", "test-path-0", bucket)
buildLogObject(t, "testing", "test-path-1", bucket)
buildLogObject(t, "three", "test-path-2", bucket)
for i := 0; i < 3; i++ {
event := metastore.ObjectWrittenEvent{
ObjectPath: fmt.Sprintf("test-path-%d", i),
Tenant: tenant,
WriteTime: time.Now().Format(time.RFC3339),
}
eventBytes, err := event.Marshal()
require.NoError(t, err)
p.processRecord(&kgo.Record{
Key: []byte(tenant),
Value: eventBytes,
})
}
indexes := readAllSectionPointers(t, bucket, indexPrefix)
require.Equal(t, 30, len(indexes))
}
func readAllSectionPointers(t *testing.T, bucket objstore.Bucket, indexPrefix string) []pointers.SectionPointer {
var out []pointers.SectionPointer
directories := []string{}
err := bucket.Iter(context.Background(), fmt.Sprintf("%s/tenant-test-tenant/indexes/", indexPrefix), func(name string) error {
directories = append(directories, name)
return nil
})
require.NoError(t, err)
for _, directory := range directories {
err := bucket.Iter(context.Background(), directory, func(name string) error {
objReader, err := bucket.Get(context.Background(), name)
require.NoError(t, err)
defer objReader.Close()
objectBytes, err := io.ReadAll(objReader)
require.NoError(t, err)
object, err := dataobj.FromReaderAt(bytes.NewReader(objectBytes), int64(len(objectBytes)))
require.NoError(t, err)
var reader pointers.RowReader
defer reader.Close()
buf := make([]pointers.SectionPointer, 64)
for _, section := range object.Sections() {
if !pointers.CheckSection(section) {
continue
}
sec, err := pointers.Open(context.Background(), section)
if err != nil {
return fmt.Errorf("opening section: %w", err)
}
reader.Reset(sec)
for {
num, err := reader.Read(context.Background(), buf)
if err != nil && err != io.EOF {
return fmt.Errorf("reading section: %w", err)
}
if num == 0 && err == io.EOF {
break
}
out = append(out, buf[:num]...)
}
}
return nil
})
require.NoError(t, err)
}
require.NoError(t, err)
return out
}

@ -0,0 +1,420 @@
// Package indexobj provides tooling for creating index-oriented data objects.
package indexobj
import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"sort"
"time"
"github.com/grafana/dskit/flagext"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
)
// ErrBuilderFull is returned by [Builder.Append] when the buffer is
// full and needs to flush; call [Builder.Flush] to flush it.
var (
ErrBuilderFull = errors.New("builder full")
ErrBuilderEmpty = errors.New("builder empty")
)
// BuilderConfig configures a [Builder].
type BuilderConfig struct {
// TargetPageSize configures a target size for encoded pages within the data
// object. TargetPageSize accounts for encoding, but not for compression.
TargetPageSize flagext.Bytes `yaml:"target_page_size"`
// TODO(rfratto): We need an additional parameter for TargetMetadataSize, as
// metadata payloads can't be split and must be downloaded in a single
// request.
//
// At the moment, we don't have a good mechanism for implementing a metadata
// size limit (we need to support some form of section splitting or column
// combinations), so the option is omitted for now.
// TargetObjectSize configures a target size for data objects.
TargetObjectSize flagext.Bytes `yaml:"target_object_size"`
// TargetSectionSize configures the maximum size of data in a section. Sections
// which support this parameter will place overflow data into new sections of
// the same type.
TargetSectionSize flagext.Bytes `yaml:"target_section_size"`
// BufferSize configures the size of the buffer used to accumulate
// uncompressed logs in memory prior to sorting.
BufferSize flagext.Bytes `yaml:"buffer_size"`
// SectionStripeMergeLimit configures the number of stripes to merge at once when
// flushing stripes into a section. MergeSize must be larger than 1. Lower
// values of MergeSize trade off lower memory overhead for higher time spent
// merging.
SectionStripeMergeLimit int `yaml:"section_stripe_merge_limit"`
}
// RegisterFlagsWithPrefix registers flags with the given prefix.
func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
_ = cfg.TargetPageSize.Set("128KB")
_ = cfg.TargetObjectSize.Set("64MB")
_ = cfg.BufferSize.Set("2MB")
_ = cfg.TargetSectionSize.Set("16MB")
f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The size of the target page to use for the data object builder.")
f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.")
f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.")
f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.")
f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of stripes to merge into a section at once. Must be greater than 1.")
}
// Validate validates the BuilderConfig.
func (cfg *BuilderConfig) Validate() error {
var errs []error
if cfg.TargetPageSize <= 0 {
errs = append(errs, errors.New("TargetPageSize must be greater than 0"))
} else if cfg.TargetPageSize >= cfg.TargetObjectSize {
errs = append(errs, errors.New("TargetPageSize must be less than TargetObjectSize"))
}
if cfg.TargetObjectSize <= 0 {
errs = append(errs, errors.New("TargetObjectSize must be greater than 0"))
}
if cfg.BufferSize <= 0 {
errs = append(errs, errors.New("BufferSize must be greater than 0"))
}
if cfg.TargetSectionSize <= 0 || cfg.TargetSectionSize > cfg.TargetObjectSize {
errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize"))
}
if cfg.SectionStripeMergeLimit < 2 {
errs = append(errs, errors.New("LogsMergeStripesMax must be greater than 1"))
}
return errors.Join(errs...)
}
// A Builder constructs a logs-oriented data object from a set of incoming
// log data. Log data is appended by calling [LogBuilder.Append]. A complete
// data object is constructed by by calling [LogBuilder.Flush].
//
// Methods on Builder are not goroutine-safe; callers are responsible for
// synchronization.
type Builder struct {
cfg BuilderConfig
metrics *builderMetrics
labelCache *lru.Cache[string, labels.Labels]
currentSizeEstimate int
builder *dataobj.Builder // Inner builder for accumulating sections.
streams *streams.Builder
pointers *pointers.Builder
state builderState
}
type builderState int
const (
// builderStateEmpty indicates the builder is empty and ready to accept new data.
builderStateEmpty builderState = iota
// builderStateDirty indicates the builder has been modified since the last flush.
builderStateDirty
)
// NewBuilder creates a new [Builder] which stores log-oriented data objects.
//
// NewBuilder returns an error if the provided config is invalid.
func NewBuilder(cfg BuilderConfig) (*Builder, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
labelCache, err := lru.New[string, labels.Labels](5000)
if err != nil {
return nil, fmt.Errorf("failed to create LRU cache: %w", err)
}
metrics := newBuilderMetrics()
metrics.ObserveConfig(cfg)
return &Builder{
cfg: cfg,
metrics: metrics,
labelCache: labelCache,
builder: dataobj.NewBuilder(),
streams: streams.NewBuilder(metrics.streams, int(cfg.TargetPageSize)),
pointers: pointers.NewBuilder(metrics.pointers, int(cfg.TargetPageSize)),
}, nil
}
func (b *Builder) GetEstimatedSize() int {
return b.currentSizeEstimate
}
// AppendStream appends a stream to the object's stream section, returning the stream ID within this object.
func (b *Builder) AppendStream(stream streams.Stream) (int64, error) {
b.metrics.appendsTotal.Inc()
newEntrySize := labelsEstimate(stream.Labels) + 2
if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
return 0, ErrBuilderFull
}
timer := prometheus.NewTimer(b.metrics.appendTime)
defer timer.ObserveDuration()
// Record the stream in the stream section.
// Once to capture the min timestamp and uncompressed size, again to record the max timestamp.
sort.Sort(stream.Labels)
streamID := b.streams.Record(stream.Labels, stream.MinTimestamp, stream.UncompressedSize)
_ = b.streams.Record(stream.Labels, stream.MaxTimestamp, 0)
// If our logs section has gotten big enough, we want to flush it to the
// encoder and start a new section.
if b.pointers.EstimatedSize() > int(b.cfg.TargetSectionSize) {
if err := b.builder.Append(b.pointers); err != nil {
b.metrics.appendFailures.Inc()
return 0, err
}
}
b.currentSizeEstimate = b.estimatedSize()
b.state = builderStateDirty
return streamID, nil
}
// labelsEstimate estimates the size of a set of labels in bytes.
func labelsEstimate(ls labels.Labels) int {
var (
keysSize int
valuesSize int
)
for _, l := range ls {
keysSize += len(l.Name)
valuesSize += len(l.Value)
}
// Keys are stored as columns directly, while values get compressed. We'll
// underestimate a 2x compression ratio.
return keysSize + valuesSize/2
}
// RecordStreamRef records a reference to a stream from another object, as the stream IDs will be different between objects.
func (b *Builder) RecordStreamRef(path string, streamIDInObject int64, streamID int64) {
b.pointers.RecordStreamRef(path, streamIDInObject, streamID)
}
// Append buffers a stream to be written to a data object. Append returns an
// error if the stream labels cannot be parsed or [ErrBuilderFull] if the
// builder is full.
//
// Once a Builder is full, call [Builder.Flush] to flush the buffered data,
// then call Append again with the same entry.
func (b *Builder) ObserveLogLine(path string, section int64, streamIDInObject int64, ts time.Time, uncompressedSize int64) error {
// Check whether the buffer is full before a stream can be appended; this is
// tends to overestimate, but we may still go over our target size.
//
// Since this check only happens after the first call to Append,
// b.currentSizeEstimate will always be updated to reflect the size following
// the previous append.
newEntrySize := 4 // ints and times compress well so we just need to make an estimate.
if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
return ErrBuilderFull
}
timer := prometheus.NewTimer(b.metrics.appendTime)
defer timer.ObserveDuration()
b.pointers.ObserveStream(path, section, streamIDInObject, ts, uncompressedSize)
// If our logs section has gotten big enough, we want to flush it to the
// encoder and start a new section.
if b.pointers.EstimatedSize() > int(b.cfg.TargetSectionSize) {
if err := b.builder.Append(b.pointers); err != nil {
return err
}
}
b.currentSizeEstimate = b.estimatedSize()
b.state = builderStateDirty
return nil
}
// Append buffers a stream to be written to a data object. Append returns an
// error if the stream labels cannot be parsed or [ErrBuilderFull] if the
// builder is full.
//
// Once a Builder is full, call [Builder.Flush] to flush the buffered data,
// then call Append again with the same entry.
func (b *Builder) AppendColumnIndex(path string, section int64, columnName string, columnIndex int64, valuesBloom []byte) error {
// Check whether the buffer is full before a stream can be appended; this is
// tends to overestimate, but we may still go over our target size.
//
// Since this check only happens after the first call to Append,
// b.currentSizeEstimate will always be updated to reflect the size following
// the previous append.
newEntrySize := len(columnName) + 1 + 1 + len(valuesBloom) + 1
if b.state != builderStateEmpty && b.currentSizeEstimate+newEntrySize > int(b.cfg.TargetObjectSize) {
return ErrBuilderFull
}
timer := prometheus.NewTimer(b.metrics.appendTime)
defer timer.ObserveDuration()
b.pointers.RecordColumnIndex(path, section, columnName, columnIndex, valuesBloom)
// If our logs section has gotten big enough, we want to flush it to the
// encoder and start a new section.
if b.pointers.EstimatedSize() > int(b.cfg.TargetSectionSize) {
if err := b.builder.Append(b.pointers); err != nil {
return err
}
}
b.currentSizeEstimate = b.estimatedSize()
b.state = builderStateDirty
return nil
}
func (b *Builder) estimatedSize() int {
var size int
size += b.streams.EstimatedSize()
size += b.pointers.EstimatedSize()
size += b.builder.Bytes()
b.metrics.sizeEstimate.Set(float64(size))
return size
}
type FlushStats struct {
MinTimestamp time.Time
MaxTimestamp time.Time
}
// Flush flushes all buffered data to the buffer provided. Calling Flush can result
// in a no-op if there is no buffered data to flush.
//
// [Builder.Reset] is called after a successful Flush to discard any pending data and allow new data to be appended.
func (b *Builder) Flush(output *bytes.Buffer) (FlushStats, error) {
if b.state == builderStateEmpty {
return FlushStats{}, ErrBuilderEmpty
}
b.metrics.flushTotal.Inc()
timer := prometheus.NewTimer(b.metrics.buildTime)
defer timer.ObserveDuration()
// Appending sections resets them, so we need to load the time range before
// appending.
minTime, maxTime := b.streams.TimeRange()
// Flush sections one more time in case they have data.
var flushErrors []error
flushErrors = append(flushErrors, b.builder.Append(b.streams))
flushErrors = append(flushErrors, b.builder.Append(b.pointers))
if err := errors.Join(flushErrors...); err != nil {
b.metrics.flushFailures.Inc()
return FlushStats{}, fmt.Errorf("building object: %w", err)
}
sz, err := b.builder.Flush(output)
if err != nil {
b.metrics.flushFailures.Inc()
return FlushStats{}, fmt.Errorf("building object: %w", err)
}
b.metrics.builtSize.Observe(float64(sz))
var (
// We don't know if output was empty before calling Flush, so we only start
// reading from where we know writing began.
objReader = bytes.NewReader(output.Bytes()[output.Len()-int(sz):])
objLength = sz
)
obj, err := dataobj.FromReaderAt(objReader, objLength)
if err != nil {
b.metrics.flushFailures.Inc()
return FlushStats{}, fmt.Errorf("failed to create readable object: %w", err)
}
err = b.observeObject(context.Background(), obj)
b.Reset()
return FlushStats{MinTimestamp: minTime, MaxTimestamp: maxTime}, err
}
func (b *Builder) observeObject(ctx context.Context, obj *dataobj.Object) error {
var errs []error
errs = append(errs, b.metrics.dataobj.Observe(obj))
for _, sec := range obj.Sections() {
switch {
case pointers.CheckSection(sec):
pointerSection, err := pointers.Open(context.Background(), sec)
if err != nil {
errs = append(errs, err)
continue
}
errs = append(errs, b.metrics.pointers.Observe(ctx, pointerSection))
case streams.CheckSection(sec):
streamSection, err := streams.Open(context.Background(), sec)
if err != nil {
errs = append(errs, err)
continue
}
errs = append(errs, b.metrics.streams.Observe(ctx, streamSection))
}
}
return errors.Join(errs...)
}
// Reset discards pending data and resets the builder to an empty state.
func (b *Builder) Reset() {
b.builder.Reset()
b.streams.Reset()
b.pointers.Reset()
//b.metrics.sizeEstimate.Set(0)
b.currentSizeEstimate = 0
b.state = builderStateEmpty
}
// RegisterMetrics registers metrics about builder to report to reg. All
// metrics will have a tenant label set to the tenant ID of the Builder.
//
// If multiple Builders for the same tenant are running in the same process,
// reg must contain additional labels to differentiate between them.
func (b *Builder) RegisterMetrics(reg prometheus.Registerer) error {
return b.metrics.Register(reg)
}
// UnregisterMetrics unregisters metrics about builder from reg.
func (b *Builder) UnregisterMetrics(reg prometheus.Registerer) {
b.metrics.Unregister(reg)
}

@ -0,0 +1,165 @@
package indexobj
import (
"errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
)
// builderMetrics provides instrumnetation for a [Builder].
type builderMetrics struct {
pointers *pointers.Metrics
streams *streams.Metrics
dataobj *dataobj.Metrics
targetPageSize prometheus.Gauge
targetObjectSize prometheus.Gauge
appendTime prometheus.Histogram
appendFailures prometheus.Counter
appendsTotal prometheus.Counter
buildTime prometheus.Histogram
flushFailures prometheus.Counter
flushTotal prometheus.Counter
sizeEstimate prometheus.Gauge
builtSize prometheus.Histogram
}
// newBuilderMetrics creates a new set of [builderMetrics] for instrumenting
// logs objects.
func newBuilderMetrics() *builderMetrics {
return &builderMetrics{
pointers: pointers.NewMetrics(),
streams: streams.NewMetrics(),
dataobj: dataobj.NewMetrics(),
targetPageSize: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "loki_indexobj_config_target_page_size_bytes",
Help: "Configured target page size in bytes.",
}),
targetObjectSize: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "loki_indexobj_config_target_object_size_bytes",
Help: "Configured target object size in bytes.",
}),
appendTime: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "loki_indexobj_append_time_seconds",
Help: "Time taken appending a set of log lines in a stream to a data object.",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 0,
}),
appendFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_indexobj_append_failures_total",
Help: "Total number of append failures",
}),
appendsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_indexobj_appends_total",
Help: "Total number of appends",
}),
buildTime: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "loki_indexobj_build_time_seconds",
Help: "Time taken building a data object to flush.",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 0,
}),
sizeEstimate: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "loki_indexobj_size_estimate_bytes",
Help: "Current estimated size of the data object in bytes.",
}),
builtSize: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "loki_indexobj_built_size_bytes",
Help: "Distribution of constructed data object sizes in bytes.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 0,
}),
flushFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_indexobj_flush_failures_total",
Help: "Total number of flush failures.",
}),
flushTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_indexobj_flush_total",
Help: "Total number of flushes.",
}),
}
}
// ObserveConfig updates config metrics based on the provided [BuilderConfig].
func (m *builderMetrics) ObserveConfig(cfg BuilderConfig) {
m.targetPageSize.Set(float64(cfg.TargetPageSize))
m.targetObjectSize.Set(float64(cfg.TargetObjectSize))
}
// Register registers metrics to report to reg.
func (m *builderMetrics) Register(reg prometheus.Registerer) error {
var errs []error
errs = append(errs, m.pointers.Register(reg))
errs = append(errs, m.streams.Register(reg))
errs = append(errs, m.dataobj.Register(reg))
errs = append(errs, reg.Register(m.targetPageSize))
errs = append(errs, reg.Register(m.targetObjectSize))
errs = append(errs, reg.Register(m.appendTime))
errs = append(errs, reg.Register(m.appendFailures))
errs = append(errs, reg.Register(m.appendsTotal))
errs = append(errs, reg.Register(m.buildTime))
errs = append(errs, reg.Register(m.sizeEstimate))
errs = append(errs, reg.Register(m.builtSize))
errs = append(errs, reg.Register(m.flushFailures))
errs = append(errs, reg.Register(m.flushTotal))
return errors.Join(errs...)
}
// Unregister unregisters metrics from the provided Registerer.
func (m *builderMetrics) Unregister(reg prometheus.Registerer) {
m.pointers.Unregister(reg)
m.streams.Unregister(reg)
m.dataobj.Unregister(reg)
reg.Unregister(m.targetPageSize)
reg.Unregister(m.targetObjectSize)
reg.Unregister(m.appendTime)
reg.Unregister(m.appendFailures)
reg.Unregister(m.appendsTotal)
reg.Unregister(m.buildTime)
reg.Unregister(m.sizeEstimate)
reg.Unregister(m.builtSize)
reg.Unregister(m.flushFailures)
reg.Unregister(m.flushTotal)
}

@ -0,0 +1,151 @@
package indexobj
import (
"bytes"
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
)
var testBuilderConfig = BuilderConfig{
TargetPageSize: 2048,
TargetObjectSize: 1 << 22, // 4 MiB
TargetSectionSize: 1 << 21, // 2 MiB
BufferSize: 2048 * 8,
SectionStripeMergeLimit: 2,
}
func TestBuilder(t *testing.T) {
buf := bytes.NewBuffer(nil)
dirtyBuf := bytes.NewBuffer([]byte("dirty"))
testStreams := []streams.Stream{
{
ID: 1,
Labels: labels.Labels{
{Name: "cluster", Value: "test"},
{Name: "app", Value: "foo"},
},
Rows: 2,
MinTimestamp: time.Unix(10, 0).UTC(),
MaxTimestamp: time.Unix(20, 0).UTC(),
UncompressedSize: 200,
},
{
ID: 2,
Labels: labels.Labels{
{Name: "cluster", Value: "test"},
{Name: "app", Value: "bar"},
},
Rows: 3,
MinTimestamp: time.Unix(15, 0).UTC(),
MaxTimestamp: time.Unix(25, 0).UTC(),
UncompressedSize: 100,
},
}
testPointers := []pointers.SectionPointer{
{
Path: "test/path",
Section: 1,
ColumnName: "foo",
ColumnIndex: 1,
ValuesBloomFilter: []byte{1, 2, 3},
},
}
t.Run("Build", func(t *testing.T) {
builder, err := NewBuilder(testBuilderConfig)
require.NoError(t, err)
for _, stream := range testStreams {
_, err := builder.AppendStream(stream)
require.NoError(t, err)
}
for _, pointer := range testPointers {
err := builder.AppendColumnIndex(pointer.Path, pointer.Section, pointer.ColumnName, pointer.ColumnIndex, pointer.ValuesBloomFilter)
require.NoError(t, err)
}
_, err = builder.Flush(buf)
require.NoError(t, err)
})
t.Run("Read", func(t *testing.T) {
obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
require.NoError(t, err)
require.Equal(t, 1, obj.Sections().Count(streams.CheckSection))
require.Equal(t, 1, obj.Sections().Count(pointers.CheckSection))
require.Equal(t, 0, obj.Sections().Count(logs.CheckSection))
})
t.Run("BuildWithDirtyBuffer", func(t *testing.T) {
builder, err := NewBuilder(testBuilderConfig)
require.NoError(t, err)
for _, stream := range testStreams {
_, err := builder.AppendStream(stream)
require.NoError(t, err)
}
for _, pointer := range testPointers {
err := builder.AppendColumnIndex(pointer.Path, pointer.Section, pointer.ColumnName, pointer.ColumnIndex, pointer.ValuesBloomFilter)
require.NoError(t, err)
}
_, err = builder.Flush(dirtyBuf)
require.NoError(t, err)
require.Equal(t, buf.Len(), dirtyBuf.Len()-5)
})
t.Run("ReadFromDirtyBuffer", func(t *testing.T) {
obj, err := dataobj.FromReaderAt(bytes.NewReader(dirtyBuf.Bytes()[5:]), int64(dirtyBuf.Len()-5))
require.NoError(t, err)
require.Equal(t, 1, obj.Sections().Count(streams.CheckSection))
require.Equal(t, 1, obj.Sections().Count(pointers.CheckSection))
require.Equal(t, 0, obj.Sections().Count(logs.CheckSection))
})
}
// TestBuilder_Append ensures that appending to the buffer eventually reports
// that the buffer is full.
func TestBuilder_Append(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
builder, err := NewBuilder(testBuilderConfig)
require.NoError(t, err)
i := 0
for {
require.NoError(t, ctx.Err())
_, err := builder.AppendStream(streams.Stream{
ID: 1,
Labels: labels.Labels{
{Name: "cluster", Value: "test"},
{Name: "app", Value: "foo"},
{Name: "i", Value: fmt.Sprintf("%d", i)},
},
Rows: 2,
MinTimestamp: time.Unix(10, 0).UTC(),
MaxTimestamp: time.Unix(20, 0).UTC(),
})
if errors.Is(err, ErrBuilderFull) {
break
}
require.NoError(t, err)
i++
}
}

@ -0,0 +1,85 @@
package index
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
type indexBuilderMetrics struct {
// Error counters
commitFailures prometheus.Counter
// Request counters
commitsTotal prometheus.Counter
// Processing delay histogram
processingDelay prometheus.Histogram
}
func newIndexBuilderMetrics() *indexBuilderMetrics {
p := &indexBuilderMetrics{
commitFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_index_builder_commit_failures_total",
Help: "Total number of commit failures",
}),
commitsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_index_builder_commits_total",
Help: "Total number of commits",
}),
processingDelay: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "loki_index_builder_processing_delay_seconds",
Help: "Time difference between record timestamp and processing time in seconds",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 0,
}),
}
return p
}
func (p *indexBuilderMetrics) register(reg prometheus.Registerer) error {
collectors := []prometheus.Collector{
p.commitFailures,
p.commitsTotal,
p.processingDelay,
}
for _, collector := range collectors {
if err := reg.Register(collector); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
return err
}
}
}
return nil
}
func (p *indexBuilderMetrics) unregister(reg prometheus.Registerer) {
collectors := []prometheus.Collector{
p.commitFailures,
p.commitsTotal,
p.processingDelay,
}
for _, collector := range collectors {
reg.Unregister(collector)
}
}
func (p *indexBuilderMetrics) incCommitFailures() {
p.commitFailures.Inc()
}
func (p *indexBuilderMetrics) incCommitsTotal() {
p.commitsTotal.Inc()
}
func (p *indexBuilderMetrics) observeProcessingDelay(recordTimestamp time.Time) {
// Convert milliseconds to seconds and calculate delay
if !recordTimestamp.IsZero() { // Only observe if timestamp is valid
p.processingDelay.Observe(time.Since(recordTimestamp).Seconds())
}
}

@ -69,6 +69,13 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) {
if len(s) == 0 {
return 0, nil
}
// Init stats object and use the context, otherwise we create a new one every time we increment a stat.
var statistics *stats.Context
if stats.IsPresent(ctx) {
statistics = stats.FromContext(ctx)
} else {
statistics, ctx = stats.NewContext(ctx)
}
if !r.ready {
err := r.init(ctx)
@ -127,9 +134,8 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) {
r.dl.SetReadRange(readRange)
var (
rowsRead int // tracks max rows accessed to move the [r.row] cursor
passCount int // tracks how many rows passed the predicate
statistics = stats.FromContext(ctx)
rowsRead int // tracks max rows accessed to move the [r.row] cursor
passCount int // tracks how many rows passed the predicate
)
// If there are no predicates, read all columns in the dataset

@ -118,6 +118,10 @@ func convertColumnType(protoType logsmd.ColumnType) (ColumnType, bool) {
return ColumnTypeInvalid, false
}
func IsMetadataColumn(colType string) bool {
return colType == logsmd.COLUMN_TYPE_METADATA.String()
}
var columnTypeNames = map[ColumnType]string{
ColumnTypeInvalid: "invalid",
ColumnTypeStreamID: "stream_id",

@ -29,6 +29,7 @@ type (
MetadataSize uint64
ValuesCount uint64
Cardinality uint64
ColumnIndex int64
Pages []PageStats
}
@ -79,6 +80,7 @@ func ReadStats(ctx context.Context, section *Section) (Stats, error) {
MetadataSize: col.Info.MetadataSize,
ValuesCount: col.Info.ValuesCount,
Cardinality: col.Info.Statistics.GetCardinalityCount(),
ColumnIndex: int64(i),
}
for _, pages := range pageSets[i] {

@ -52,17 +52,25 @@ const (
PointerKindColumnIndex // PointerKindColumnIndex is a pointer for a column index.
)
type streamKey struct {
objectPath string
section int64
streamID int64
}
// Builder builds a pointers section.
type Builder struct {
metrics *Metrics
pageSize int
// streamLookup is a map of the stream ID in this index object to the pointer.
streamLookup map[string]*SectionPointer
streamLookup map[streamKey]*SectionPointer
// streamObjectRefs is a map of the stream ID in the referenced logs object to the stream ID in this index object.
streamObjectRefs map[string]map[int64]int64
// pointers is the list of pointers to encode.
pointers []*SectionPointer
key streamKey
}
// NewBuilder creates a new pointers section builder. The pageSize argument
@ -75,7 +83,7 @@ func NewBuilder(metrics *Metrics, pageSize int) *Builder {
metrics: metrics,
pageSize: pageSize,
streamLookup: make(map[string]*SectionPointer),
streamLookup: make(map[streamKey]*SectionPointer),
streamObjectRefs: make(map[string]map[int64]int64),
pointers: make([]*SectionPointer, 0, 1024),
}
@ -97,8 +105,12 @@ func (b *Builder) RecordStreamRef(path string, idInObject int64, idInIndex int64
// ObserveStream observes a stream in the index by recording the start & end timestamps, line count, and uncompressed size per-section.
func (b *Builder) ObserveStream(path string, section int64, idInObject int64, ts time.Time, uncompressedSize int64) {
indexStreamID := b.streamObjectRefs[path][idInObject]
key := fmt.Sprintf("%s:%d:%d", path, section, indexStreamID)
pointer, ok := b.streamLookup[key]
b.key.objectPath = path
b.key.section = section
b.key.streamID = indexStreamID
pointer, ok := b.streamLookup[b.key]
if ok {
// Update the existing pointer
if ts.Before(pointer.StartTs) {
@ -124,7 +136,7 @@ func (b *Builder) ObserveStream(path string, section int64, idInObject int64, ts
UncompressedSize: uncompressedSize,
}
b.pointers = append(b.pointers, newPointer)
b.streamLookup[key] = newPointer
b.streamLookup[b.key] = newPointer
}
func (b *Builder) RecordColumnIndex(path string, section int64, columnName string, columnIndex int64, valuesBloomFilter []byte) {

@ -19,7 +19,7 @@ func newDecoder(reader dataobj.SectionReader) *decoder {
return &decoder{sr: reader}
}
// decoder supports decoding the raw underlying data for a streams section.
// decoder supports decoding the raw underlying data for a pointers section.
type decoder struct {
sr dataobj.SectionReader
}
@ -28,7 +28,7 @@ type decoder struct {
func (rd *decoder) Columns(ctx context.Context) ([]*pointersmd.ColumnDesc, error) {
rc, err := rd.sr.Metadata(ctx)
if err != nil {
return nil, fmt.Errorf("reading streams section metadata: %w", err)
return nil, fmt.Errorf("reading pointers section metadata: %w", err)
}
defer rc.Close()

@ -78,6 +78,11 @@ func NewContext(ctx context.Context) (*Context, context.Context) {
return contextData, ctx
}
func IsPresent(ctx context.Context) bool {
_, ok := ctx.Value(statsKey).(*Context)
return ok
}
// FromContext returns the statistics context.
func FromContext(ctx context.Context) *Context {
v, ok := ctx.Value(statsKey).(*Context)

@ -39,6 +39,7 @@ import (
"github.com/grafana/loki/v3/pkg/compactor/deletion"
dataobjconfig "github.com/grafana/loki/v3/pkg/dataobj/config"
"github.com/grafana/loki/v3/pkg/dataobj/consumer"
dataobjindex "github.com/grafana/loki/v3/pkg/dataobj/index"
"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/indexgateway"
"github.com/grafana/loki/v3/pkg/ingester"
@ -427,6 +428,7 @@ type Loki struct {
blockBuilder *blockbuilder.BlockBuilder
blockScheduler *blockscheduler.BlockScheduler
dataObjConsumer *consumer.Service
dataObjIndexBuilder *dataobjindex.Builder
ClientMetrics storage.ClientMetrics
deleteClientMetrics *deletion.DeleteRequestClientMetrics
@ -768,6 +770,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(DataObjExplorer, t.initDataObjExplorer)
mm.RegisterModule(UI, t.initUI)
mm.RegisterModule(DataObjConsumer, t.initDataObjConsumer)
mm.RegisterModule(DataObjIndexBuilder, t.initDataObjIndexBuilder)
mm.RegisterModule(All, nil)
mm.RegisterModule(Read, nil)
@ -814,6 +817,7 @@ func (t *Loki) setupModuleManager() error {
BlockScheduler: {Server, UI},
DataObjExplorer: {Server, UI},
DataObjConsumer: {PartitionRing, Server, UI},
DataObjIndexBuilder: {Server, UI},
Read: {QueryFrontend, Querier},
Write: {Ingester, Distributor, PatternIngester},

@ -54,6 +54,7 @@ import (
"github.com/grafana/loki/v3/pkg/compactor/generationnumber"
"github.com/grafana/loki/v3/pkg/dataobj/consumer"
"github.com/grafana/loki/v3/pkg/dataobj/explorer"
dataobjindex "github.com/grafana/loki/v3/pkg/dataobj/index"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
dataobjquerier "github.com/grafana/loki/v3/pkg/dataobj/querier"
"github.com/grafana/loki/v3/pkg/distributor"
@ -158,6 +159,7 @@ const (
BlockScheduler = "block-scheduler"
DataObjExplorer = "dataobj-explorer"
DataObjConsumer = "dataobj-consumer"
DataObjIndexBuilder = "dataobj-index-builder"
UI = "ui"
All = "all"
Read = "read"
@ -2164,6 +2166,28 @@ func (t *Loki) initDataObjConsumer() (services.Service, error) {
return t.dataObjConsumer, nil
}
func (t *Loki) initDataObjIndexBuilder() (services.Service, error) {
if !t.Cfg.Ingester.KafkaIngestion.Enabled {
return nil, nil
}
store, err := t.createDataObjBucket("dataobj-index-builder")
if err != nil {
return nil, err
}
level.Info(util_log.Logger).Log("msg", "initializing dataobj index builder", "instance", t.Cfg.Ingester.LifecyclerConfig.ID)
t.dataObjIndexBuilder, err = dataobjindex.NewIndexBuilder(
t.Cfg.DataObj.Index,
t.Cfg.KafkaConfig,
util_log.Logger,
t.Cfg.Ingester.LifecyclerConfig.ID,
store,
prometheus.DefaultRegisterer,
)
return t.dataObjIndexBuilder, err
}
func (t *Loki) createDataObjBucket(clientName string) (objstore.Bucket, error) {
schema, err := t.Cfg.SchemaConfig.SchemaForTime(model.Now())
if err != nil {

Loading…
Cancel
Save