From ca4c025ad00a00292608f8636a636d38e282f3ae Mon Sep 17 00:00:00 2001 From: benclive Date: Mon, 3 Feb 2025 10:59:39 +0000 Subject: [PATCH] chore(dataobj): Create initial dataobj builder (#16011) Co-authored-by: Cyril Tovena Co-authored-by: Robert Fratto --- docs/sources/shared/configuration.md | 30 +++ go.mod | 2 + go.sum | 4 +- pkg/dataobj/builder.go | 46 +++- pkg/dataobj/builder_test.go | 3 +- pkg/dataobj/consumer/config.go | 32 +++ pkg/dataobj/consumer/metrics.go | 117 ++++++++++ pkg/dataobj/consumer/partition_processor.go | 201 ++++++++++++++++ pkg/dataobj/consumer/service.go | 218 ++++++++++++++++++ pkg/dataobj/internal/sections/logs/table.go | 1 - .../internal/sections/streams/streams.go | 44 +++- pkg/dataobj/metastore/metastore.go | 175 ++++++++++++++ pkg/dataobj/metastore/metastore_test.go | 106 +++++++++ pkg/dataobj/metastore/metrics.go | 102 ++++++++ pkg/loki/loki.go | 10 + pkg/loki/modules.go | 38 ++- pkg/storage/bucket/prefixed_bucket_client.go | 5 + pkg/storage/bucket/sse_bucket_client.go | 4 + vendor/github.com/thanos-io/objstore/inmem.go | 26 +++ .../github.com/thanos-io/objstore/objstore.go | 8 + .../thanos-io/objstore/prefixed_bucket.go | 4 + .../objstore/providers/azure/azure.go | 4 + .../thanos-io/objstore/providers/bos/bos.go | 4 + .../providers/filesystem/filesystem.go | 36 +++ .../thanos-io/objstore/providers/gcs/gcs.go | 60 ++++- .../thanos-io/objstore/providers/oss/oss.go | 4 + .../thanos-io/objstore/providers/s3/s3.go | 95 ++++++-- .../objstore/providers/swift/swift.go | 4 + .../github.com/thanos-io/objstore/testing.go | 4 + .../tracing/opentracing/opentracing.go | 8 + vendor/modules.txt | 3 +- 31 files changed, 1352 insertions(+), 46 deletions(-) create mode 100644 pkg/dataobj/consumer/config.go create mode 100644 pkg/dataobj/consumer/metrics.go create mode 100644 pkg/dataobj/consumer/partition_processor.go create mode 100644 pkg/dataobj/consumer/service.go create mode 100644 pkg/dataobj/metastore/metastore.go create mode 100644 pkg/dataobj/metastore/metastore_test.go create mode 100644 pkg/dataobj/metastore/metrics.go diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 503809c69c..2154346786 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -895,6 +895,36 @@ kafka_config: # CLI flag: -kafka.max-consumer-lag-at-startup [max_consumer_lag_at_startup: | default = 15s] +dataobj_consumer: + builderconfig: + # The size of the SHA prefix to use for the data object builder. + # CLI flag: -dataobj-consumer.sha-prefix-size + [sha_prefix_size: | default = 2] + + # The size of the target page to use for the data object builder. + # CLI flag: -dataobj-consumer.target-page-size + [target_page_size: | default = 2MiB] + + # The size of the target object to use for the data object builder. + # CLI flag: -dataobj-consumer.target-object-size + [target_object_size: | default = 1GiB] + + # Configures a maximum size for sections, for sections that support it. + # CLI flag: -dataobj-consumer.target-section-size + [target_section_size: | default = 128MiB] + + # The size of the buffer to use for sorting logs. + # CLI flag: -dataobj-consumer.buffer-size + [buffer_size: | default = 16MiB] + + # The tenant ID to use for the data object builder. + # CLI flag: -dataobj-consumer.tenant-id + [tenant_id: | default = "fake"] + + # The prefix to use for the storage bucket. 
+ # CLI flag: -dataobj-consumer.storage-bucket-prefix + [storage_bucket_prefix: | default = "dataobj/"] + dataobj_explorer: # Prefix to use when exploring the bucket. If set, only objects under this # prefix will be visible. diff --git a/go.mod b/go.mod index 25717cd1c4..9d725c11bc 100644 --- a/go.mod +++ b/go.mod @@ -408,3 +408,5 @@ replace github.com/grafana/loki/pkg/push => ./pkg/push // leodido fork his project to continue support replace github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.2.0 + +replace github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866 diff --git a/go.sum b/go.sum index bbee7f7ddb..65f7198d16 100644 --- a/go.sum +++ b/go.sum @@ -628,6 +628,8 @@ github.com/grafana/jsonparser v0.0.0-20241004153430-023329977675 h1:U94jQ2TQr1m3 github.com/grafana/jsonparser v0.0.0-20241004153430-023329977675/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= +github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866 h1:/y3qC0I9kttHjLPxp4bGf+4jcJw60C6hrokTPckHYT8= +github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM= github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg= github.com/grafana/pyroscope-go/godeltaprof v0.1.8/go.mod h1:2+l7K7twW49Ct4wFluZD3tZ6e0SjanjcUUBPVD/UuGU= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248= @@ -1124,8 +1126,6 @@ github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08Yu github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM= github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw= -github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a h1:wFBHAmtq1tOLPFaiC4LozyG/BzkRa3ZTmVv1KujUNqk= -github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM= github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4= github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0= diff --git a/pkg/dataobj/builder.go b/pkg/dataobj/builder.go index ba0391b929..a80591547b 100644 --- a/pkg/dataobj/builder.go +++ b/pkg/dataobj/builder.go @@ -8,6 +8,7 @@ import ( "errors" "flag" "fmt" + "time" "github.com/grafana/dskit/flagext" lru "github.com/hashicorp/golang-lru/v2" @@ -126,6 +127,12 @@ type Builder struct { type builderState int +type FlushResult struct { + Path string + MinTimestamp time.Time + MaxTimestamp time.Time +} + const ( // builderStateReady indicates the builder is empty and ready to accept new data. builderStateEmpty builderState = iota @@ -285,15 +292,10 @@ func streamSizeEstimate(stream logproto.Stream) int { // If Flush builds an object but fails to upload it to object storage, the // built object is cached and can be retried. [Builder.Reset] can be called to // discard any pending data and allow new data to be appended. 
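Note: the hunk below changes Flush to return a FlushResult describing the uploaded object. A minimal caller sketch, assuming a configured *dataobj.Builder and standard-library logging (the flushAndLog wrapper is illustrative, not part of this patch):

func flushAndLog(ctx context.Context, b *dataobj.Builder) error {
	res, err := b.Flush(ctx)
	if err != nil {
		return fmt.Errorf("flushing data object: %w", err)
	}
	// res.Path is the uploaded object key; the time bounds cover every
	// appended entry and are what the metastore update consumes.
	log.Printf("flushed %s spanning %s to %s", res.Path, res.MinTimestamp, res.MaxTimestamp)
	return nil
}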
-func (b *Builder) Flush(ctx context.Context) error {
-	switch b.state {
-	case builderStateEmpty:
-		return nil // Nothing to flush
-	case builderStateDirty:
-		if err := b.buildObject(); err != nil {
-			return fmt.Errorf("building object: %w", err)
-		}
-		b.state = builderStateFlush
+func (b *Builder) Flush(ctx context.Context) (FlushResult, error) {
+	buf, err := b.FlushToBuffer()
+	if err != nil {
+		return FlushResult{}, fmt.Errorf("flushing buffer: %w", err)
 	}
 
+	// FlushToBuffer returns a nil buffer when there is nothing to flush;
+	// guard against dereferencing it below.
+	if buf == nil {
+		return FlushResult{}, nil
+	}
+
 	timer := prometheus.NewTimer(b.metrics.flushTime)
@@ -303,12 +305,32 @@
 	sumStr := hex.EncodeToString(sum[:])
 	objectPath := fmt.Sprintf("tenant-%s/objects/%s/%s", b.tenantID, sumStr[:b.cfg.SHAPrefixSize], sumStr[b.cfg.SHAPrefixSize:])
 
-	if err := b.bucket.Upload(ctx, objectPath, bytes.NewReader(b.flushBuffer.Bytes())); err != nil {
-		return err
+	if err := b.bucket.Upload(ctx, objectPath, bytes.NewReader(buf.Bytes())); err != nil {
+		return FlushResult{}, fmt.Errorf("uploading object: %w", err)
 	}
 
+	minTime, maxTime := b.streams.GetBounds()
+
 	b.Reset()
-	return nil
+	return FlushResult{
+		Path:         objectPath,
+		MinTimestamp: minTime,
+		MaxTimestamp: maxTime,
+	}, nil
+}
+
+func (b *Builder) FlushToBuffer() (*bytes.Buffer, error) {
+	switch b.state {
+	case builderStateEmpty:
+		return nil, nil // Nothing to flush
+	case builderStateDirty:
+		if err := b.buildObject(); err != nil {
+			return nil, fmt.Errorf("building object: %w", err)
+		}
+		b.state = builderStateFlush
+	}
+
+	return b.flushBuffer, nil
 }
 
 func (b *Builder) buildObject() error {
diff --git a/pkg/dataobj/builder_test.go b/pkg/dataobj/builder_test.go
index 365f6a5d61..ff86e8dcb9 100644
--- a/pkg/dataobj/builder_test.go
+++ b/pkg/dataobj/builder_test.go
@@ -81,7 +81,8 @@ func TestBuilder(t *testing.T) {
 		for _, entry := range streams {
 			require.NoError(t, builder.Append(entry))
 		}
-		require.NoError(t, builder.Flush(context.Background()))
+		_, err = builder.Flush(context.Background())
+		require.NoError(t, err)
 	})
 
 	t.Run("Read", func(t *testing.T) {
diff --git a/pkg/dataobj/consumer/config.go b/pkg/dataobj/consumer/config.go
new file mode 100644
index 0000000000..c873eb5990
--- /dev/null
+++ b/pkg/dataobj/consumer/config.go
@@ -0,0 +1,32 @@
+package consumer
+
+import (
+	"errors"
+	"flag"
+
+	"github.com/grafana/loki/v3/pkg/dataobj"
+)
+
+type Config struct {
+	dataobj.BuilderConfig
+	TenantID string `yaml:"tenant_id"`
+	// StorageBucketPrefix is the prefix to use for the storage bucket.
+ StorageBucketPrefix string `yaml:"storage_bucket_prefix"` +} + +func (cfg *Config) Validate() error { + if cfg.TenantID == "" { + return errors.New("tenantID is required") + } + return cfg.BuilderConfig.Validate() +} + +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("dataobj-consumer.", f) +} + +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f) + f.StringVar(&cfg.TenantID, prefix+"tenant-id", "fake", "The tenant ID to use for the data object builder.") + f.StringVar(&cfg.StorageBucketPrefix, prefix+"storage-bucket-prefix", "dataobj/", "The prefix to use for the storage bucket.") +} diff --git a/pkg/dataobj/consumer/metrics.go b/pkg/dataobj/consumer/metrics.go new file mode 100644 index 0000000000..4525cb512d --- /dev/null +++ b/pkg/dataobj/consumer/metrics.go @@ -0,0 +1,117 @@ +package consumer + +import ( + "time" + + "go.uber.org/atomic" + + "github.com/prometheus/client_golang/prometheus" +) + +type partitionOffsetMetrics struct { + currentOffset prometheus.GaugeFunc + lastOffset atomic.Int64 + + // Error counters + flushFailures prometheus.Counter + commitFailures prometheus.Counter + appendFailures prometheus.Counter + + // Processing delay histogram + processingDelay prometheus.Histogram +} + +func newPartitionOffsetMetrics() *partitionOffsetMetrics { + p := &partitionOffsetMetrics{ + flushFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_dataobj_consumer_flush_failures_total", + Help: "Total number of flush failures", + }), + commitFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_dataobj_consumer_commit_failures_total", + Help: "Total number of commit failures", + }), + appendFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_dataobj_consumer_append_failures_total", + Help: "Total number of append failures", + }), + processingDelay: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "loki_dataobj_consumer_processing_delay_seconds", + Help: "Time difference between record timestamp and processing time in seconds", + Buckets: prometheus.DefBuckets, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 0, + }), + } + + p.currentOffset = prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: "loki_dataobj_consumer_current_offset", + Help: "The last consumed offset for this partition", + }, + p.getCurrentOffset, + ) + + return p +} + +func (p *partitionOffsetMetrics) getCurrentOffset() float64 { + return float64(p.lastOffset.Load()) +} + +func (p *partitionOffsetMetrics) register(reg prometheus.Registerer) error { + collectors := []prometheus.Collector{ + p.currentOffset, + p.flushFailures, + p.commitFailures, + p.appendFailures, + p.processingDelay, + } + + for _, collector := range collectors { + if err := reg.Register(collector); err != nil { + if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { + return err + } + } + } + return nil +} + +func (p *partitionOffsetMetrics) unregister(reg prometheus.Registerer) { + collectors := []prometheus.Collector{ + p.currentOffset, + p.flushFailures, + p.commitFailures, + p.appendFailures, + p.processingDelay, + } + + for _, collector := range collectors { + reg.Unregister(collector) + } +} + +func (p *partitionOffsetMetrics) updateOffset(offset int64) { + p.lastOffset.Store(offset) +} + +func (p *partitionOffsetMetrics) incFlushFailures() { + p.flushFailures.Inc() +} + +func (p 
*partitionOffsetMetrics) incCommitFailures() { + p.commitFailures.Inc() +} + +func (p *partitionOffsetMetrics) incAppendFailures() { + p.appendFailures.Inc() +} + +func (p *partitionOffsetMetrics) observeProcessingDelay(recordTimestamp time.Time) { + // Convert milliseconds to seconds and calculate delay + if !recordTimestamp.IsZero() { // Only observe if timestamp is valid + p.processingDelay.Observe(time.Since(recordTimestamp).Seconds()) + } +} diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go new file mode 100644 index 0000000000..0c5cdb1346 --- /dev/null +++ b/pkg/dataobj/consumer/partition_processor.go @@ -0,0 +1,201 @@ +package consumer + +import ( + "bytes" + "context" + "strconv" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" + "github.com/grafana/loki/v3/pkg/kafka" +) + +type partitionProcessor struct { + // Kafka client and topic/partition info + client *kgo.Client + topic string + partition int32 + tenantID []byte + // Processing pipeline + records chan *kgo.Record + builder *dataobj.Builder + decoder *kafka.Decoder + + // Builder initialization + builderOnce sync.Once + builderCfg dataobj.BuilderConfig + bucket objstore.Bucket + metastoreManager *metastore.Manager + // Metrics + metrics *partitionOffsetMetrics + + // Control and coordination + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + reg prometheus.Registerer + logger log.Logger +} + +func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, bucket objstore.Bucket, tenantID string, topic string, partition int32, logger log.Logger, reg prometheus.Registerer) *partitionProcessor { + ctx, cancel := context.WithCancel(ctx) + decoder, err := kafka.NewDecoder() + if err != nil { + panic(err) + } + reg = prometheus.WrapRegistererWith(prometheus.Labels{ + "partition": strconv.Itoa(int(partition)), + }, reg) + + metrics := newPartitionOffsetMetrics() + if err := metrics.register(reg); err != nil { + level.Error(logger).Log("msg", "failed to register partition metrics", "err", err) + } + + metastoreManager, err := metastore.NewMetastoreManager(bucket, tenantID, logger, reg) + if err != nil { + level.Error(logger).Log("msg", "failed to create metastore manager", "err", err) + cancel() + return nil + } + + return &partitionProcessor{ + client: client, + logger: log.With(logger, "topic", topic, "partition", partition), + topic: topic, + partition: partition, + records: make(chan *kgo.Record, 1000), + ctx: ctx, + cancel: cancel, + decoder: decoder, + reg: reg, + builderCfg: builderCfg, + bucket: bucket, + tenantID: []byte(tenantID), + metrics: metrics, + metastoreManager: metastoreManager, + } +} + +func (p *partitionProcessor) start() { + p.wg.Add(1) + go func() { + defer p.wg.Done() + defer close(p.records) + + level.Info(p.logger).Log("msg", "started partition processor") + for { + select { + case <-p.ctx.Done(): + level.Info(p.logger).Log("msg", "stopping partition processor") + return + case record := <-p.records: + p.processRecord(record) + } + } + }() +} + +func (p *partitionProcessor) stop() { + p.cancel() + p.wg.Wait() + if p.builder != nil { + p.builder.UnregisterMetrics(p.reg) + } + p.metrics.unregister(p.reg) +} + +func (p 
*partitionProcessor) initBuilder() error { + var initErr error + p.builderOnce.Do(func() { + builder, err := dataobj.NewBuilder(p.builderCfg, p.bucket, string(p.tenantID)) + if err != nil { + initErr = err + return + } + if err := builder.RegisterMetrics(p.reg); err != nil { + initErr = err + return + } + p.builder = builder + }) + return initErr +} + +func (p *partitionProcessor) processRecord(record *kgo.Record) { + // Update offset metric at the end of processing + defer p.metrics.updateOffset(record.Offset) + + // Observe processing delay + p.metrics.observeProcessingDelay(record.Timestamp) + + // Initialize builder if this is the first record + if err := p.initBuilder(); err != nil { + level.Error(p.logger).Log("msg", "failed to initialize builder", "err", err) + return + } + + // todo: handle multi-tenant + if !bytes.Equal(record.Key, p.tenantID) { + return + } + stream, err := p.decoder.DecodeWithoutLabels(record.Value) + if err != nil { + level.Error(p.logger).Log("msg", "failed to decode record", "err", err) + return + } + + if err := p.builder.Append(stream); err != nil { + if err != dataobj.ErrBufferFull { + level.Error(p.logger).Log("msg", "failed to append stream", "err", err) + p.metrics.incAppendFailures() + return + } + + backoff := backoff.New(p.ctx, backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 10 * time.Second, + }) + + var flushResult dataobj.FlushResult + for backoff.Ongoing() { + flushResult, err = p.builder.Flush(p.ctx) + if err == nil { + break + } + level.Error(p.logger).Log("msg", "failed to flush builder", "err", err) + p.metrics.incFlushFailures() + backoff.Wait() + } + + if err := p.metastoreManager.UpdateMetastore(p.ctx, flushResult); err != nil { + level.Error(p.logger).Log("msg", "failed to update metastore", "err", err) + return + } + + backoff.Reset() + for backoff.Ongoing() { + err = p.client.CommitRecords(p.ctx, record) + if err == nil { + break + } + level.Error(p.logger).Log("msg", "failed to commit records", "err", err) + p.metrics.incCommitFailures() + backoff.Wait() + } + + if err := p.builder.Append(stream); err != nil { + level.Error(p.logger).Log("msg", "failed to append stream after flushing", "err", err) + p.metrics.incAppendFailures() + } + } +} diff --git a/pkg/dataobj/consumer/service.go b/pkg/dataobj/consumer/service.go new file mode 100644 index 0000000000..44529ca749 --- /dev/null +++ b/pkg/dataobj/consumer/service.go @@ -0,0 +1,218 @@ +package consumer + +import ( + "context" + "errors" + "strconv" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/client" + "github.com/grafana/loki/v3/pkg/kafka/partitionring/consumer" +) + +const ( + groupName = "dataobj-consumer" +) + +type Service struct { + services.Service + + logger log.Logger + reg prometheus.Registerer + client *consumer.Client + + cfg Config + bucket objstore.Bucket + + // Partition management + partitionMtx sync.RWMutex + partitionHandlers map[string]map[int32]*partitionProcessor +} + +func New(kafkaCfg kafka.Config, cfg Config, bucket objstore.Bucket, instanceID string, partitionRing ring.PartitionRingReader, reg prometheus.Registerer, logger log.Logger) *Service { + if cfg.StorageBucketPrefix != "" { + bucket = objstore.NewPrefixedBucket(bucket, 
cfg.StorageBucketPrefix) + } + s := &Service{ + logger: log.With(logger, "component", groupName), + cfg: cfg, + bucket: bucket, + partitionHandlers: make(map[string]map[int32]*partitionProcessor), + reg: reg, + } + + client, err := consumer.NewGroupClient( + kafkaCfg, + partitionRing, + groupName, + client.NewReaderClientMetrics(groupName, reg), + logger, + kgo.InstanceID(instanceID), + kgo.SessionTimeout(3*time.Minute), + kgo.RebalanceTimeout(5*time.Minute), + kgo.OnPartitionsAssigned(s.handlePartitionsAssigned), + kgo.OnPartitionsRevoked(func(_ context.Context, _ *kgo.Client, m map[string][]int32) { + s.handlePartitionsRevoked(m) + }), + ) + if err != nil { + level.Error(logger).Log("msg", "failed to create consumer", "err", err) + return nil + } + s.client = client + s.Service = services.NewBasicService(nil, s.run, s.stopping) + return s +} + +func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Client, partitions map[string][]int32) { + level.Info(s.logger).Log("msg", "partitions assigned", "partitions", formatPartitionsMap(partitions)) + s.partitionMtx.Lock() + defer s.partitionMtx.Unlock() + + for topic, parts := range partitions { + if _, ok := s.partitionHandlers[topic]; !ok { + s.partitionHandlers[topic] = make(map[int32]*partitionProcessor) + } + + for _, partition := range parts { + processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.bucket, s.cfg.TenantID, topic, partition, s.logger, s.reg) + s.partitionHandlers[topic][partition] = processor + processor.start() + } + } +} + +func (s *Service) handlePartitionsRevoked(partitions map[string][]int32) { + level.Info(s.logger).Log("msg", "partitions revoked", "partitions", formatPartitionsMap(partitions)) + s.partitionMtx.Lock() + defer s.partitionMtx.Unlock() + + var wg sync.WaitGroup + for topic, parts := range partitions { + if handlers, ok := s.partitionHandlers[topic]; ok { + for _, partition := range parts { + if processor, exists := handlers[partition]; exists { + wg.Add(1) + go func(p *partitionProcessor) { + defer wg.Done() + p.stop() + }(processor) + delete(handlers, partition) + } + } + if len(handlers) == 0 { + delete(s.partitionHandlers, topic) + } + } + } + wg.Wait() +} + +func (s *Service) run(ctx context.Context) error { + for { + fetches := s.client.PollRecords(ctx, -1) + if fetches.IsClientClosed() || ctx.Err() != nil { + return nil + } + if errs := fetches.Errors(); len(errs) > 0 { + var multiErr error + for _, err := range errs { + multiErr = errors.Join(multiErr, err.Err) + } + level.Error(s.logger).Log("msg", "error fetching records", "err", multiErr.Error()) + continue + } + if fetches.Empty() { + continue + } + + fetches.EachPartition(func(ftp kgo.FetchTopicPartition) { + s.partitionMtx.RLock() + handlers, ok := s.partitionHandlers[ftp.Topic] + if !ok { + s.partitionMtx.RUnlock() + return + } + processor, ok := handlers[ftp.Partition] + s.partitionMtx.RUnlock() + if !ok { + return + } + + // Collect all records for this partition + records := ftp.Records + if len(records) == 0 { + return + } + + for _, record := range records { + select { + case <-processor.ctx.Done(): + return + case processor.records <- record: + // Record sent successfully + } + } + }) + } +} + +func (s *Service) stopping(failureCase error) error { + s.partitionMtx.Lock() + defer s.partitionMtx.Unlock() + + var wg sync.WaitGroup + for _, handlers := range s.partitionHandlers { + for _, processor := range handlers { + wg.Add(1) + go func(p *partitionProcessor) { + defer wg.Done() + p.stop() + 
}(processor) + } + } + wg.Wait() + // Only close the client once all partitions have been stopped. + // This is to ensure that all records have been processed before closing and offsets committed. + s.client.Close() + level.Info(s.logger).Log("msg", "consumer stopped") + return failureCase +} + +// Helper function to format []int32 slice +func formatInt32Slice(slice []int32) string { + if len(slice) == 0 { + return "[]" + } + result := "[" + for i, v := range slice { + if i > 0 { + result += "," + } + result += strconv.Itoa(int(v)) + } + result += "]" + return result +} + +// Helper function to format map[string][]int32 into a readable string +func formatPartitionsMap(partitions map[string][]int32) string { + var result string + for topic, parts := range partitions { + if len(result) > 0 { + result += ", " + } + result += topic + "=" + formatInt32Slice(parts) + } + return result +} diff --git a/pkg/dataobj/internal/sections/logs/table.go b/pkg/dataobj/internal/sections/logs/table.go index aea082be06..82658e967f 100644 --- a/pkg/dataobj/internal/sections/logs/table.go +++ b/pkg/dataobj/internal/sections/logs/table.go @@ -80,7 +80,6 @@ func (t *table) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[ return nil }) - } // Size returns the total size of the table in bytes. diff --git a/pkg/dataobj/internal/sections/streams/streams.go b/pkg/dataobj/internal/sections/streams/streams.go index 518807e210..138de989cc 100644 --- a/pkg/dataobj/internal/sections/streams/streams.go +++ b/pkg/dataobj/internal/sections/streams/streams.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "sort" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -16,6 +17,7 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear" ) @@ -33,6 +35,20 @@ type Stream struct { Rows int // Number of rows in the stream. } +func (s *Stream) Reset() { + s.ID = 0 + s.Labels = nil + s.MinTimestamp = time.Time{} + s.MaxTimestamp = time.Time{} + s.Rows = 0 +} + +var streamPool = sync.Pool{ + New: func() interface{} { + return &Stream{} + }, +} + // Streams tracks information about streams in a data object. type Streams struct { metrics *Metrics @@ -61,10 +77,26 @@ func New(metrics *Metrics, pageSize int) *Streams { return &Streams{ metrics: metrics, pageSize: pageSize, - lookup: make(map[uint64][]*Stream), + lookup: make(map[uint64][]*Stream, 1024), + ordered: make([]*Stream, 0, 1024), } } +func (s *Streams) Iter() result.Seq[Stream] { + return result.Iter(func(yield func(Stream) bool) error { + for _, stream := range s.ordered { + if !yield(*stream) { + return nil + } + } + return nil + }) +} + +func (s *Streams) GetBounds() (time.Time, time.Time) { + return s.globalMinTimestamp, s.globalMaxTimestamp +} + // Record a stream record within the Streams section. The provided timestamp is // used to track the minimum and maximum timestamp of a stream. The number of // calls to Record is used to track the number of rows for a stream. 
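Note: streams.go now pools Stream values: the streamPool added above supplies addStream in the next hunk, and Reset returns every pooled value for reuse. A self-contained sketch of the acquire/reset/release pattern, with illustrative names:

package main

import (
	"fmt"
	"sync"
)

type stream struct {
	id   int64
	rows int
}

func (s *stream) reset() { *s = stream{} }

var pool = sync.Pool{New: func() interface{} { return new(stream) }}

func main() {
	// Acquire: pooled values can carry state from a previous cycle, so
	// reset before use (mirroring addStream's Reset after Get).
	s := pool.Get().(*stream)
	s.reset()
	s.id, s.rows = 1, 42
	fmt.Println(s.id, s.rows)

	// Release: returning objects on Reset lets the next builder cycle
	// reuse them instead of allocating.
	pool.Put(s)
}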
@@ -153,7 +185,11 @@ func (s *Streams) addStream(hash uint64, streamLabels labels.Labels) *Stream { s.currentLabelsSize += len(lbl.Value) } - newStream := &Stream{ID: s.lastID.Add(1), Labels: streamLabels} + newStream := streamPool.Get().(*Stream) + newStream.Reset() + newStream.ID = s.lastID.Add(1) + newStream.Labels = streamLabels + s.lookup[hash] = append(s.lookup[hash], newStream) s.ordered = append(s.ordered, newStream) s.metrics.streamCount.Inc() @@ -187,7 +223,6 @@ func (s *Streams) StreamID(streamLabels labels.Labels) int64 { func (s *Streams) EncodeTo(enc *encoding.Encoder) error { timer := prometheus.NewTimer(s.metrics.encodeSeconds) defer timer.ObserveDuration() - defer s.Reset() // TODO(rfratto): handle one section becoming too large. This can happen when // the number of columns is very wide. There are two approaches to handle @@ -335,6 +370,9 @@ func encodeColumn(enc *encoding.StreamsEncoder, columnType streamsmd.ColumnType, // Reset resets all state, allowing Streams to be reused. func (s *Streams) Reset() { s.lastID.Store(0) + for _, stream := range s.ordered { + streamPool.Put(stream) + } clear(s.lookup) s.ordered = sliceclear.Clear(s.ordered) s.currentLabelsSize = 0 diff --git a/pkg/dataobj/metastore/metastore.go b/pkg/dataobj/metastore/metastore.go new file mode 100644 index 0000000000..08c0b00364 --- /dev/null +++ b/pkg/dataobj/metastore/metastore.go @@ -0,0 +1,175 @@ +package metastore + +import ( + "bytes" + "context" + "fmt" + "io" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/logproto" +) + +const ( + metastoreWindowSize = 12 * time.Hour +) + +var ( + // Define our own builder config because metastore objects are significantly smaller. 
+ metastoreBuilderCfg = dataobj.BuilderConfig{ + SHAPrefixSize: 2, + TargetObjectSize: 32 * 1024 * 1024, + TargetPageSize: 4 * 1024 * 1024, + BufferSize: 32 * 1024 * 1024, // 8x page size + TargetSectionSize: 4 * 1024 * 1024, // object size / 8 + } +) + +type Manager struct { + metastoreBuilder *dataobj.Builder + tenantID string + metrics *metastoreMetrics + bucket objstore.Bucket + logger log.Logger + backoff *backoff.Backoff + + builderOnce sync.Once +} + +func NewMetastoreManager(bucket objstore.Bucket, tenantID string, logger log.Logger, reg prometheus.Registerer) (*Manager, error) { + metrics := newMetastoreMetrics() + if err := metrics.register(reg); err != nil { + return nil, err + } + + return &Manager{ + bucket: bucket, + metrics: metrics, + logger: logger, + tenantID: tenantID, + backoff: backoff.New(context.TODO(), backoff.Config{ + MinBackoff: 50 * time.Millisecond, + MaxBackoff: 10 * time.Second, + }), + builderOnce: sync.Once{}, + }, nil +} + +func (m *Manager) initBuilder() error { + var initErr error + m.builderOnce.Do(func() { + metastoreBuilder, err := dataobj.NewBuilder(metastoreBuilderCfg, m.bucket, m.tenantID) + if err != nil { + initErr = err + return + } + m.metastoreBuilder = metastoreBuilder + }) + return initErr +} + +func (m *Manager) UpdateMetastore(ctx context.Context, flushResult dataobj.FlushResult) error { + var err error + start := time.Now() + defer m.metrics.observeMetastoreProcessing(start) + + // Initialize builder if this is the first call for this partition + if err := m.initBuilder(); err != nil { + return err + } + + minTimestamp, maxTimestamp := flushResult.MinTimestamp, flushResult.MaxTimestamp + + // Work our way through the metastore objects window by window, updating & creating them as needed. + // Each one handles its own retries in order to keep making progress in the event of a failure. 
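+	// For example, with the 12-hour window size, an object spanning
+	// 2025-01-01T03:00Z through 2025-01-01T15:00Z truncates to the windows
+	// starting at 00:00Z and 12:00Z, so both metastore objects get updated.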
+ minMetastoreWindow := minTimestamp.Truncate(metastoreWindowSize) + maxMetastoreWindow := maxTimestamp.Truncate(metastoreWindowSize) + for metastoreWindow := minMetastoreWindow; metastoreWindow.Compare(maxMetastoreWindow) <= 0; metastoreWindow = metastoreWindow.Add(metastoreWindowSize) { + metastorePath := fmt.Sprintf("tenant-%s/metastore/%s.store", m.tenantID, metastoreWindow.Format(time.RFC3339)) + m.backoff.Reset() + for m.backoff.Ongoing() { + err = m.bucket.GetAndReplace(ctx, metastorePath, func(existing io.Reader) (io.Reader, error) { + buf, err := io.ReadAll(existing) + if err != nil { + return nil, err + } + + m.metastoreBuilder.Reset() + + if len(buf) > 0 { + replayStart := time.Now() + object := dataobj.FromReaderAt(bytes.NewReader(buf), int64(len(buf))) + if err := m.readFromExisting(ctx, object); err != nil { + return nil, err + } + m.metrics.observeMetastoreReplay(replayStart) + } + + encodingStart := time.Now() + + ls := fmt.Sprintf("{__start__=\"%d\", __end__=\"%d\", __path__=\"%s\"}", minTimestamp.UnixNano(), maxTimestamp.UnixNano(), flushResult.Path) + err = m.metastoreBuilder.Append(logproto.Stream{ + Labels: ls, + Entries: []logproto.Entry{{Line: ""}}, + }) + if err != nil { + return nil, err + } + + newMetastore, err := m.metastoreBuilder.FlushToBuffer() + if err != nil { + return nil, err + } + m.metrics.observeMetastoreEncoding(encodingStart) + return newMetastore, nil + }) + if err == nil { + level.Info(m.logger).Log("msg", "successfully merged & updated metastore", "metastore", metastorePath) + break + } + level.Error(m.logger).Log("msg", "failed to get and replace metastore object", "err", err, "metastore", metastorePath) + m.metrics.incMetastoreWriteFailures() + m.backoff.Wait() + } + // Reset at the end too so we don't leave our memory hanging around between calls. 
+ m.metastoreBuilder.Reset() + } + return err +} + +func (m *Manager) readFromExisting(ctx context.Context, object *dataobj.Object) error { + // Fetch sections + si, err := object.Metadata(ctx) + if err != nil { + return err + } + + // Read streams from existing metastore object and write them to the builder for the new object + streams := make([]dataobj.Stream, 100) + for i := 0; i < si.StreamsSections; i++ { + streamsReader := dataobj.NewStreamsReader(object, i) + for n, err := streamsReader.Read(ctx, streams); n > 0; n, err = streamsReader.Read(ctx, streams) { + if err != nil && err != io.EOF { + return err + } + for _, stream := range streams[:n] { + err = m.metastoreBuilder.Append(logproto.Stream{ + Labels: stream.Labels.String(), + Entries: []logproto.Entry{{Line: ""}}, + }) + if err != nil { + return err + } + } + } + } + return nil +} diff --git a/pkg/dataobj/metastore/metastore_test.go b/pkg/dataobj/metastore/metastore_test.go new file mode 100644 index 0000000000..582882917b --- /dev/null +++ b/pkg/dataobj/metastore/metastore_test.go @@ -0,0 +1,106 @@ +package metastore + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/grafana/dskit/backoff" + "github.com/thanos-io/objstore" + + "github.com/grafana/loki/v3/pkg/dataobj" +) + +func BenchmarkWriteMetastores(t *testing.B) { + ctx := context.Background() + bucket := objstore.NewInMemBucket() + tenantID := "test-tenant" + + m, err := NewMetastoreManager(bucket, tenantID, log.NewNopLogger(), prometheus.DefaultRegisterer) + require.NoError(t, err) + + // Set limits for the test + m.backoff = backoff.New(context.TODO(), backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + MaxRetries: 3, + }) + + // Add test data spanning multiple metastore windows + now := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC) + + flushResults := make([]dataobj.FlushResult, 1000) + for i := 0; i < 1000; i++ { + flushResults[i] = dataobj.FlushResult{ + Path: fmt.Sprintf("test-dataobj-path-%d", i), + MinTimestamp: now.Add(-1 * time.Hour).Add(time.Duration(i) * time.Millisecond), + MaxTimestamp: now, + } + } + + t.ResetTimer() + t.ReportAllocs() + for i := 0; i < t.N; i++ { + // Test writing metastores + err = m.UpdateMetastore(ctx, flushResults[i%len(flushResults)]) + require.NoError(t, err) + } + + require.Len(t, bucket.Objects(), 1) +} + +func TestWriteMetastores(t *testing.T) { + ctx := context.Background() + bucket := objstore.NewInMemBucket() + tenantID := "test-tenant" + + m, err := NewMetastoreManager(bucket, tenantID, log.NewNopLogger(), prometheus.DefaultRegisterer) + require.NoError(t, err) + + // Set limits for the test + m.backoff = backoff.New(context.TODO(), backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + MaxRetries: 3, + }) + + // Add test data spanning multiple metastore windows + now := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC) + + flushResult := dataobj.FlushResult{ + Path: "test-dataobj-path", + MinTimestamp: now.Add(-1 * time.Hour), + MaxTimestamp: now, + } + + require.Len(t, bucket.Objects(), 0) + + // Test writing metastores + err = m.UpdateMetastore(ctx, flushResult) + require.NoError(t, err) + + require.Len(t, bucket.Objects(), 1) + var originalSize int + for _, obj := range bucket.Objects() { + originalSize = len(obj) + } + + flushResult2 := dataobj.FlushResult{ + Path: "different-test-dataobj-path", + MinTimestamp: 
now.Add(-15 * time.Minute), + MaxTimestamp: now, + } + + err = m.UpdateMetastore(ctx, flushResult2) + require.NoError(t, err) + + require.Len(t, bucket.Objects(), 1) + for _, obj := range bucket.Objects() { + require.Greater(t, len(obj), originalSize) + } +} diff --git a/pkg/dataobj/metastore/metrics.go b/pkg/dataobj/metastore/metrics.go new file mode 100644 index 0000000000..424f1e27cc --- /dev/null +++ b/pkg/dataobj/metastore/metrics.go @@ -0,0 +1,102 @@ +package metastore + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type metastoreMetrics struct { + metastoreProcessingTime prometheus.Histogram + metastoreReplayTime prometheus.Histogram + metastoreEncodingTime prometheus.Histogram + metastoreWriteFailures prometheus.Counter +} + +func newMetastoreMetrics() *metastoreMetrics { + metrics := &metastoreMetrics{ + metastoreReplayTime: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "loki_dataobj_consumer_metastore_replay_seconds", + Help: "Time taken to replay existing metastore data into the in-memory builder in seconds", + Buckets: prometheus.DefBuckets, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 0, + }), + metastoreEncodingTime: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "loki_dataobj_consumer_metastore_encoding_seconds", + Help: "Time taken to add the new metadata & encode the new metastore data object in seconds", + Buckets: prometheus.DefBuckets, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 0, + }), + metastoreProcessingTime: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "loki_dataobj_consumer_metastore_processing_seconds", + Help: "Total time taken to update all metastores for a flushed dataobj in seconds", + Buckets: prometheus.DefBuckets, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 0, + }), + metastoreWriteFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_dataobj_consumer_metastore_write_failures_total", + Help: "Total number of metastore write failures", + }), + } + + return metrics +} + +func (p *metastoreMetrics) register(reg prometheus.Registerer) error { + collectors := []prometheus.Collector{ + p.metastoreReplayTime, + p.metastoreEncodingTime, + p.metastoreProcessingTime, + p.metastoreWriteFailures, + } + + for _, collector := range collectors { + if err := reg.Register(collector); err != nil { + if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { + return err + } + } + } + return nil +} + +func (p *metastoreMetrics) unregister(reg prometheus.Registerer) { + collectors := []prometheus.Collector{ + p.metastoreReplayTime, + p.metastoreEncodingTime, + p.metastoreProcessingTime, + p.metastoreWriteFailures, + } + + for _, collector := range collectors { + reg.Unregister(collector) + } +} + +func (p *metastoreMetrics) incMetastoreWriteFailures() { + p.metastoreWriteFailures.Inc() +} + +func (p *metastoreMetrics) observeMetastoreReplay(recordTimestamp time.Time) { + if !recordTimestamp.IsZero() { // Only observe if timestamp is valid + p.metastoreReplayTime.Observe(time.Since(recordTimestamp).Seconds()) + } +} + +func (p *metastoreMetrics) observeMetastoreEncoding(recordTimestamp time.Time) { + if !recordTimestamp.IsZero() { // Only observe if timestamp is valid + p.metastoreEncodingTime.Observe(time.Since(recordTimestamp).Seconds()) + } +} + +func (p *metastoreMetrics) 
observeMetastoreProcessing(recordTimestamp time.Time) { + if !recordTimestamp.IsZero() { // Only observe if timestamp is valid + p.metastoreProcessingTime.Observe(time.Since(recordTimestamp).Seconds()) + } +} diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index d8c9be61f5..bba1dd7566 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/loki/v3/pkg/compactor" compactorclient "github.com/grafana/loki/v3/pkg/compactor/client" "github.com/grafana/loki/v3/pkg/compactor/deletion" + "github.com/grafana/loki/v3/pkg/dataobj/consumer" "github.com/grafana/loki/v3/pkg/dataobj/explorer" "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/indexgateway" @@ -109,6 +110,7 @@ type Config struct { TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"` MemberlistKV memberlist.KVConfig `yaml:"memberlist"` KafkaConfig kafka.Config `yaml:"kafka_config,omitempty" category:"experimental"` + DataObjConsumer consumer.Config `yaml:"dataobj_consumer,omitempty" category:"experimental"` DataObjExplorer explorer.Config `yaml:"dataobj_explorer,omitempty" category:"experimental"` RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"` @@ -192,6 +194,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.BlockBuilder.RegisterFlags(f) c.BlockScheduler.RegisterFlags(f) c.DataObjExplorer.RegisterFlags(f) + c.DataObjConsumer.RegisterFlags(f) } func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) { @@ -307,6 +310,9 @@ func (c *Config) Validate() error { if err := c.KafkaConfig.Validate(); err != nil { errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid kafka_config config")) } + if err := c.DataObjConsumer.Validate(); err != nil { + errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid dataobj_consumer config")) + } } if err := c.Distributor.Validate(); err != nil { errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid distributor config")) @@ -390,6 +396,7 @@ type Loki struct { partitionRing *ring.PartitionInstanceRing blockBuilder *blockbuilder.BlockBuilder blockScheduler *blockscheduler.BlockScheduler + dataObjConsumer *consumer.Service ClientMetrics storage.ClientMetrics deleteClientMetrics *deletion.DeleteRequestClientMetrics @@ -707,6 +714,8 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(BlockBuilder, t.initBlockBuilder) mm.RegisterModule(BlockScheduler, t.initBlockScheduler) mm.RegisterModule(DataObjExplorer, t.initDataObjExplorer) + mm.RegisterModule(DataObjConsumer, t.initDataObjConsumer) + mm.RegisterModule(All, nil) mm.RegisterModule(Read, nil) mm.RegisterModule(Write, nil) @@ -746,6 +755,7 @@ func (t *Loki) setupModuleManager() error { BlockBuilder: {PartitionRing, Store, Server}, BlockScheduler: {Server}, DataObjExplorer: {Server}, + DataObjConsumer: {PartitionRing, Server}, Read: {QueryFrontend, Querier}, Write: {Ingester, Distributor, PatternIngester}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 2bdddb33cc..0c79a98b2d 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -51,6 +51,7 @@ import ( "github.com/grafana/loki/v3/pkg/compactor/client/grpc" "github.com/grafana/loki/v3/pkg/compactor/deletion" "github.com/grafana/loki/v3/pkg/compactor/generationnumber" + "github.com/grafana/loki/v3/pkg/dataobj/consumer" "github.com/grafana/loki/v3/pkg/dataobj/explorer" "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/indexgateway" @@ -144,11 +145,11 @@ const ( BlockBuilder = "block-builder" BlockScheduler = 
"block-scheduler" DataObjExplorer = "dataobj-explorer" - - All = "all" - Read = "read" - Write = "write" - Backend = "backend" + DataObjConsumer = "dataobj-consumer" + All = "all" + Read = "read" + Write = "write" + Backend = "backend" ) const ( @@ -1905,6 +1906,33 @@ func (t *Loki) initDataObjExplorer() (services.Service, error) { return explorer, nil } +func (t *Loki) initDataObjConsumer() (services.Service, error) { + if !t.Cfg.Ingester.KafkaIngestion.Enabled { + return nil, nil + } + schema, err := t.Cfg.SchemaConfig.SchemaForTime(model.Now()) + if err != nil { + return nil, fmt.Errorf("failed to get schema for now: %w", err) + } + + store, err := bucket.NewClient(context.Background(), schema.ObjectType, t.Cfg.StorageConfig.ObjectStore.Config, "dataobj", util_log.Logger) + if err != nil { + return nil, err + } + level.Info(util_log.Logger).Log("msg", "initializing dataobj consumer", "instance", t.Cfg.Ingester.LifecyclerConfig.ID) + t.dataObjConsumer = consumer.New( + t.Cfg.KafkaConfig, + t.Cfg.DataObjConsumer, + store, + t.Cfg.Ingester.LifecyclerConfig.ID, + t.partitionRing, + prometheus.DefaultRegisterer, + util_log.Logger, + ) + + return t.dataObjConsumer, nil +} + func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) { if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled { return deletion.NewNoOpDeleteRequestsStore(), nil diff --git a/pkg/storage/bucket/prefixed_bucket_client.go b/pkg/storage/bucket/prefixed_bucket_client.go index c2d6cd0f42..e26edf8012 100644 --- a/pkg/storage/bucket/prefixed_bucket_client.go +++ b/pkg/storage/bucket/prefixed_bucket_client.go @@ -36,6 +36,11 @@ func (b *PrefixedBucketClient) Upload(ctx context.Context, name string, r io.Rea return } +// GetAndReplace is a helper function that gets an object from the bucket and replaces it with a new reader. +func (b *PrefixedBucketClient) GetAndReplace(ctx context.Context, name string, fn func(existing io.Reader) (io.Reader, error)) error { + return b.bucket.GetAndReplace(ctx, b.fullName(name), fn) +} + // Delete removes the object with the given name. func (b *PrefixedBucketClient) Delete(ctx context.Context, name string) error { return b.bucket.Delete(ctx, b.fullName(name)) diff --git a/pkg/storage/bucket/sse_bucket_client.go b/pkg/storage/bucket/sse_bucket_client.go index a613561c07..b6dd19fdbe 100644 --- a/pkg/storage/bucket/sse_bucket_client.go +++ b/pkg/storage/bucket/sse_bucket_client.go @@ -59,6 +59,10 @@ func (b *SSEBucketClient) Upload(ctx context.Context, name string, r io.Reader) return b.bucket.Upload(ctx, name, r) } +func (b *SSEBucketClient) GetAndReplace(ctx context.Context, name string, fn func(existing io.Reader) (io.Reader, error)) error { + return b.bucket.GetAndReplace(ctx, name, fn) +} + // Delete implements objstore.Bucket. 
 func (b *SSEBucketClient) Delete(ctx context.Context, name string) error {
 	return b.bucket.Delete(ctx, name)
diff --git a/vendor/github.com/thanos-io/objstore/inmem.go b/vendor/github.com/thanos-io/objstore/inmem.go
index 6a34406661..50e0441ca5 100644
--- a/vendor/github.com/thanos-io/objstore/inmem.go
+++ b/vendor/github.com/thanos-io/objstore/inmem.go
@@ -193,6 +193,32 @@ func (b *InMemBucket) GetRange(_ context.Context, name string, off, length int64
 	}, nil
 }
 
+func (b *InMemBucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
+	reader, err := b.Get(ctx, name)
+	if err != nil && !errors.Is(err, errNotFound) {
+		return err
+	}
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
+
+	if reader == nil {
+		reader = io.NopCloser(bytes.NewReader(nil))
+	}
+
+	replacement, err := f(reader)
+	if err != nil {
+		return err
+	}
+
+	newObj, err := io.ReadAll(replacement)
+	if err != nil {
+		return err
+	}
+
+	b.objects[name] = newObj
+	return nil
+}
+
 // Exists checks if the given directory exists in memory.
 func (b *InMemBucket) Exists(_ context.Context, name string) (bool, error) {
 	b.mtx.RLock()
diff --git a/vendor/github.com/thanos-io/objstore/objstore.go b/vendor/github.com/thanos-io/objstore/objstore.go
index 86ecfa2681..77540d2817 100644
--- a/vendor/github.com/thanos-io/objstore/objstore.go
+++ b/vendor/github.com/thanos-io/objstore/objstore.go
@@ -64,6 +64,10 @@ type Bucket interface {
 	// Upload should be idempotent.
 	Upload(ctx context.Context, name string, r io.Reader) error
 
+	// GetAndReplace reads an existing object, passes it to f, and replaces the
+	// object with the contents of the reader returned by f. If the object is
+	// created or updated by another writer before the replacement is uploaded,
+	// the call fails with an error.
+	GetAndReplace(ctx context.Context, name string, f func(existing io.Reader) (io.Reader, error)) error
+
 	// Delete removes the object with the given name.
 	// If object does not exist in the moment of deletion, Delete should throw error.
 	Delete(ctx context.Context, name string) error
@@ -731,6 +735,10 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
 	), nil
 }
 
+func (b *metricBucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
+	return b.bkt.GetAndReplace(ctx, name, f)
+}
+
 func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
 	const op = OpExists
 	b.metrics.ops.WithLabelValues(op).Inc()
diff --git a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go
index a37450ca87..34830bf376 100644
--- a/vendor/github.com/thanos-io/objstore/prefixed_bucket.go
+++ b/vendor/github.com/thanos-io/objstore/prefixed_bucket.go
@@ -79,6 +79,10 @@ func (p *PrefixedBucket) GetRange(ctx context.Context, name string, off int64, l
 	return p.bkt.GetRange(ctx, conditionalPrefix(p.prefix, name), off, length)
 }
 
+func (p *PrefixedBucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
+	return p.bkt.GetAndReplace(ctx, conditionalPrefix(p.prefix, name), f)
+}
+
 // Exists checks if the given object exists in the bucket.
 func (p *PrefixedBucket) Exists(ctx context.Context, name string) (bool, error) {
 	return p.bkt.Exists(ctx, conditionalPrefix(p.prefix, name))
diff --git a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go
index 05fbdb55c0..4553729092 100644
--- a/vendor/github.com/thanos-io/objstore/providers/azure/azure.go
+++ b/vendor/github.com/thanos-io/objstore/providers/azure/azure.go
@@ -429,3 +429,7 @@ func NewTestBucket(t testing.TB, component string) (objstore.Bucket, func(), err
 func (b *Bucket) Close() error {
 	return nil
 }
+
+func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
+	panic("unimplemented: Azure.GetAndReplace")
+}
diff --git a/vendor/github.com/thanos-io/objstore/providers/bos/bos.go b/vendor/github.com/thanos-io/objstore/providers/bos/bos.go
index 0cc4352ccc..16b01ee10d 100644
--- a/vendor/github.com/thanos-io/objstore/providers/bos/bos.go
+++ b/vendor/github.com/thanos-io/objstore/providers/bos/bos.go
@@ -440,3 +440,7 @@ func validateForTest(conf Config) error {
 	}
 	return nil
 }
+
+func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
+	panic("unimplemented: BOS.GetAndReplace")
+}
diff --git a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go
index df602877be..920f3ace74 100644
--- a/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go
+++ b/vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go
@@ -11,6 +11,7 @@ import (
 	"path/filepath"
 
 	"github.com/efficientgo/core/errcapture"
+	"github.com/gofrs/flock"
 	"github.com/pkg/errors"
 	"gopkg.in/yaml.v2"
 
@@ -269,6 +270,41 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err erro
 	return nil
 }
 
+func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
+	file := filepath.Join(b.rootDir, name)
+
+	// Acquire a file lock before modifying, as filesystems don't support
+	// conditional writes the way cloud providers do.
+	fileLock := flock.New(file + ".lock")
+	locked, err := fileLock.TryLock()
+	if err != nil {
+		return err
+	}
+	if !locked {
+		return errors.New("file is locked by another process")
+	}
+	defer fileLock.Unlock()
+
+	var r io.Reader
+	fh, err := os.Open(file)
+	if err != nil && !os.IsNotExist(err) {
+		return err
+	} else if err == nil {
+		defer fh.Close()
+		r = fh
+	} else {
+		// No existing file: hand f an empty reader (an io.MultiReader of
+		// nothing reads as EOF) so it can create the object, matching the
+		// in-memory bucket's behaviour; reads from a nil *os.File would fail.
+		r = io.MultiReader()
+	}
+
+	newContent, err := f(r)
+	if err != nil {
+		return err
+	}
+
+	content, err := io.ReadAll(newContent)
+	if err != nil {
+		return err
+	}
+
+	return os.WriteFile(file, content, 0600)
+}
+
 func isDirEmpty(name string) (ok bool, err error) {
 	f, err := os.Open(filepath.Clean(name))
 	if os.IsNotExist(err) {
diff --git a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go
index b89f8735bc..cd9105ca31 100644
--- a/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go
+++ b/vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go
@@ -5,6 +5,7 @@
 package gcs
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -37,6 +38,8 @@ var DefaultConfig = Config{
 	HTTPConfig: exthttp.DefaultHTTPConfig,
 }
 
+var _ objstore.Bucket = &Bucket{}
+
 // Config stores the configuration for gcs bucket.
type Config struct { Bucket string `yaml:"bucket"` @@ -273,7 +276,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - r, err := b.bkt.Object(name).NewReader(ctx) + r, err := b.get(ctx, name) if err != nil { return r, err } @@ -286,6 +289,10 @@ func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { }, nil } +func (b *Bucket) get(ctx context.Context, name string) (*storage.Reader, error) { + return b.bkt.Object(name).NewReader(ctx) +} + // GetRange returns a new range reader for the given object name and range. func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { r, err := b.bkt.Object(name).NewRangeReader(ctx, off, length) @@ -333,7 +340,21 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { // Upload writes the file specified in src to remote GCS location specified as target. func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { - w := b.bkt.Object(name).NewWriter(ctx) + return b.upload(ctx, name, r, 0, false) +} + +// Upload writes the file specified in src to remote GCS location specified as target. +func (b *Bucket) upload(ctx context.Context, name string, r io.Reader, generation int64, requireNewObject bool) error { + o := b.bkt.Object(name) + + var w *storage.Writer + if generation != 0 { + o = o.If(storage.Conditions{GenerationMatch: generation}) + } + if requireNewObject { + o = o.If(storage.Conditions{DoesNotExist: true}) + } + w = o.NewWriter(ctx) // if `chunkSize` is 0, we don't set any custom value for writer's ChunkSize. // It uses whatever the default value https://pkg.go.dev/google.golang.org/cloud/storage#Writer @@ -347,6 +368,41 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { return w.Close() } +func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error { + var mustNotExist bool + var generation int64 + + // Get the current object + storageReader, err := b.get(ctx, name) + if err != nil && !errors.Is(err, storage.ErrObjectNotExist) { + return err + } else if errors.Is(err, storage.ErrObjectNotExist) { + mustNotExist = true + } + + // If object exists, ensure we close the reader when done + if storageReader != nil { + generation = storageReader.Attrs.Generation + defer storageReader.Close() + } + + newContent, err := f(wrapReader(storageReader)) + if err != nil { + return err + } + + // Upload with the previous generation, or mustNotExist for new objects + return b.upload(ctx, name, newContent, generation, mustNotExist) +} + +func wrapReader(r *storage.Reader) io.ReadCloser { + if r == nil { + return io.NopCloser(bytes.NewReader(nil)) + } + + return r +} + // Delete removes the object with the given name. 
func (b *Bucket) Delete(ctx context.Context, name string) error { return b.bkt.Object(name).Delete(ctx) diff --git a/vendor/github.com/thanos-io/objstore/providers/oss/oss.go b/vendor/github.com/thanos-io/objstore/providers/oss/oss.go index 2a6cb219ad..761ed174db 100644 --- a/vendor/github.com/thanos-io/objstore/providers/oss/oss.go +++ b/vendor/github.com/thanos-io/objstore/providers/oss/oss.go @@ -426,3 +426,7 @@ func (b *Bucket) IsAccessDeniedErr(err error) bool { } return false } + +func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error { + panic("unimplemented: OSS.GetAndReplace") +} diff --git a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go index cda78838ec..5fbed6464c 100644 --- a/vendor/github.com/thanos-io/objstore/providers/s3/s3.go +++ b/vendor/github.com/thanos-io/objstore/providers/s3/s3.go @@ -458,7 +458,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt }, filteredOpts...) } -func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { +func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (*minio.Object, error) { sse, err := b.getServerSideEncryption(ctx) if err != nil { return nil, err @@ -488,6 +488,16 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( return nil, err } + return r, nil +} + +// Get returns a reader for the given object name. +func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + r, err := b.getRange(ctx, name, 0, -1) + if err != nil { + return r, err + } + return objstore.ObjectSizerReadCloser{ ReadCloser: r, Size: func() (int64, error) { @@ -501,14 +511,24 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( }, nil } -// Get returns a reader for the given object name. -func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - return b.getRange(ctx, name, 0, -1) -} - // GetRange returns a new range reader for the given object name and range. func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - return b.getRange(ctx, name, off, length) + r, err := b.getRange(ctx, name, off, length) + if err != nil { + return r, err + } + + return objstore.ObjectSizerReadCloser{ + ReadCloser: r, + Size: func() (int64, error) { + stat, err := r.Stat() + if err != nil { + return 0, err + } + + return stat.Size, nil + }, + }, nil } // Exists checks if the given object exists. @@ -526,6 +546,10 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { // Upload the contents of the reader as an object into the bucket. func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { + return b.upload(ctx, name, r, "", false) +} + +func (b *Bucket) upload(ctx context.Context, name string, r io.Reader, etag string, requireNewObject bool) error { sse, err := b.getServerSideEncryption(ctx) if err != nil { return err @@ -549,24 +573,33 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { userMetadata[k] = v } + putOpts := minio.PutObjectOptions{ + DisableMultipart: b.disableMultipart, + PartSize: partSize, + ServerSideEncryption: sse, + UserMetadata: userMetadata, + StorageClass: b.storageClass, + SendContentMd5: b.sendContentMd5, + // 4 is what minio-go have as the default. 
+		// 4 is what minio-go have as the default. To be certain we do micro benchmark before any changes we
+		// ensure we pin this number to four.
+		// TODO(bwplotka): Consider adjusting this number to GOMAXPROCS or to expose this in config if it becomes bottleneck.
+		NumThreads: 4,
+	}
+	if etag != "" {
+		if requireNewObject {
+			putOpts.SetMatchETagExcept(etag)
+		} else {
+			putOpts.SetMatchETag(etag)
+		}
+	}
+
 	if _, err := b.client.PutObject(
 		ctx,
 		b.name,
 		name,
 		r,
 		size,
-		minio.PutObjectOptions{
-			DisableMultipart:     b.disableMultipart,
-			PartSize:             partSize,
-			ServerSideEncryption: sse,
-			UserMetadata:         userMetadata,
-			StorageClass:         b.storageClass,
-			SendContentMd5:       b.sendContentMd5,
-			// 4 is what minio-go have as the default. To be certain we do micro benchmark before any changes we
-			// ensure we pin this number to four.
-			// TODO(bwplotka): Consider adjusting this number to GOMAXPROCS or to expose this in config if it becomes bottleneck.
-			NumThreads: 4,
-		},
+		putOpts,
 	); err != nil {
 		return errors.Wrap(err, "upload s3 object")
 	}
@@ -574,6 +607,44 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
 	return nil
 }
 
+// GetAndReplace reads the current object, passes its contents to f, and
+// uploads the reader returned by f under the same name. The original ETag
+// (or, for a new object, an "If-None-Match: *" precondition) is used so that
+// a concurrent modification fails the upload instead of being overwritten.
+func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
+	var requireNewObject bool
+	originalContent, err := b.getRange(ctx, name, 0, -1)
+	if err != nil && !b.IsObjNotFoundErr(err) {
+		return err
+	} else if b.IsObjNotFoundErr(err) {
+		requireNewObject = true
+	}
+
+	// Hand f an empty reader rather than a nil *minio.Object when the object
+	// does not exist yet, and capture the existing ETag otherwise.
+	var (
+		original io.Reader = io.MultiReader()
+		etag               = "*"
+	)
+	if !requireNewObject {
+		defer originalContent.Close()
+		stats, err := originalContent.Stat()
+		if err != nil {
+			return err
+		}
+		etag = stats.ETag
+		original = originalContent
+	}
+
+	// Call work function to get a new version of the file
+	newContent, err := f(original)
+	if err != nil {
+		return err
+	}
+
+	return b.upload(ctx, name, newContent, etag, requireNewObject)
+}
+
 // Attributes returns information about the specified object.
 func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
 	objInfo, err := b.client.StatObject(ctx, b.name, name, minio.StatObjectOptions{})
diff --git a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go
index 19eb0d4545..aedb7b7923 100644
--- a/vendor/github.com/thanos-io/objstore/providers/swift/swift.go
+++ b/vendor/github.com/thanos-io/objstore/providers/swift/swift.go
@@ -375,6 +375,10 @@ func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err err
 	return nil
 }
 
+func (c *Container) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
+	panic("unimplemented: Swift.GetAndReplace")
+}
+
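The S3 path reaches the same goal with HTTP conditional requests instead of GCS generations: If-Match pins the replacement to the ETag that was read, while If-None-Match: * makes the PUT create-only. A self-contained sketch of that optimistic read-modify-write using minio-go directly follows; the bucket and object names and the replaceObject helper are illustrative, and the backend must actually honour these preconditions (MinIO does; vanilla S3 gained conditional writes only recently).

package s3cas

import (
	"bytes"
	"context"
	"io"

	"github.com/minio/minio-go/v7"
)

// replaceObject reads an object, applies update, and writes the result back
// only if nobody else modified (or created) the object in the meantime.
func replaceObject(ctx context.Context, client *minio.Client, update func([]byte) []byte) error {
	var (
		current []byte
		opts    minio.PutObjectOptions
	)

	obj, err := client.GetObject(ctx, "my-bucket", "state.json", minio.GetObjectOptions{})
	if err != nil {
		return err
	}
	defer obj.Close()

	stat, err := obj.Stat()
	switch {
	case err == nil:
		opts.SetMatchETag(stat.ETag) // replace only this exact version
		if current, err = io.ReadAll(obj); err != nil {
			return err
		}
	case minio.ToErrorResponse(err).Code == "NoSuchKey":
		opts.SetMatchETagExcept("*") // create-only: fail if it now exists
	default:
		return err
	}

	next := update(current)
	_, err = client.PutObject(ctx, "my-bucket", "state.json",
		bytes.NewReader(next), int64(len(next)), opts)
	return err // HTTP 412 Precondition Failed means we lost the race
}

A caller needing stronger guarantees would loop on the 412, re-reading and re-applying update; that is exactly the retry point GetAndReplace leaves to its callers.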
 // Delete removes the object with the given name.
 func (c *Container) Delete(_ context.Context, name string) error {
 	return errors.Wrap(c.connection.LargeObjectDelete(c.name, name), "delete object")
diff --git a/vendor/github.com/thanos-io/objstore/testing.go b/vendor/github.com/thanos-io/objstore/testing.go
index 80f1e198e0..6f049c7f54 100644
--- a/vendor/github.com/thanos-io/objstore/testing.go
+++ b/vendor/github.com/thanos-io/objstore/testing.go
@@ -287,6 +287,10 @@ func (d *delayingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
 	return d.bkt.Get(ctx, name)
 }
 
+func (d *delayingBucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
+	panic("unimplemented: delayingBucket.GetAndReplace")
+}
+
 func (d *delayingBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
 	time.Sleep(d.delay)
 	return d.bkt.Attributes(ctx, name)
diff --git a/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go
index 58bdea0776..ca3d5a7a55 100644
--- a/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go
+++ b/vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go
@@ -118,6 +118,14 @@ func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (er
 	return
 }
 
+func (t TracingBucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) (err error) {
+	doWithSpan(ctx, "bucket_get_and_replace", func(spanCtx context.Context, span opentracing.Span) {
+		span.LogKV("name", name)
+		err = t.bkt.GetAndReplace(spanCtx, name, f)
+	})
+	return
+}
+
 func (t TracingBucket) Delete(ctx context.Context, name string) (err error) {
 	doWithSpan(ctx, "bucket_delete", func(spanCtx context.Context, span opentracing.Span) {
 		span.LogKV("name", name)
diff --git a/vendor/modules.txt b/vendor/modules.txt
index f69782e5ee..5df67307d1 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1633,7 +1633,7 @@ github.com/stretchr/testify/assert/yaml
 github.com/stretchr/testify/mock
 github.com/stretchr/testify/require
 github.com/stretchr/testify/suite
-# github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a
+# github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a => github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866
 ## explicit; go 1.22
 github.com/thanos-io/objstore
 github.com/thanos-io/objstore/clientutil
@@ -2562,3 +2562,4 @@ sigs.k8s.io/yaml/goyaml.v2
 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
 # github.com/grafana/loki/pkg/push => ./pkg/push
 # github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.2.0
+# github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866
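Taken together, the provider changes above back one new method on the bucket abstraction. Assuming the forked objstore now exposes GetAndReplace on its Bucket interface, as the objstore.go hunk earlier in this patch suggests, a consumer could drive it like the hypothetical helper below; the object path and append semantics are invented purely for illustration of the contract (f receives an empty reader when the object does not exist, and the whole call fails if the object changed between read and write).

package metastoredemo

import (
	"bytes"
	"context"
	"io"

	"github.com/thanos-io/objstore"
)

// appendLine appends one line to a shared object, relying on the bucket's
// GetAndReplace preconditions to detect concurrent writers.
func appendLine(ctx context.Context, bkt objstore.Bucket, path, line string) error {
	return bkt.GetAndReplace(ctx, path, func(existing io.Reader) (io.Reader, error) {
		current, err := io.ReadAll(existing) // empty for a brand-new object
		if err != nil {
			return nil, err
		}
		return bytes.NewReader(append(current, []byte(line+"\n")...)), nil
	})
}

On a precondition failure the caller simply re-invokes appendLine; because f is handed the freshly read contents on every attempt, each retry derives its update from the latest version of the object.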