chore(dataobj): Create initial dataobj builder (#16011)

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
Co-authored-by: Robert Fratto <robertfratto@gmail.com>
pull/16053/head
benclive 11 months ago committed by GitHub
parent 4b44b59ee7
commit ca4c025ad0
  1. docs/sources/shared/configuration.md (30)
  2. go.mod (2)
  3. go.sum (4)
  4. pkg/dataobj/builder.go (46)
  5. pkg/dataobj/builder_test.go (3)
  6. pkg/dataobj/consumer/config.go (32)
  7. pkg/dataobj/consumer/metrics.go (117)
  8. pkg/dataobj/consumer/partition_processor.go (201)
  9. pkg/dataobj/consumer/service.go (218)
  10. pkg/dataobj/internal/sections/logs/table.go (1)
  11. pkg/dataobj/internal/sections/streams/streams.go (44)
  12. pkg/dataobj/metastore/metastore.go (175)
  13. pkg/dataobj/metastore/metastore_test.go (106)
  14. pkg/dataobj/metastore/metrics.go (102)
  15. pkg/loki/loki.go (10)
  16. pkg/loki/modules.go (38)
  17. pkg/storage/bucket/prefixed_bucket_client.go (5)
  18. pkg/storage/bucket/sse_bucket_client.go (4)
  19. vendor/github.com/thanos-io/objstore/inmem.go (26)
  20. vendor/github.com/thanos-io/objstore/objstore.go (8)
  21. vendor/github.com/thanos-io/objstore/prefixed_bucket.go (4)
  22. vendor/github.com/thanos-io/objstore/providers/azure/azure.go (4)
  23. vendor/github.com/thanos-io/objstore/providers/bos/bos.go (4)
  24. vendor/github.com/thanos-io/objstore/providers/filesystem/filesystem.go (36)
  25. vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go (60)
  26. vendor/github.com/thanos-io/objstore/providers/oss/oss.go (4)
  27. vendor/github.com/thanos-io/objstore/providers/s3/s3.go (95)
  28. vendor/github.com/thanos-io/objstore/providers/swift/swift.go (4)
  29. vendor/github.com/thanos-io/objstore/testing.go (4)
  30. vendor/github.com/thanos-io/objstore/tracing/opentracing/opentracing.go (8)
  31. vendor/modules.txt (3)

@ -895,6 +895,36 @@ kafka_config:
# CLI flag: -kafka.max-consumer-lag-at-startup
[max_consumer_lag_at_startup: <duration> | default = 15s]
dataobj_consumer:
  builderconfig:
    # The size of the SHA prefix to use for the data object builder.
    # CLI flag: -dataobj-consumer.sha-prefix-size
    [sha_prefix_size: <int> | default = 2]

    # The size of the target page to use for the data object builder.
    # CLI flag: -dataobj-consumer.target-page-size
    [target_page_size: <int> | default = 2MiB]

    # The size of the target object to use for the data object builder.
    # CLI flag: -dataobj-consumer.target-object-size
    [target_object_size: <int> | default = 1GiB]

    # Configures a maximum size for sections, for sections that support it.
    # CLI flag: -dataobj-consumer.target-section-size
    [target_section_size: <int> | default = 128MiB]

    # The size of the buffer to use for sorting logs.
    # CLI flag: -dataobj-consumer.buffer-size
    [buffer_size: <int> | default = 16MiB]

  # The tenant ID to use for the data object builder.
  # CLI flag: -dataobj-consumer.tenant-id
  [tenant_id: <string> | default = "fake"]

  # The prefix to use for the storage bucket.
  # CLI flag: -dataobj-consumer.storage-bucket-prefix
  [storage_bucket_prefix: <string> | default = "dataobj/"]

dataobj_explorer:
  # Prefix to use when exploring the bucket. If set, only objects under this
  # prefix will be visible.
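For illustration only, a minimal override of the new dataobj_consumer block in a Loki configuration file might look like the snippet below; the values simply repeat the defaults documented above, and the human-readable byte sizes are assumed to be accepted in YAML as the defaults suggest.

dataobj_consumer:
  builderconfig:
    target_page_size: 2MiB
    target_object_size: 1GiB
  tenant_id: "fake"
  storage_bucket_prefix: "dataobj/"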

@ -408,3 +408,5 @@ replace github.com/grafana/loki/pkg/push => ./pkg/push
// leodido fork his project to continue support
replace github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.2.0
replace github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866

@ -628,6 +628,8 @@ github.com/grafana/jsonparser v0.0.0-20241004153430-023329977675 h1:U94jQ2TQr1m3
github.com/grafana/jsonparser v0.0.0-20241004153430-023329977675/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866 h1:/y3qC0I9kttHjLPxp4bGf+4jcJw60C6hrokTPckHYT8=
github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM=
github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg=
github.com/grafana/pyroscope-go/godeltaprof v0.1.8/go.mod h1:2+l7K7twW49Ct4wFluZD3tZ6e0SjanjcUUBPVD/UuGU=
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248=
@ -1124,8 +1126,6 @@ github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08Yu
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/tencentyun/cos-go-sdk-v5 v0.7.40/go.mod h1:4dCEtLHGh8QPxHEkgq+nFaky7yZxQuYwgSJM87icDaw=
github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a h1:wFBHAmtq1tOLPFaiC4LozyG/BzkRa3ZTmVv1KujUNqk=
github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=

@ -8,6 +8,7 @@ import (
"errors"
"flag"
"fmt"
"time"
"github.com/grafana/dskit/flagext"
lru "github.com/hashicorp/golang-lru/v2"
@ -126,6 +127,12 @@ type Builder struct {
type builderState int
type FlushResult struct {
Path string
MinTimestamp time.Time
MaxTimestamp time.Time
}
const (
// builderStateEmpty indicates the builder is empty and ready to accept new data.
builderStateEmpty builderState = iota
@ -285,15 +292,10 @@ func streamSizeEstimate(stream logproto.Stream) int {
// If Flush builds an object but fails to upload it to object storage, the
// built object is cached and can be retried. [Builder.Reset] can be called to
// discard any pending data and allow new data to be appended.
func (b *Builder) Flush(ctx context.Context) error {
switch b.state {
case builderStateEmpty:
return nil // Nothing to flush
case builderStateDirty:
if err := b.buildObject(); err != nil {
return fmt.Errorf("building object: %w", err)
}
b.state = builderStateFlush
func (b *Builder) Flush(ctx context.Context) (FlushResult, error) {
buf, err := b.FlushToBuffer()
if err != nil {
return FlushResult{}, fmt.Errorf("flushing buffer: %w", err)
}
timer := prometheus.NewTimer(b.metrics.flushTime)
@ -303,12 +305,32 @@ func (b *Builder) Flush(ctx context.Context) error {
sumStr := hex.EncodeToString(sum[:])
objectPath := fmt.Sprintf("tenant-%s/objects/%s/%s", b.tenantID, sumStr[:b.cfg.SHAPrefixSize], sumStr[b.cfg.SHAPrefixSize:])
if err := b.bucket.Upload(ctx, objectPath, bytes.NewReader(b.flushBuffer.Bytes())); err != nil {
return err
if err := b.bucket.Upload(ctx, objectPath, bytes.NewReader(buf.Bytes())); err != nil {
return FlushResult{}, fmt.Errorf("uploading object: %w", err)
}
minTime, maxTime := b.streams.GetBounds()
b.Reset()
return nil
return FlushResult{
Path: objectPath,
MinTimestamp: minTime,
MaxTimestamp: maxTime,
}, nil
}
func (b *Builder) FlushToBuffer() (*bytes.Buffer, error) {
switch b.state {
case builderStateEmpty:
return nil, nil // Nothing to flush
case builderStateDirty:
if err := b.buildObject(); err != nil {
return nil, fmt.Errorf("building object: %w", err)
}
b.state = builderStateFlush
}
return b.flushBuffer, nil
}
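The sketch below is an editorial illustration (not part of this change) of the new Flush contract: Append data, then Flush and use the returned FlushResult. The package name, helper name, and the in-memory bucket are assumptions for the example.

package example

import (
	"context"

	"github.com/thanos-io/objstore"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// flushOne appends a single stream and flushes it, returning the path of the
// uploaded object together with the time range it covers.
func flushOne(ctx context.Context, cfg dataobj.BuilderConfig, stream logproto.Stream) (dataobj.FlushResult, error) {
	builder, err := dataobj.NewBuilder(cfg, objstore.NewInMemBucket(), "fake")
	if err != nil {
		return dataobj.FlushResult{}, err
	}
	if err := builder.Append(stream); err != nil {
		// dataobj.ErrBufferFull would mean: flush first, then retry the append.
		return dataobj.FlushResult{}, err
	}
	// On success the builder is reset; on upload failure the built object stays
	// buffered so Flush can be retried.
	return builder.Flush(ctx)
}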
func (b *Builder) buildObject() error {

@ -81,7 +81,8 @@ func TestBuilder(t *testing.T) {
for _, entry := range streams {
require.NoError(t, builder.Append(entry))
}
require.NoError(t, builder.Flush(context.Background()))
_, err = builder.Flush(context.Background())
require.NoError(t, err)
})
t.Run("Read", func(t *testing.T) {

@ -0,0 +1,32 @@
package consumer
import (
"errors"
"flag"
"github.com/grafana/loki/v3/pkg/dataobj"
)
type Config struct {
dataobj.BuilderConfig
TenantID string `yaml:"tenant_id"`
// StorageBucketPrefix is the prefix to use for the storage bucket.
StorageBucketPrefix string `yaml:"storage_bucket_prefix"`
}
func (cfg *Config) Validate() error {
if cfg.TenantID == "" {
return errors.New("tenantID is required")
}
return cfg.BuilderConfig.Validate()
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("dataobj-consumer.", f)
}
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
f.StringVar(&cfg.TenantID, prefix+"tenant-id", "fake", "The tenant ID to use for the data object builder.")
f.StringVar(&cfg.StorageBucketPrefix, prefix+"storage-bucket-prefix", "dataobj/", "The prefix to use for the storage bucket.")
}

@ -0,0 +1,117 @@
package consumer
import (
"time"
"go.uber.org/atomic"
"github.com/prometheus/client_golang/prometheus"
)
type partitionOffsetMetrics struct {
currentOffset prometheus.GaugeFunc
lastOffset atomic.Int64
// Error counters
flushFailures prometheus.Counter
commitFailures prometheus.Counter
appendFailures prometheus.Counter
// Processing delay histogram
processingDelay prometheus.Histogram
}
func newPartitionOffsetMetrics() *partitionOffsetMetrics {
p := &partitionOffsetMetrics{
flushFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_dataobj_consumer_flush_failures_total",
Help: "Total number of flush failures",
}),
commitFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_dataobj_consumer_commit_failures_total",
Help: "Total number of commit failures",
}),
appendFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_dataobj_consumer_append_failures_total",
Help: "Total number of append failures",
}),
processingDelay: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "loki_dataobj_consumer_processing_delay_seconds",
Help: "Time difference between record timestamp and processing time in seconds",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 0,
}),
}
p.currentOffset = prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Name: "loki_dataobj_consumer_current_offset",
Help: "The last consumed offset for this partition",
},
p.getCurrentOffset,
)
return p
}
func (p *partitionOffsetMetrics) getCurrentOffset() float64 {
return float64(p.lastOffset.Load())
}
func (p *partitionOffsetMetrics) register(reg prometheus.Registerer) error {
collectors := []prometheus.Collector{
p.currentOffset,
p.flushFailures,
p.commitFailures,
p.appendFailures,
p.processingDelay,
}
for _, collector := range collectors {
if err := reg.Register(collector); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
return err
}
}
}
return nil
}
func (p *partitionOffsetMetrics) unregister(reg prometheus.Registerer) {
collectors := []prometheus.Collector{
p.currentOffset,
p.flushFailures,
p.commitFailures,
p.appendFailures,
p.processingDelay,
}
for _, collector := range collectors {
reg.Unregister(collector)
}
}
func (p *partitionOffsetMetrics) updateOffset(offset int64) {
p.lastOffset.Store(offset)
}
func (p *partitionOffsetMetrics) incFlushFailures() {
p.flushFailures.Inc()
}
func (p *partitionOffsetMetrics) incCommitFailures() {
p.commitFailures.Inc()
}
func (p *partitionOffsetMetrics) incAppendFailures() {
p.appendFailures.Inc()
}
func (p *partitionOffsetMetrics) observeProcessingDelay(recordTimestamp time.Time) {
// Observe the time elapsed since the record's timestamp.
if !recordTimestamp.IsZero() { // Only observe if timestamp is valid
p.processingDelay.Observe(time.Since(recordTimestamp).Seconds())
}
}

@ -0,0 +1,201 @@
package consumer
import (
"bytes"
"context"
"strconv"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/kafka"
)
type partitionProcessor struct {
// Kafka client and topic/partition info
client *kgo.Client
topic string
partition int32
tenantID []byte
// Processing pipeline
records chan *kgo.Record
builder *dataobj.Builder
decoder *kafka.Decoder
// Builder initialization
builderOnce sync.Once
builderCfg dataobj.BuilderConfig
bucket objstore.Bucket
metastoreManager *metastore.Manager
// Metrics
metrics *partitionOffsetMetrics
// Control and coordination
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
reg prometheus.Registerer
logger log.Logger
}
func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, bucket objstore.Bucket, tenantID string, topic string, partition int32, logger log.Logger, reg prometheus.Registerer) *partitionProcessor {
ctx, cancel := context.WithCancel(ctx)
decoder, err := kafka.NewDecoder()
if err != nil {
panic(err)
}
reg = prometheus.WrapRegistererWith(prometheus.Labels{
"partition": strconv.Itoa(int(partition)),
}, reg)
metrics := newPartitionOffsetMetrics()
if err := metrics.register(reg); err != nil {
level.Error(logger).Log("msg", "failed to register partition metrics", "err", err)
}
metastoreManager, err := metastore.NewMetastoreManager(bucket, tenantID, logger, reg)
if err != nil {
level.Error(logger).Log("msg", "failed to create metastore manager", "err", err)
cancel()
return nil
}
return &partitionProcessor{
client: client,
logger: log.With(logger, "topic", topic, "partition", partition),
topic: topic,
partition: partition,
records: make(chan *kgo.Record, 1000),
ctx: ctx,
cancel: cancel,
decoder: decoder,
reg: reg,
builderCfg: builderCfg,
bucket: bucket,
tenantID: []byte(tenantID),
metrics: metrics,
metastoreManager: metastoreManager,
}
}
func (p *partitionProcessor) start() {
p.wg.Add(1)
go func() {
defer p.wg.Done()
defer close(p.records)
level.Info(p.logger).Log("msg", "started partition processor")
for {
select {
case <-p.ctx.Done():
level.Info(p.logger).Log("msg", "stopping partition processor")
return
case record := <-p.records:
p.processRecord(record)
}
}
}()
}
func (p *partitionProcessor) stop() {
p.cancel()
p.wg.Wait()
if p.builder != nil {
p.builder.UnregisterMetrics(p.reg)
}
p.metrics.unregister(p.reg)
}
func (p *partitionProcessor) initBuilder() error {
var initErr error
p.builderOnce.Do(func() {
builder, err := dataobj.NewBuilder(p.builderCfg, p.bucket, string(p.tenantID))
if err != nil {
initErr = err
return
}
if err := builder.RegisterMetrics(p.reg); err != nil {
initErr = err
return
}
p.builder = builder
})
return initErr
}
func (p *partitionProcessor) processRecord(record *kgo.Record) {
// Update offset metric at the end of processing
defer p.metrics.updateOffset(record.Offset)
// Observe processing delay
p.metrics.observeProcessingDelay(record.Timestamp)
// Initialize builder if this is the first record
if err := p.initBuilder(); err != nil {
level.Error(p.logger).Log("msg", "failed to initialize builder", "err", err)
return
}
// todo: handle multi-tenant
if !bytes.Equal(record.Key, p.tenantID) {
return
}
stream, err := p.decoder.DecodeWithoutLabels(record.Value)
if err != nil {
level.Error(p.logger).Log("msg", "failed to decode record", "err", err)
return
}
if err := p.builder.Append(stream); err != nil {
if err != dataobj.ErrBufferFull {
level.Error(p.logger).Log("msg", "failed to append stream", "err", err)
p.metrics.incAppendFailures()
return
}
backoff := backoff.New(p.ctx, backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
})
var flushResult dataobj.FlushResult
for backoff.Ongoing() {
flushResult, err = p.builder.Flush(p.ctx)
if err == nil {
break
}
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
p.metrics.incFlushFailures()
backoff.Wait()
}
if err := p.metastoreManager.UpdateMetastore(p.ctx, flushResult); err != nil {
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
return
}
backoff.Reset()
for backoff.Ongoing() {
err = p.client.CommitRecords(p.ctx, record)
if err == nil {
break
}
level.Error(p.logger).Log("msg", "failed to commit records", "err", err)
p.metrics.incCommitFailures()
backoff.Wait()
}
if err := p.builder.Append(stream); err != nil {
level.Error(p.logger).Log("msg", "failed to append stream after flushing", "err", err)
p.metrics.incAppendFailures()
}
}
}

@ -0,0 +1,218 @@
package consumer
import (
"context"
"errors"
"strconv"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/client"
"github.com/grafana/loki/v3/pkg/kafka/partitionring/consumer"
)
const (
groupName = "dataobj-consumer"
)
type Service struct {
services.Service
logger log.Logger
reg prometheus.Registerer
client *consumer.Client
cfg Config
bucket objstore.Bucket
// Partition management
partitionMtx sync.RWMutex
partitionHandlers map[string]map[int32]*partitionProcessor
}
func New(kafkaCfg kafka.Config, cfg Config, bucket objstore.Bucket, instanceID string, partitionRing ring.PartitionRingReader, reg prometheus.Registerer, logger log.Logger) *Service {
if cfg.StorageBucketPrefix != "" {
bucket = objstore.NewPrefixedBucket(bucket, cfg.StorageBucketPrefix)
}
s := &Service{
logger: log.With(logger, "component", groupName),
cfg: cfg,
bucket: bucket,
partitionHandlers: make(map[string]map[int32]*partitionProcessor),
reg: reg,
}
client, err := consumer.NewGroupClient(
kafkaCfg,
partitionRing,
groupName,
client.NewReaderClientMetrics(groupName, reg),
logger,
kgo.InstanceID(instanceID),
kgo.SessionTimeout(3*time.Minute),
kgo.RebalanceTimeout(5*time.Minute),
kgo.OnPartitionsAssigned(s.handlePartitionsAssigned),
kgo.OnPartitionsRevoked(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
s.handlePartitionsRevoked(m)
}),
)
if err != nil {
level.Error(logger).Log("msg", "failed to create consumer", "err", err)
return nil
}
s.client = client
s.Service = services.NewBasicService(nil, s.run, s.stopping)
return s
}
func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Client, partitions map[string][]int32) {
level.Info(s.logger).Log("msg", "partitions assigned", "partitions", formatPartitionsMap(partitions))
s.partitionMtx.Lock()
defer s.partitionMtx.Unlock()
for topic, parts := range partitions {
if _, ok := s.partitionHandlers[topic]; !ok {
s.partitionHandlers[topic] = make(map[int32]*partitionProcessor)
}
for _, partition := range parts {
processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.bucket, s.cfg.TenantID, topic, partition, s.logger, s.reg)
s.partitionHandlers[topic][partition] = processor
processor.start()
}
}
}
func (s *Service) handlePartitionsRevoked(partitions map[string][]int32) {
level.Info(s.logger).Log("msg", "partitions revoked", "partitions", formatPartitionsMap(partitions))
s.partitionMtx.Lock()
defer s.partitionMtx.Unlock()
var wg sync.WaitGroup
for topic, parts := range partitions {
if handlers, ok := s.partitionHandlers[topic]; ok {
for _, partition := range parts {
if processor, exists := handlers[partition]; exists {
wg.Add(1)
go func(p *partitionProcessor) {
defer wg.Done()
p.stop()
}(processor)
delete(handlers, partition)
}
}
if len(handlers) == 0 {
delete(s.partitionHandlers, topic)
}
}
}
wg.Wait()
}
func (s *Service) run(ctx context.Context) error {
for {
fetches := s.client.PollRecords(ctx, -1)
if fetches.IsClientClosed() || ctx.Err() != nil {
return nil
}
if errs := fetches.Errors(); len(errs) > 0 {
var multiErr error
for _, err := range errs {
multiErr = errors.Join(multiErr, err.Err)
}
level.Error(s.logger).Log("msg", "error fetching records", "err", multiErr.Error())
continue
}
if fetches.Empty() {
continue
}
fetches.EachPartition(func(ftp kgo.FetchTopicPartition) {
s.partitionMtx.RLock()
handlers, ok := s.partitionHandlers[ftp.Topic]
if !ok {
s.partitionMtx.RUnlock()
return
}
processor, ok := handlers[ftp.Partition]
s.partitionMtx.RUnlock()
if !ok {
return
}
// Collect all records for this partition
records := ftp.Records
if len(records) == 0 {
return
}
for _, record := range records {
select {
case <-processor.ctx.Done():
return
case processor.records <- record:
// Record sent successfully
}
}
})
}
}
func (s *Service) stopping(failureCase error) error {
s.partitionMtx.Lock()
defer s.partitionMtx.Unlock()
var wg sync.WaitGroup
for _, handlers := range s.partitionHandlers {
for _, processor := range handlers {
wg.Add(1)
go func(p *partitionProcessor) {
defer wg.Done()
p.stop()
}(processor)
}
}
wg.Wait()
// Only close the client once all partitions have been stopped.
// This is to ensure that all records have been processed before closing and offsets committed.
s.client.Close()
level.Info(s.logger).Log("msg", "consumer stopped")
return failureCase
}
// Helper function to format []int32 slice
func formatInt32Slice(slice []int32) string {
if len(slice) == 0 {
return "[]"
}
result := "["
for i, v := range slice {
if i > 0 {
result += ","
}
result += strconv.Itoa(int(v))
}
result += "]"
return result
}
// Helper function to format map[string][]int32 into a readable string
func formatPartitionsMap(partitions map[string][]int32) string {
var result string
for topic, parts := range partitions {
if len(result) > 0 {
result += ", "
}
result += topic + "=" + formatInt32Slice(parts)
}
return result
}

@ -80,7 +80,6 @@ func (t *table) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[
return nil
})
}
// Size returns the total size of the table in bytes.

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
@ -16,6 +17,7 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
)
@ -33,6 +35,20 @@ type Stream struct {
Rows int // Number of rows in the stream.
}
func (s *Stream) Reset() {
s.ID = 0
s.Labels = nil
s.MinTimestamp = time.Time{}
s.MaxTimestamp = time.Time{}
s.Rows = 0
}
var streamPool = sync.Pool{
New: func() interface{} {
return &Stream{}
},
}
// Streams tracks information about streams in a data object.
type Streams struct {
metrics *Metrics
@ -61,10 +77,26 @@ func New(metrics *Metrics, pageSize int) *Streams {
return &Streams{
metrics: metrics,
pageSize: pageSize,
lookup: make(map[uint64][]*Stream),
lookup: make(map[uint64][]*Stream, 1024),
ordered: make([]*Stream, 0, 1024),
}
}
func (s *Streams) Iter() result.Seq[Stream] {
return result.Iter(func(yield func(Stream) bool) error {
for _, stream := range s.ordered {
if !yield(*stream) {
return nil
}
}
return nil
})
}
func (s *Streams) GetBounds() (time.Time, time.Time) {
return s.globalMinTimestamp, s.globalMaxTimestamp
}
// Record a stream record within the Streams section. The provided timestamp is
// used to track the minimum and maximum timestamp of a stream. The number of
// calls to Record is used to track the number of rows for a stream.
@ -153,7 +185,11 @@ func (s *Streams) addStream(hash uint64, streamLabels labels.Labels) *Stream {
s.currentLabelsSize += len(lbl.Value)
}
newStream := &Stream{ID: s.lastID.Add(1), Labels: streamLabels}
newStream := streamPool.Get().(*Stream)
newStream.Reset()
newStream.ID = s.lastID.Add(1)
newStream.Labels = streamLabels
s.lookup[hash] = append(s.lookup[hash], newStream)
s.ordered = append(s.ordered, newStream)
s.metrics.streamCount.Inc()
@ -187,7 +223,6 @@ func (s *Streams) StreamID(streamLabels labels.Labels) int64 {
func (s *Streams) EncodeTo(enc *encoding.Encoder) error {
timer := prometheus.NewTimer(s.metrics.encodeSeconds)
defer timer.ObserveDuration()
defer s.Reset()
// TODO(rfratto): handle one section becoming too large. This can happen when
// the number of columns is very wide. There are two approaches to handle
@ -335,6 +370,9 @@ func encodeColumn(enc *encoding.StreamsEncoder, columnType streamsmd.ColumnType,
// Reset resets all state, allowing Streams to be reused.
func (s *Streams) Reset() {
s.lastID.Store(0)
for _, stream := range s.ordered {
streamPool.Put(stream)
}
clear(s.lookup)
s.ordered = sliceclear.Clear(s.ordered)
s.currentLabelsSize = 0

@ -0,0 +1,175 @@
package metastore
import (
"bytes"
"context"
"fmt"
"io"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/logproto"
)
const (
metastoreWindowSize = 12 * time.Hour
)
var (
// Define our own builder config because metastore objects are significantly smaller.
metastoreBuilderCfg = dataobj.BuilderConfig{
SHAPrefixSize: 2,
TargetObjectSize: 32 * 1024 * 1024,
TargetPageSize: 4 * 1024 * 1024,
BufferSize: 32 * 1024 * 1024, // 8x page size
TargetSectionSize: 4 * 1024 * 1024, // object size / 8
}
)
type Manager struct {
metastoreBuilder *dataobj.Builder
tenantID string
metrics *metastoreMetrics
bucket objstore.Bucket
logger log.Logger
backoff *backoff.Backoff
builderOnce sync.Once
}
func NewMetastoreManager(bucket objstore.Bucket, tenantID string, logger log.Logger, reg prometheus.Registerer) (*Manager, error) {
metrics := newMetastoreMetrics()
if err := metrics.register(reg); err != nil {
return nil, err
}
return &Manager{
bucket: bucket,
metrics: metrics,
logger: logger,
tenantID: tenantID,
backoff: backoff.New(context.TODO(), backoff.Config{
MinBackoff: 50 * time.Millisecond,
MaxBackoff: 10 * time.Second,
}),
builderOnce: sync.Once{},
}, nil
}
func (m *Manager) initBuilder() error {
var initErr error
m.builderOnce.Do(func() {
metastoreBuilder, err := dataobj.NewBuilder(metastoreBuilderCfg, m.bucket, m.tenantID)
if err != nil {
initErr = err
return
}
m.metastoreBuilder = metastoreBuilder
})
return initErr
}
func (m *Manager) UpdateMetastore(ctx context.Context, flushResult dataobj.FlushResult) error {
var err error
start := time.Now()
defer m.metrics.observeMetastoreProcessing(start)
// Initialize builder if this is the first call for this partition
if err := m.initBuilder(); err != nil {
return err
}
minTimestamp, maxTimestamp := flushResult.MinTimestamp, flushResult.MaxTimestamp
// Work our way through the metastore objects window by window, updating & creating them as needed.
// Each one handles its own retries in order to keep making progress in the event of a failure.
minMetastoreWindow := minTimestamp.Truncate(metastoreWindowSize)
maxMetastoreWindow := maxTimestamp.Truncate(metastoreWindowSize)
for metastoreWindow := minMetastoreWindow; metastoreWindow.Compare(maxMetastoreWindow) <= 0; metastoreWindow = metastoreWindow.Add(metastoreWindowSize) {
metastorePath := fmt.Sprintf("tenant-%s/metastore/%s.store", m.tenantID, metastoreWindow.Format(time.RFC3339))
m.backoff.Reset()
for m.backoff.Ongoing() {
err = m.bucket.GetAndReplace(ctx, metastorePath, func(existing io.Reader) (io.Reader, error) {
buf, err := io.ReadAll(existing)
if err != nil {
return nil, err
}
m.metastoreBuilder.Reset()
if len(buf) > 0 {
replayStart := time.Now()
object := dataobj.FromReaderAt(bytes.NewReader(buf), int64(len(buf)))
if err := m.readFromExisting(ctx, object); err != nil {
return nil, err
}
m.metrics.observeMetastoreReplay(replayStart)
}
encodingStart := time.Now()
ls := fmt.Sprintf("{__start__=\"%d\", __end__=\"%d\", __path__=\"%s\"}", minTimestamp.UnixNano(), maxTimestamp.UnixNano(), flushResult.Path)
err = m.metastoreBuilder.Append(logproto.Stream{
Labels: ls,
Entries: []logproto.Entry{{Line: ""}},
})
if err != nil {
return nil, err
}
newMetastore, err := m.metastoreBuilder.FlushToBuffer()
if err != nil {
return nil, err
}
m.metrics.observeMetastoreEncoding(encodingStart)
return newMetastore, nil
})
if err == nil {
level.Info(m.logger).Log("msg", "successfully merged & updated metastore", "metastore", metastorePath)
break
}
level.Error(m.logger).Log("msg", "failed to get and replace metastore object", "err", err, "metastore", metastorePath)
m.metrics.incMetastoreWriteFailures()
m.backoff.Wait()
}
// Reset at the end too so we don't leave our memory hanging around between calls.
m.metastoreBuilder.Reset()
}
return err
}
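As an editorial sketch of the windowing above (the helper is hypothetical; the 12h window size and path format come from this file), the set of metastore objects touched by a single flush can be derived like this:

package example

import (
	"fmt"
	"time"
)

// metastorePaths mirrors UpdateMetastore's windowing: the flushed object's time
// bounds are truncated to 12h windows and one metastore path is emitted per window.
func metastorePaths(tenantID string, minTS, maxTS time.Time) []string {
	const windowSize = 12 * time.Hour
	var paths []string
	for w := minTS.Truncate(windowSize); !w.After(maxTS.Truncate(windowSize)); w = w.Add(windowSize) {
		paths = append(paths, fmt.Sprintf("tenant-%s/metastore/%s.store", tenantID, w.Format(time.RFC3339)))
	}
	return paths
}

For example, a flush covering 2025-01-01 14:00 to 15:00 UTC falls entirely into the single window 2025-01-01T12:00:00Z.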
func (m *Manager) readFromExisting(ctx context.Context, object *dataobj.Object) error {
// Fetch sections
si, err := object.Metadata(ctx)
if err != nil {
return err
}
// Read streams from existing metastore object and write them to the builder for the new object
streams := make([]dataobj.Stream, 100)
for i := 0; i < si.StreamsSections; i++ {
streamsReader := dataobj.NewStreamsReader(object, i)
for n, err := streamsReader.Read(ctx, streams); n > 0; n, err = streamsReader.Read(ctx, streams) {
if err != nil && err != io.EOF {
return err
}
for _, stream := range streams[:n] {
err = m.metastoreBuilder.Append(logproto.Stream{
Labels: stream.Labels.String(),
Entries: []logproto.Entry{{Line: ""}},
})
if err != nil {
return err
}
}
}
}
return nil
}

@ -0,0 +1,106 @@
package metastore
import (
"context"
"fmt"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/dskit/backoff"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/dataobj"
)
func BenchmarkWriteMetastores(t *testing.B) {
ctx := context.Background()
bucket := objstore.NewInMemBucket()
tenantID := "test-tenant"
m, err := NewMetastoreManager(bucket, tenantID, log.NewNopLogger(), prometheus.DefaultRegisterer)
require.NoError(t, err)
// Set limits for the test
m.backoff = backoff.New(context.TODO(), backoff.Config{
MinBackoff: 10 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
MaxRetries: 3,
})
// Add test data spanning multiple metastore windows
now := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC)
flushResults := make([]dataobj.FlushResult, 1000)
for i := 0; i < 1000; i++ {
flushResults[i] = dataobj.FlushResult{
Path: fmt.Sprintf("test-dataobj-path-%d", i),
MinTimestamp: now.Add(-1 * time.Hour).Add(time.Duration(i) * time.Millisecond),
MaxTimestamp: now,
}
}
t.ResetTimer()
t.ReportAllocs()
for i := 0; i < t.N; i++ {
// Test writing metastores
err = m.UpdateMetastore(ctx, flushResults[i%len(flushResults)])
require.NoError(t, err)
}
require.Len(t, bucket.Objects(), 1)
}
func TestWriteMetastores(t *testing.T) {
ctx := context.Background()
bucket := objstore.NewInMemBucket()
tenantID := "test-tenant"
m, err := NewMetastoreManager(bucket, tenantID, log.NewNopLogger(), prometheus.DefaultRegisterer)
require.NoError(t, err)
// Set limits for the test
m.backoff = backoff.New(context.TODO(), backoff.Config{
MinBackoff: 10 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
MaxRetries: 3,
})
// Add test data spanning multiple metastore windows
now := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC)
flushResult := dataobj.FlushResult{
Path: "test-dataobj-path",
MinTimestamp: now.Add(-1 * time.Hour),
MaxTimestamp: now,
}
require.Len(t, bucket.Objects(), 0)
// Test writing metastores
err = m.UpdateMetastore(ctx, flushResult)
require.NoError(t, err)
require.Len(t, bucket.Objects(), 1)
var originalSize int
for _, obj := range bucket.Objects() {
originalSize = len(obj)
}
flushResult2 := dataobj.FlushResult{
Path: "different-test-dataobj-path",
MinTimestamp: now.Add(-15 * time.Minute),
MaxTimestamp: now,
}
err = m.UpdateMetastore(ctx, flushResult2)
require.NoError(t, err)
require.Len(t, bucket.Objects(), 1)
for _, obj := range bucket.Objects() {
require.Greater(t, len(obj), originalSize)
}
}

@ -0,0 +1,102 @@
package metastore
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
type metastoreMetrics struct {
metastoreProcessingTime prometheus.Histogram
metastoreReplayTime prometheus.Histogram
metastoreEncodingTime prometheus.Histogram
metastoreWriteFailures prometheus.Counter
}
func newMetastoreMetrics() *metastoreMetrics {
metrics := &metastoreMetrics{
metastoreReplayTime: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "loki_dataobj_consumer_metastore_replay_seconds",
Help: "Time taken to replay existing metastore data into the in-memory builder in seconds",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 0,
}),
metastoreEncodingTime: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "loki_dataobj_consumer_metastore_encoding_seconds",
Help: "Time taken to add the new metadata & encode the new metastore data object in seconds",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 0,
}),
metastoreProcessingTime: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "loki_dataobj_consumer_metastore_processing_seconds",
Help: "Total time taken to update all metastores for a flushed dataobj in seconds",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 0,
}),
metastoreWriteFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_dataobj_consumer_metastore_write_failures_total",
Help: "Total number of metastore write failures",
}),
}
return metrics
}
func (p *metastoreMetrics) register(reg prometheus.Registerer) error {
collectors := []prometheus.Collector{
p.metastoreReplayTime,
p.metastoreEncodingTime,
p.metastoreProcessingTime,
p.metastoreWriteFailures,
}
for _, collector := range collectors {
if err := reg.Register(collector); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
return err
}
}
}
return nil
}
func (p *metastoreMetrics) unregister(reg prometheus.Registerer) {
collectors := []prometheus.Collector{
p.metastoreReplayTime,
p.metastoreEncodingTime,
p.metastoreProcessingTime,
p.metastoreWriteFailures,
}
for _, collector := range collectors {
reg.Unregister(collector)
}
}
func (p *metastoreMetrics) incMetastoreWriteFailures() {
p.metastoreWriteFailures.Inc()
}
func (p *metastoreMetrics) observeMetastoreReplay(recordTimestamp time.Time) {
if !recordTimestamp.IsZero() { // Only observe if timestamp is valid
p.metastoreReplayTime.Observe(time.Since(recordTimestamp).Seconds())
}
}
func (p *metastoreMetrics) observeMetastoreEncoding(recordTimestamp time.Time) {
if !recordTimestamp.IsZero() { // Only observe if timestamp is valid
p.metastoreEncodingTime.Observe(time.Since(recordTimestamp).Seconds())
}
}
func (p *metastoreMetrics) observeMetastoreProcessing(recordTimestamp time.Time) {
if !recordTimestamp.IsZero() { // Only observe if timestamp is valid
p.metastoreProcessingTime.Observe(time.Since(recordTimestamp).Seconds())
}
}

@ -38,6 +38,7 @@ import (
"github.com/grafana/loki/v3/pkg/compactor"
compactorclient "github.com/grafana/loki/v3/pkg/compactor/client"
"github.com/grafana/loki/v3/pkg/compactor/deletion"
"github.com/grafana/loki/v3/pkg/dataobj/consumer"
"github.com/grafana/loki/v3/pkg/dataobj/explorer"
"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/indexgateway"
@ -109,6 +110,7 @@ type Config struct {
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
KafkaConfig kafka.Config `yaml:"kafka_config,omitempty" category:"experimental"`
DataObjConsumer consumer.Config `yaml:"dataobj_consumer,omitempty" category:"experimental"`
DataObjExplorer explorer.Config `yaml:"dataobj_explorer,omitempty" category:"experimental"`
RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
@ -192,6 +194,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.BlockBuilder.RegisterFlags(f)
c.BlockScheduler.RegisterFlags(f)
c.DataObjExplorer.RegisterFlags(f)
c.DataObjConsumer.RegisterFlags(f)
}
func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
@ -307,6 +310,9 @@ func (c *Config) Validate() error {
if err := c.KafkaConfig.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid kafka_config config"))
}
if err := c.DataObjConsumer.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid dataobj_consumer config"))
}
}
if err := c.Distributor.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid distributor config"))
@ -390,6 +396,7 @@ type Loki struct {
partitionRing *ring.PartitionInstanceRing
blockBuilder *blockbuilder.BlockBuilder
blockScheduler *blockscheduler.BlockScheduler
dataObjConsumer *consumer.Service
ClientMetrics storage.ClientMetrics
deleteClientMetrics *deletion.DeleteRequestClientMetrics
@ -707,6 +714,8 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(BlockBuilder, t.initBlockBuilder)
mm.RegisterModule(BlockScheduler, t.initBlockScheduler)
mm.RegisterModule(DataObjExplorer, t.initDataObjExplorer)
mm.RegisterModule(DataObjConsumer, t.initDataObjConsumer)
mm.RegisterModule(All, nil)
mm.RegisterModule(Read, nil)
mm.RegisterModule(Write, nil)
@ -746,6 +755,7 @@ func (t *Loki) setupModuleManager() error {
BlockBuilder: {PartitionRing, Store, Server},
BlockScheduler: {Server},
DataObjExplorer: {Server},
DataObjConsumer: {PartitionRing, Server},
Read: {QueryFrontend, Querier},
Write: {Ingester, Distributor, PatternIngester},

@ -51,6 +51,7 @@ import (
"github.com/grafana/loki/v3/pkg/compactor/client/grpc"
"github.com/grafana/loki/v3/pkg/compactor/deletion"
"github.com/grafana/loki/v3/pkg/compactor/generationnumber"
"github.com/grafana/loki/v3/pkg/dataobj/consumer"
"github.com/grafana/loki/v3/pkg/dataobj/explorer"
"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/indexgateway"
@ -144,11 +145,11 @@ const (
BlockBuilder = "block-builder"
BlockScheduler = "block-scheduler"
DataObjExplorer = "dataobj-explorer"
All = "all"
Read = "read"
Write = "write"
Backend = "backend"
DataObjConsumer = "dataobj-consumer"
All = "all"
Read = "read"
Write = "write"
Backend = "backend"
)
const (
@ -1905,6 +1906,33 @@ func (t *Loki) initDataObjExplorer() (services.Service, error) {
return explorer, nil
}
func (t *Loki) initDataObjConsumer() (services.Service, error) {
if !t.Cfg.Ingester.KafkaIngestion.Enabled {
return nil, nil
}
schema, err := t.Cfg.SchemaConfig.SchemaForTime(model.Now())
if err != nil {
return nil, fmt.Errorf("failed to get schema for now: %w", err)
}
store, err := bucket.NewClient(context.Background(), schema.ObjectType, t.Cfg.StorageConfig.ObjectStore.Config, "dataobj", util_log.Logger)
if err != nil {
return nil, err
}
level.Info(util_log.Logger).Log("msg", "initializing dataobj consumer", "instance", t.Cfg.Ingester.LifecyclerConfig.ID)
t.dataObjConsumer = consumer.New(
t.Cfg.KafkaConfig,
t.Cfg.DataObjConsumer,
store,
t.Cfg.Ingester.LifecyclerConfig.ID,
t.partitionRing,
prometheus.DefaultRegisterer,
util_log.Logger,
)
return t.dataObjConsumer, nil
}
func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) {
if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil

@ -36,6 +36,11 @@ func (b *PrefixedBucketClient) Upload(ctx context.Context, name string, r io.Rea
return
}
// GetAndReplace gets an object from the bucket and replaces it with the content returned by fn.
func (b *PrefixedBucketClient) GetAndReplace(ctx context.Context, name string, fn func(existing io.Reader) (io.Reader, error)) error {
return b.bucket.GetAndReplace(ctx, b.fullName(name), fn)
}
// Delete removes the object with the given name.
func (b *PrefixedBucketClient) Delete(ctx context.Context, name string) error {
return b.bucket.Delete(ctx, b.fullName(name))

@ -59,6 +59,10 @@ func (b *SSEBucketClient) Upload(ctx context.Context, name string, r io.Reader)
return b.bucket.Upload(ctx, name, r)
}
func (b *SSEBucketClient) GetAndReplace(ctx context.Context, name string, fn func(existing io.Reader) (io.Reader, error)) error {
return b.bucket.GetAndReplace(ctx, name, fn)
}
// Delete implements objstore.Bucket.
func (b *SSEBucketClient) Delete(ctx context.Context, name string) error {
return b.bucket.Delete(ctx, name)

@ -193,6 +193,32 @@ func (b *InMemBucket) GetRange(_ context.Context, name string, off, length int64
}, nil
}
func (b *InMemBucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
reader, err := b.Get(ctx, name)
if err != nil && !errors.Is(err, errNotFound) {
return err
}
b.mtx.Lock()
defer b.mtx.Unlock()
if reader == nil {
reader = io.NopCloser(bytes.NewReader(nil))
}
new, err := f(reader)
if err != nil {
return err
}
newObj, err := io.ReadAll(new)
if err != nil {
return err
}
b.objects[name] = newObj
return nil
}
// Exists checks if the given directory exists in memory.
func (b *InMemBucket) Exists(_ context.Context, name string) (bool, error) {
b.mtx.RLock()

@ -64,6 +64,10 @@ type Bucket interface {
// Upload should be idempotent.
Upload(ctx context.Context, name string, r io.Reader) error
// GetAndReplace replaces an existing object with new content produced by f.
// If the object is created or updated by another writer before the new content is uploaded, the call fails with an error.
GetAndReplace(ctx context.Context, name string, f func(existing io.Reader) (io.Reader, error)) error
// Delete removes the object with the given name.
// If object does not exist in the moment of deletion, Delete should throw error.
Delete(ctx context.Context, name string) error
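For context, a hypothetical caller of this new interface method might perform a read-modify-write like the sketch below. The helper is illustrative only; the in-memory, filesystem, and GCS implementations in this change hand f an empty reader when the object does not yet exist.

package example

import (
	"bytes"
	"context"
	"io"

	"github.com/thanos-io/objstore"
)

// appendLine reads the current object contents, appends one line, and returns
// the combined content for GetAndReplace to upload conditionally.
func appendLine(ctx context.Context, bucket objstore.Bucket, name, line string) error {
	return bucket.GetAndReplace(ctx, name, func(existing io.Reader) (io.Reader, error) {
		current, err := io.ReadAll(existing)
		if err != nil {
			return nil, err
		}
		return bytes.NewReader(append(current, []byte(line+"\n")...)), nil
	})
}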
@ -731,6 +735,10 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
), nil
}
func (b *metricBucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
return b.bkt.GetAndReplace(ctx, name, f)
}
func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
const op = OpExists
b.metrics.ops.WithLabelValues(op).Inc()

@ -79,6 +79,10 @@ func (p *PrefixedBucket) GetRange(ctx context.Context, name string, off int64, l
return p.bkt.GetRange(ctx, conditionalPrefix(p.prefix, name), off, length)
}
func (b *PrefixedBucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
return b.bkt.GetAndReplace(ctx, conditionalPrefix(b.prefix, name), f)
}
// Exists checks if the given object exists in the bucket.
func (p *PrefixedBucket) Exists(ctx context.Context, name string) (bool, error) {
return p.bkt.Exists(ctx, conditionalPrefix(p.prefix, name))

@ -429,3 +429,7 @@ func NewTestBucket(t testing.TB, component string) (objstore.Bucket, func(), err
func (b *Bucket) Close() error {
return nil
}
func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
panic("unimplemented: Azure.GetAndReplace")
}

@ -440,3 +440,7 @@ func validateForTest(conf Config) error {
}
return nil
}
func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
panic("unimplemented: BOS.GetAndReplace")
}

@ -11,6 +11,7 @@ import (
"path/filepath"
"github.com/efficientgo/core/errcapture"
"github.com/gofrs/flock"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
@ -269,6 +270,41 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err erro
return nil
}
func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
file := filepath.Join(b.rootDir, name)
// Acquire a file lock before modifying, as file systems don't support conditional writes the way cloud providers do.
fileLock := flock.New(file + ".lock")
locked, err := fileLock.TryLock()
if err != nil {
return err
}
if !locked {
return errors.New("file is locked by another process")
}
defer fileLock.Unlock()
var r io.ReadCloser
r, err = os.Open(file)
if err != nil && !os.IsNotExist(err) {
return err
} else if err == nil {
defer r.Close()
}
newContent, err := f(r)
if err != nil {
return err
}
content, err := io.ReadAll(newContent)
if err != nil {
return err
}
return os.WriteFile(file, content, 0600)
}
func isDirEmpty(name string) (ok bool, err error) {
f, err := os.Open(filepath.Clean(name))
if os.IsNotExist(err) {

@ -5,6 +5,7 @@
package gcs
import (
"bytes"
"context"
"fmt"
"io"
@ -37,6 +38,8 @@ var DefaultConfig = Config{
HTTPConfig: exthttp.DefaultHTTPConfig,
}
var _ objstore.Bucket = &Bucket{}
// Config stores the configuration for gcs bucket.
type Config struct {
Bucket string `yaml:"bucket"`
@ -273,7 +276,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
r, err := b.bkt.Object(name).NewReader(ctx)
r, err := b.get(ctx, name)
if err != nil {
return r, err
}
@ -286,6 +289,10 @@ func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
}, nil
}
func (b *Bucket) get(ctx context.Context, name string) (*storage.Reader, error) {
return b.bkt.Object(name).NewReader(ctx)
}
// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
r, err := b.bkt.Object(name).NewRangeReader(ctx, off, length)
@ -333,7 +340,21 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
// Upload writes the file specified in src to remote GCS location specified as target.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
w := b.bkt.Object(name).NewWriter(ctx)
return b.upload(ctx, name, r, 0, false)
}
// upload writes the contents of r to the named GCS object, optionally requiring a matching generation or that the object does not yet exist.
func (b *Bucket) upload(ctx context.Context, name string, r io.Reader, generation int64, requireNewObject bool) error {
o := b.bkt.Object(name)
var w *storage.Writer
if generation != 0 {
o = o.If(storage.Conditions{GenerationMatch: generation})
}
if requireNewObject {
o = o.If(storage.Conditions{DoesNotExist: true})
}
w = o.NewWriter(ctx)
// if `chunkSize` is 0, we don't set any custom value for writer's ChunkSize.
// It uses whatever the default value https://pkg.go.dev/google.golang.org/cloud/storage#Writer
@ -347,6 +368,41 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
return w.Close()
}
func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
var mustNotExist bool
var generation int64
// Get the current object
storageReader, err := b.get(ctx, name)
if err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
return err
} else if errors.Is(err, storage.ErrObjectNotExist) {
mustNotExist = true
}
// If object exists, ensure we close the reader when done
if storageReader != nil {
generation = storageReader.Attrs.Generation
defer storageReader.Close()
}
newContent, err := f(wrapReader(storageReader))
if err != nil {
return err
}
// Upload with the previous generation, or mustNotExist for new objects
return b.upload(ctx, name, newContent, generation, mustNotExist)
}
func wrapReader(r *storage.Reader) io.ReadCloser {
if r == nil {
return io.NopCloser(bytes.NewReader(nil))
}
return r
}
// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
return b.bkt.Object(name).Delete(ctx)

@ -426,3 +426,7 @@ func (b *Bucket) IsAccessDeniedErr(err error) bool {
}
return false
}
func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
panic("unimplemented: OSS.GetAndReplace")
}

@ -458,7 +458,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
}, filteredOpts...)
}
func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (*minio.Object, error) {
sse, err := b.getServerSideEncryption(ctx)
if err != nil {
return nil, err
@ -488,6 +488,16 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}
return r, nil
}
// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
r, err := b.getRange(ctx, name, 0, -1)
if err != nil {
return r, err
}
return objstore.ObjectSizerReadCloser{
ReadCloser: r,
Size: func() (int64, error) {
@ -501,14 +511,24 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
}, nil
}
// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.getRange(ctx, name, 0, -1)
}
// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.getRange(ctx, name, off, length)
r, err := b.getRange(ctx, name, off, length)
if err != nil {
return r, err
}
return objstore.ObjectSizerReadCloser{
ReadCloser: r,
Size: func() (int64, error) {
stat, err := r.Stat()
if err != nil {
return 0, err
}
return stat.Size, nil
},
}, nil
}
// Exists checks if the given object exists.
@ -526,6 +546,10 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
return b.upload(ctx, name, r, "", false)
}
func (b *Bucket) upload(ctx context.Context, name string, r io.Reader, etag string, requireNewObject bool) error {
sse, err := b.getServerSideEncryption(ctx)
if err != nil {
return err
@ -549,24 +573,33 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
userMetadata[k] = v
}
putOpts := minio.PutObjectOptions{
DisableMultipart: b.disableMultipart,
PartSize: partSize,
ServerSideEncryption: sse,
UserMetadata: userMetadata,
StorageClass: b.storageClass,
SendContentMd5: b.sendContentMd5,
// 4 is what minio-go have as the default. To be certain we do micro benchmark before any changes we
// ensure we pin this number to four.
// TODO(bwplotka): Consider adjusting this number to GOMAXPROCS or to expose this in config if it becomes bottleneck.
NumThreads: 4,
}
if etag != "" {
if requireNewObject {
putOpts.SetMatchETagExcept(etag)
} else {
putOpts.SetMatchETag(etag)
}
}
if _, err := b.client.PutObject(
ctx,
b.name,
name,
r,
size,
minio.PutObjectOptions{
DisableMultipart: b.disableMultipart,
PartSize: partSize,
ServerSideEncryption: sse,
UserMetadata: userMetadata,
StorageClass: b.storageClass,
SendContentMd5: b.sendContentMd5,
// 4 is what minio-go have as the default. To be certain we do micro benchmark before any changes we
// ensure we pin this number to four.
// TODO(bwplotka): Consider adjusting this number to GOMAXPROCS or to expose this in config if it becomes bottleneck.
NumThreads: 4,
},
putOpts,
); err != nil {
return errors.Wrap(err, "upload s3 object")
}
@ -574,6 +607,30 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
return nil
}
// GetAndReplace reads the existing object, passes it to f, and uploads the returned content, requiring the object to be unchanged (or still absent) in the meantime.
func (b *Bucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
var requireNewObject bool
originalContent, err := b.getRange(ctx, name, 0, -1)
if err != nil && !b.IsObjNotFoundErr(err) {
return err
} else if b.IsObjNotFoundErr(err) {
requireNewObject = true
}
// Call work function to get a new version of the file
newContent, err := f(originalContent)
if err != nil {
return err
}
stats, err := originalContent.Stat()
if err != nil {
return err
}
return b.upload(ctx, name, newContent, stats.ETag, requireNewObject)
}
// Attributes returns information about the specified object.
func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
objInfo, err := b.client.StatObject(ctx, b.name, name, minio.StatObjectOptions{})

@ -375,6 +375,10 @@ func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err err
return nil
}
func (b *Container) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
panic("unimplemented: Swift.GetAndReplace")
}
// Delete removes the object with the given name.
func (c *Container) Delete(_ context.Context, name string) error {
return errors.Wrap(c.connection.LargeObjectDelete(c.name, name), "delete object")

@ -287,6 +287,10 @@ func (d *delayingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
return d.bkt.Get(ctx, name)
}
func (b *delayingBucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) error {
panic("unimplemented: delayingBucket.GetAndReplace")
}
func (d *delayingBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
time.Sleep(d.delay)
return d.bkt.Attributes(ctx, name)

@ -118,6 +118,14 @@ func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (er
return
}
func (t TracingBucket) GetAndReplace(ctx context.Context, name string, f func(io.Reader) (io.Reader, error)) (err error) {
doWithSpan(ctx, "bucket_get_and_replace", func(spanCtx context.Context, span opentracing.Span) {
span.LogKV("name", name)
err = t.bkt.GetAndReplace(spanCtx, name, f)
})
return
}
func (t TracingBucket) Delete(ctx context.Context, name string) (err error) {
doWithSpan(ctx, "bucket_delete", func(spanCtx context.Context, span opentracing.Span) {
span.LogKV("name", name)

@ -1633,7 +1633,7 @@ github.com/stretchr/testify/assert/yaml
github.com/stretchr/testify/mock
github.com/stretchr/testify/require
github.com/stretchr/testify/suite
# github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a
# github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a => github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866
## explicit; go 1.22
github.com/thanos-io/objstore
github.com/thanos-io/objstore/clientutil
@ -2562,3 +2562,4 @@ sigs.k8s.io/yaml/goyaml.v2
# github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
# github.com/grafana/loki/pkg/push => ./pkg/push
# github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.2.0
# github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250128154815-d7e99f81f866
