chore(dataobj): add initial high-level APIs for reading streams and log records (#15974)

pull/15972/head
Robert Fratto 4 months ago committed by GitHub
parent a8bd3a88bf
commit 1adb1e39d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 363   pkg/dataobj/builder.go
  2. 63    pkg/dataobj/builder_test.go
  3. 367   pkg/dataobj/dataobj.go
  4. 4     pkg/dataobj/internal/sections/logs/iter.go
  5. 4     pkg/dataobj/internal/sections/streams/iter.go
  6. 294   pkg/dataobj/logs_reader.go
  7. 183   pkg/dataobj/logs_reader_test.go
  8. 111   pkg/dataobj/reader.go
  9. 248   pkg/dataobj/streams_reader.go
  10. 131  pkg/dataobj/streams_reader_test.go

@ -0,0 +1,363 @@
package dataobj
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"flag"
"fmt"
"github.com/grafana/dskit/flagext"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
)
// ErrBufferFull is returned by [Builder.Append] when the buffer is full and
// needs to flush; call [Builder.Flush] to flush it.
var ErrBufferFull = errors.New("buffer full")
// BuilderConfig configures a data object [Builder].
type BuilderConfig struct {
// SHAPrefixSize sets the number of bytes of the SHA filename to use as a
// folder path.
SHAPrefixSize int `yaml:"sha_prefix_size"`
// TargetPageSize configures a target size for encoded pages within the data
// object. TargetPageSize accounts for encoding, but not for compression.
TargetPageSize flagext.Bytes `yaml:"target_page_size"`
// TODO(rfratto): We need an additional parameter for TargetMetadataSize, as
// metadata payloads can't be split and must be downloaded in a single
// request.
//
// At the moment, we don't have a good mechanism for implementing a metadata
// size limit (we need to support some form of section splitting or column
// combinations), so the option is omitted for now.
// TargetObjectSize configures a target size for data objects.
TargetObjectSize flagext.Bytes `yaml:"target_object_size"`
// TargetSectionSize configures the maximum size of data in a section. Sections
// which support this parameter will place overflow data into new sections of
// the same type.
TargetSectionSize flagext.Bytes `yaml:"target_section_size"`
// BufferSize configures the size of the buffer used to accumulate
// uncompressed logs in memory prior to sorting.
BufferSize flagext.Bytes `yaml:"buffer_size"`
}
// RegisterFlagsWithPrefix registers flags with the given prefix.
func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
_ = cfg.TargetPageSize.Set("2MB")
_ = cfg.TargetObjectSize.Set("1GB")
_ = cfg.BufferSize.Set("16MB") // Page Size * 8
_ = cfg.TargetSectionSize.Set("128MB") // Target Object Size / 8
f.IntVar(&cfg.SHAPrefixSize, prefix+"sha-prefix-size", 2, "The size of the SHA prefix to use for the data object builder.")
f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The size of the target page to use for the data object builder.")
f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.")
f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.")
f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.")
}
// Validate validates the BuilderConfig.
func (cfg *BuilderConfig) Validate() error {
var errs []error
if cfg.SHAPrefixSize <= 0 {
errs = append(errs, errors.New("SHAPrefixSize must be greater than 0"))
}
if cfg.TargetPageSize <= 0 {
errs = append(errs, errors.New("TargetPageSize must be greater than 0"))
} else if cfg.TargetPageSize >= cfg.TargetObjectSize {
errs = append(errs, errors.New("TargetPageSize must be less than TargetObjectSize"))
}
if cfg.TargetObjectSize <= 0 {
errs = append(errs, errors.New("TargetObjectSize must be greater than 0"))
}
if cfg.BufferSize <= 0 {
errs = append(errs, errors.New("BufferSize must be greater than 0"))
}
if cfg.TargetSectionSize <= 0 || cfg.TargetSectionSize > cfg.TargetObjectSize {
errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize"))
}
return errors.Join(errs...)
}
// A Builder builds data objects from a set of incoming log data. Log data is
// appended to a builder by calling [Builder.Append]. Buffered log data is
// flushed manually by calling [Builder.Flush].
//
// Methods on Builder are not goroutine-safe; callers are responsible for
// synchronizing calls.
type Builder struct {
cfg BuilderConfig
metrics *metrics
bucket objstore.Bucket
tenantID string
labelCache *lru.Cache[string, labels.Labels]
currentSizeEstimate int
state builderState
streams *streams.Streams
logs *logs.Logs
flushBuffer *bytes.Buffer
encoder *encoding.Encoder
}
type builderState int
const (
// builderStateEmpty indicates the builder is empty and ready to accept new data.
builderStateEmpty builderState = iota
// builderStateDirty indicates the builder has been modified since the last flush.
builderStateDirty
// builderStateFlush indicates the builder has data to flush.
builderStateFlush
)
// NewBuilder creates a new Builder which stores data objects for the specified
// tenant in a bucket.
//
// NewBuilder returns an error if BuilderConfig is invalid.
func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Builder, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
labelCache, err := lru.New[string, labels.Labels](5000)
if err != nil {
return nil, fmt.Errorf("failed to create LRU cache: %w", err)
}
var (
metrics = newMetrics()
flushBuffer = bytes.NewBuffer(make([]byte, 0, int(cfg.TargetObjectSize)))
encoder = encoding.NewEncoder(flushBuffer)
)
metrics.ObserveConfig(cfg)
return &Builder{
cfg: cfg,
metrics: metrics,
bucket: bucket,
tenantID: tenantID,
labelCache: labelCache,
streams: streams.New(metrics.streams, int(cfg.TargetPageSize)),
logs: logs.New(metrics.logs, logs.Options{
PageSizeHint: int(cfg.TargetPageSize),
BufferSize: int(cfg.BufferSize),
SectionSize: int(cfg.TargetSectionSize),
}),
flushBuffer: flushBuffer,
encoder: encoder,
}, nil
}
// Append buffers a stream to be written to a data object. Append returns an
// error if the stream labels cannot be parsed or [ErrBufferFull] if the
// builder is full.
//
// Once a Builder is full, call [Builder.Flush] to flush the buffered data,
// then call Append again with the same entry.
func (b *Builder) Append(stream logproto.Stream) error {
// Don't allow appending to a builder that has data to be flushed.
if b.state == builderStateFlush {
return ErrBufferFull
}
ls, err := b.parseLabels(stream.Labels)
if err != nil {
return err
}
// Check whether the buffer is full before a stream can be appended; this
// check tends to overestimate, but we may still go over our target size.
//
// Since this check only happens after the first call to Append,
// b.currentSizeEstimate will always be updated to reflect the size following
// the previous append.
if b.state != builderStateEmpty && b.currentSizeEstimate+labelsEstimate(ls)+streamSizeEstimate(stream) > int(b.cfg.TargetObjectSize) {
return ErrBufferFull
}
timer := prometheus.NewTimer(b.metrics.appendTime)
defer timer.ObserveDuration()
for _, entry := range stream.Entries {
streamID := b.streams.Record(ls, entry.Timestamp)
b.logs.Append(logs.Record{
StreamID: streamID,
Timestamp: entry.Timestamp,
Metadata: entry.StructuredMetadata,
Line: entry.Line,
})
}
b.currentSizeEstimate = b.estimatedSize()
b.state = builderStateDirty
return nil
}
func (b *Builder) parseLabels(labelString string) (labels.Labels, error) {
labels, ok := b.labelCache.Get(labelString)
if ok {
return labels, nil
}
labels, err := syntax.ParseLabels(labelString)
if err != nil {
return nil, fmt.Errorf("failed to parse labels: %w", err)
}
b.labelCache.Add(labelString, labels)
return labels, nil
}
func (b *Builder) estimatedSize() int {
var size int
size += b.streams.EstimatedSize()
size += b.logs.EstimatedSize()
b.metrics.sizeEstimate.Set(float64(size))
return size
}
// labelsEstimate estimates the size of a set of labels in bytes.
func labelsEstimate(ls labels.Labels) int {
var (
keysSize int
valuesSize int
)
for _, l := range ls {
keysSize += len(l.Name)
valuesSize += len(l.Value)
}
// Keys are stored as columns directly, while values get compressed. We
// assume a conservative 2x compression ratio for values.
return keysSize + valuesSize/2
}
// streamSizeEstimate estimates the size of a stream in bytes.
func streamSizeEstimate(stream logproto.Stream) int {
var size int
for _, entry := range stream.Entries {
// We only check the size of the line and metadata. Timestamps and IDs
// encode so well that they're unlikely to make a significant impact on our
// size estimate.
size += len(entry.Line) / 2 // Line with 2x compression ratio
for _, md := range entry.StructuredMetadata {
size += len(md.Name) + len(md.Value)/2
}
}
return size
}
// Flush flushes all buffered data to object storage. Calling Flush can result
// in a no-op if there is no buffered data to flush.
//
// If Flush builds an object but fails to upload it to object storage, the
// built object is cached and can be retried. [Builder.Reset] can be called to
// discard any pending data and allow new data to be appended.
func (b *Builder) Flush(ctx context.Context) error {
switch b.state {
case builderStateEmpty:
return nil // Nothing to flush
case builderStateDirty:
if err := b.buildObject(); err != nil {
return fmt.Errorf("building object: %w", err)
}
b.state = builderStateFlush
}
timer := prometheus.NewTimer(b.metrics.flushTime)
defer timer.ObserveDuration()
sum := sha256.Sum224(b.flushBuffer.Bytes())
sumStr := hex.EncodeToString(sum[:])
objectPath := fmt.Sprintf("tenant-%s/objects/%s/%s", b.tenantID, sumStr[:b.cfg.SHAPrefixSize], sumStr[b.cfg.SHAPrefixSize:])
if err := b.bucket.Upload(ctx, objectPath, bytes.NewReader(b.flushBuffer.Bytes())); err != nil {
return err
}
b.Reset()
return nil
}
func (b *Builder) buildObject() error {
timer := prometheus.NewTimer(b.metrics.buildTime)
defer timer.ObserveDuration()
// We reset after a successful flush, but we also reset the buffer before
// building for safety.
b.flushBuffer.Reset()
if err := b.streams.EncodeTo(b.encoder); err != nil {
return fmt.Errorf("encoding streams: %w", err)
} else if err := b.logs.EncodeTo(b.encoder); err != nil {
return fmt.Errorf("encoding logs: %w", err)
} else if err := b.encoder.Flush(); err != nil {
return fmt.Errorf("encoding object: %w", err)
}
b.metrics.builtSize.Observe(float64(b.flushBuffer.Len()))
// We pass context.Background() below to avoid allowing building an object to
// time out; timing out on build would discard anything we built and would
// cause data loss.
dec := encoding.ReaderAtDecoder(bytes.NewReader(b.flushBuffer.Bytes()), int64(b.flushBuffer.Len()))
return b.metrics.encoding.Observe(context.Background(), dec)
}
// Reset discards pending data and resets the builder to an empty state.
func (b *Builder) Reset() {
b.logs.Reset()
b.streams.Reset()
b.state = builderStateEmpty
b.flushBuffer.Reset()
b.metrics.sizeEstimate.Set(0)
}
// RegisterMetrics registers metrics about builder to report to reg. All
// metrics will have a tenant label set to the tenant ID of the Builder.
//
// If multiple Builders for the same tenant are running in the same process,
// reg must contain additional labels to differentiate between them.
func (b *Builder) RegisterMetrics(reg prometheus.Registerer) error {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": b.tenantID}, reg)
return b.metrics.Register(reg)
}
// UnregisterMetrics unregisters metrics about builder from reg.
func (b *Builder) UnregisterMetrics(reg prometheus.Registerer) {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": b.tenantID}, reg)
b.metrics.Unregister(reg)
}
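
For context, a minimal usage sketch of the Builder API added above. The helper names (newTenantBuilder, appendWithFlush) are hypothetical, and the in-memory bucket only stands in for real object storage; this is a sketch of the Append/Flush cycle, not part of the change itself.

package example

import (
    "context"
    "errors"
    "flag"

    "github.com/thanos-io/objstore"

    "github.com/grafana/loki/v3/pkg/dataobj"
    "github.com/grafana/loki/v3/pkg/logproto"
)

// newTenantBuilder wires a Builder for one tenant using the registered flag
// defaults; the in-memory bucket here is only a placeholder.
func newTenantBuilder(tenantID string) (*dataobj.Builder, error) {
    var cfg dataobj.BuilderConfig
    cfg.RegisterFlagsWithPrefix("dataobj.", flag.NewFlagSet("example", flag.ContinueOnError))
    return dataobj.NewBuilder(cfg, objstore.NewInMemBucket(), tenantID)
}

// appendWithFlush shows the Append/Flush cycle: when Append reports
// ErrBufferFull, the caller flushes the buffered object and retries once.
func appendWithFlush(ctx context.Context, b *dataobj.Builder, stream logproto.Stream) error {
    err := b.Append(stream)
    if !errors.Is(err, dataobj.ErrBufferFull) {
        return err // nil on success, or a non-retryable error
    }
    if err := b.Flush(ctx); err != nil {
        return err
    }
    return b.Append(stream)
}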

@ -1,10 +1,9 @@
package dataobj
import (
"cmp"
"context"
"errors"
"slices"
"fmt"
"strings"
"testing"
"time"
@ -16,7 +15,6 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
)
var testBuilderConfig = BuilderConfig{
@ -29,7 +27,7 @@ var testBuilderConfig = BuilderConfig{
BufferSize: 2048 * 8,
}
func Test(t *testing.T) {
func TestBuilder(t *testing.T) {
bucket := objstore.NewInMemBucket()
streams := []logproto.Stream{
@ -87,21 +85,21 @@ func Test(t *testing.T) {
(The old reader-based checks — newReader(bucket), reader.Objects, reader.Streams, and the sortStreams comparison — are replaced with the new helpers:)

    })

    t.Run("Read", func(t *testing.T) {
        objects, err := result.Collect(listObjects(context.Background(), bucket, "fake"))
        require.NoError(t, err)
        require.Len(t, objects, 1)

        obj := FromBucket(bucket, objects[0])
        md, err := obj.Metadata(context.Background())
        require.NoError(t, err)
        require.Equal(t, 1, md.StreamsSections)
        require.Equal(t, 1, md.LogsSections)
    })
}
// Test_Builder_Append ensures that appending to the buffer eventually reports
// TestBuilder_Append ensures that appending to the buffer eventually reports
// that the buffer is full.
func Test_Builder_Append(t *testing.T) {
func TestBuilder_Append(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
@ -127,43 +125,24 @@ func Test_Builder_Append(t *testing.T) {
}
}
Removed (the old comparison helper):

// sortStreams returns a new slice of streams where entries in individual
// streams are sorted by timestamp and structured metadata are sorted by key.
// The order of streams is preserved.
func sortStreams(t *testing.T, streams []logproto.Stream) []logproto.Stream {
    t.Helper()

    res := make([]logproto.Stream, len(streams))
    for i, in := range streams {
        labels, err := syntax.ParseLabels(in.Labels)
        require.NoError(t, err)

        res[i] = logproto.Stream{
            Labels:  labels.String(),
            Entries: slices.Clone(in.Entries),
            Hash:    labels.Hash(),
        }

        for j, ent := range res[i].Entries {
            res[i].Entries[j].StructuredMetadata = slices.Clone(ent.StructuredMetadata)
            slices.SortFunc(res[i].Entries[j].StructuredMetadata, func(i, j push.LabelAdapter) int {
                return cmp.Compare(i.Name, j.Name)
            })
        }

        slices.SortFunc(res[i].Entries, func(i, j push.Entry) int {
            switch {
            case i.Timestamp.Before(j.Timestamp):
                return -1
            case i.Timestamp.After(j.Timestamp):
                return 1
            default:
                return 0
            }
        })
    }

    return res
}

Added (a helper that lists a tenant's data objects directly from the bucket):

func listObjects(ctx context.Context, bucket objstore.Bucket, tenant string) result.Seq[string] {
    tenantPath := fmt.Sprintf("tenant-%s/objects/", tenant)

    return result.Iter(func(yield func(string) bool) error {
        errIterationStopped := errors.New("iteration stopped")

        err := bucket.Iter(ctx, tenantPath, func(name string) error {
            if !yield(name) {
                return errIterationStopped
            }
            return nil
        }, objstore.WithRecursiveIter())

        switch {
        case errors.Is(err, errIterationStopped):
            return nil
        default:
            return err
        }
    })
}

@ -2,363 +2,54 @@
(The previous contents of pkg/dataobj/dataobj.go — ErrBufferFull, BuilderConfig, and the Builder implementation — are removed here; they move into pkg/dataobj/builder.go, shown above. The file now holds the new high-level Object API:)

package dataobj

import (
    "context"
    "fmt"
    "io"

    "github.com/thanos-io/objstore"

    "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
    "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
)

// An Object is a representation of a data object.
type Object struct {
    dec encoding.Decoder
}

// FromBucket opens an Object from the given storage bucket and path.
func FromBucket(bucket objstore.Bucket, path string) *Object {
    return &Object{dec: encoding.BucketDecoder(bucket, path)}
}

// FromReaderAt opens an Object from the given ReaderAt. The size argument
// specifies the size of the data object in bytes.
func FromReaderAt(r io.ReaderAt, size int64) *Object {
    return &Object{dec: encoding.ReaderAtDecoder(r, size)}
}

// Metadata holds high-level metadata about an [Object].
type Metadata struct {
    StreamsSections int // Number of streams sections in the Object.
    LogsSections    int // Number of logs sections in the Object.
}

// Metadata returns the metadata of the Object. Metadata returns an error if
// the object cannot be read.
func (o *Object) Metadata(ctx context.Context) (Metadata, error) {
    si, err := o.dec.Sections(ctx)
    if err != nil {
        return Metadata{}, fmt.Errorf("reading sections: %w", err)
    }

    var md Metadata
    for _, s := range si {
        switch s.Type {
        case filemd.SECTION_TYPE_STREAMS:
            md.StreamsSections++
        case filemd.SECTION_TYPE_LOGS:
            md.LogsSections++
        }
    }

    return md, nil
}

@ -33,7 +33,7 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Record] {
continue
}
for result := range iterSection(ctx, logsDec, section) {
for result := range IterSection(ctx, logsDec, section) {
if result.Err() != nil || !yield(result.MustValue()) {
return result.Err()
}
@ -44,7 +44,7 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Record] {
})
}
func iterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.SectionInfo) result.Seq[Record] {
func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.SectionInfo) result.Seq[Record] {
return result.Iter(func(yield func(Record) bool) error {
// We need to pull the columns twice: once from the dataset implementation
// and once for the metadata to retrieve column type.

@ -31,7 +31,7 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Stream] {
continue
}
for result := range iterSection(ctx, streamsDec, section) {
for result := range IterSection(ctx, streamsDec, section) {
if result.Err() != nil || !yield(result.MustValue()) {
return result.Err()
}
@ -42,7 +42,7 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Stream] {
})
}
func iterSection(ctx context.Context, dec encoding.StreamsDecoder, section *filemd.SectionInfo) result.Seq[Stream] {
func IterSection(ctx context.Context, dec encoding.StreamsDecoder, section *filemd.SectionInfo) result.Seq[Stream] {
return result.Iter(func(yield func(Stream) bool) error {
// We need to pull the columns twice: once from the dataset implementation
// and once for the metadata to retrieve column type.
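
Both iterator packages now export IterSection so callers can read a single section on demand. A minimal sketch of the pull-based pattern the new readers build on: pullSection is a hypothetical name, the decoder and section are assumed to come from the object's encoding.Decoder, and these packages are internal to pkg/dataobj.

package dataobj

import (
    "context"

    "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
    "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
    "github.com/grafana/loki/v3/pkg/dataobj/internal/result"
    "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs"
)

// pullSection converts the section iterator into a pull-style sequence and
// drains it one record at a time, mirroring what LogsReader does internally.
func pullSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.SectionInfo) error {
    next, stop := result.Pull(logs.IterSection(ctx, dec, section))
    defer stop()

    for {
        res, ok := next()
        if !ok {
            return nil
        }
        record, err := res.Value()
        if err != nil {
            return err
        }
        _ = record // process the record here
    }
}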

@ -0,0 +1,294 @@
package dataobj
import (
"context"
"fmt"
"io"
"iter"
"sort"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs"
)
// Predicates for reading logs.
type (
// MetadataMatcher is a predicate for matching metadata in a logs section.
// MetadataMatcher predicates assert that a metadata entry named Name exists
// and its value is set to Value.
//
// For equality matches, MetadataMatcher should always be used;
// MetadataMatchers can translate into more efficient filter operations than
// a [MetadataFilter] can.
MetadataMatcher struct{ Name, Value string }
// MetadataFilter is a predicate for matching metadata in a logs section.
// MetadataFilter predicates return a true value when the combination of the
// provided metadata entry name and value should be included in the result.
//
// MetadataFilter predicates should be only used for more complex filtering;
// for equality matches, [MetadataMatcher]s are more efficient.
MetadataFilter func(name, value string) bool
)
// A Record is an individual log record in a data object.
type Record struct {
StreamID int64 // StreamID associated with the log record.
Timestamp time.Time // Timestamp of the log record.
Metadata labels.Labels // Set of metadata associated with the log record.
Line string // Line of the log record.
}
// LogsReader reads the set of logs from an [Object].
type LogsReader struct {
obj *Object
idx int
matchers map[string]string
filters map[string]MetadataFilter
matchIDs map[int64]struct{}
next func() (result.Result[logs.Record], bool)
stop func()
}
// NewLogsReader creates a new LogsReader that reads from the logs section of
// the given object.
func NewLogsReader(obj *Object, sectionIndex int) *LogsReader {
var lr LogsReader
lr.Reset(obj, sectionIndex)
return &lr
}
// MatchStreams provides a sequence of stream IDs for the logs reader to match.
// [LogsReader.Read] will only return logs for the provided stream IDs.
//
// MatchStreams may be called multiple times to match multiple sets of streams.
//
// MatchStreams may only be called before reading begins or after a call to
// [LogsReader.Reset].
func (r *LogsReader) MatchStreams(ids iter.Seq[int64]) error {
if r.next != nil {
return fmt.Errorf("cannot change matched streams after reading has started")
}
if r.matchIDs == nil {
r.matchIDs = make(map[int64]struct{})
}
for id := range ids {
r.matchIDs[id] = struct{}{}
}
return nil
}
// AddMetadataMatcher adds a metadata matcher to the LogsReader.
// [LogsReader.Read] will only return logs for which the metadata matcher
// predicate passes.
//
// AddMetadataMatcher may only be called before reading begins or after a call
// to [LogsReader.Reset].
func (r *LogsReader) AddMetadataMatcher(m MetadataMatcher) error {
if r.next != nil {
return fmt.Errorf("cannot add metadata matcher after reading has started")
}
if r.matchers == nil {
r.matchers = make(map[string]string)
}
r.matchers[m.Name] = m.Value
return nil
}
// AddMetadataFilter adds a metadata filter to the LogsReader.
// [LogsReader.Read] will only return records for which the metadata filter
// predicate passes. The filter f will be called with the provided key to allow
// the same function to be reused for multiple keys.
//
// AddMetadataFilter may only be called before reading begins or after a call
// to [LogsReader.Reset].
func (r *LogsReader) AddMetadataFilter(key string, f MetadataFilter) error {
if r.next != nil {
return fmt.Errorf("cannot add metadata filter after reading has started")
}
if r.filters == nil {
r.filters = make(map[string]MetadataFilter)
}
r.filters[key] = f
return nil
}
// Read reads up to the next len(s) records from the reader and stores them
// into s. It returns the number of records read and any error encountered. At
// the end of the logs section, Read returns 0, io.EOF.
func (r *LogsReader) Read(ctx context.Context, s []Record) (int, error) {
// TODO(rfratto): The implementation below is the initial, naive approach. It
// lacks a few features that will be needed at scale:
//
// * Read columns/pages in batches of len(s), rather than one row at a time,
//
// * Add page-level filtering based on min/max page values to quickly filter
// out batches of rows without needing to download or decode them.
//
// * Download pages in batches, rather than one at a time.
//
// * Only download/decode non-predicate columns following finding rows that
// match all predicate columns.
//
// * Reuse as much memory as possible from a combination of s and the state
// of LogsReader.
//
// These details can change internally without changing the API exposed by
// LogsReader, which is designed to permit efficient use in the future.
if r.obj == nil {
return 0, io.EOF
} else if r.idx < 0 {
return 0, fmt.Errorf("invalid section index %d", r.idx)
}
if r.next == nil {
err := r.initIter(ctx)
if err != nil {
return 0, err
}
}
for i := range s {
res, ok := r.nextMatching()
if !ok {
return i, io.EOF
}
record, err := res.Value()
if err != nil {
return i, fmt.Errorf("reading record: %w", err)
}
s[i] = Record{
StreamID: record.StreamID,
Timestamp: record.Timestamp,
Metadata: convertMetadata(record.Metadata),
Line: record.Line,
}
}
return len(s), nil
}
func (r *LogsReader) initIter(ctx context.Context) error {
sec, err := r.findSection(ctx)
if err != nil {
return fmt.Errorf("finding section: %w", err)
}
if r.stop != nil {
r.stop()
}
seq := logs.IterSection(ctx, r.obj.dec.LogsDecoder(), sec)
r.next, r.stop = result.Pull(seq)
return nil
}
func (r *LogsReader) findSection(ctx context.Context) (*filemd.SectionInfo, error) {
si, err := r.obj.dec.Sections(ctx)
if err != nil {
return nil, fmt.Errorf("reading sections: %w", err)
}
var n int
for _, s := range si {
if s.Type == filemd.SECTION_TYPE_LOGS {
if n == r.idx {
return s, nil
}
n++
}
}
return nil, fmt.Errorf("section index %d not found", r.idx)
}
func (r *LogsReader) nextMatching() (result.Result[logs.Record], bool) {
if r.next == nil {
return result.Result[logs.Record]{}, false
}
NextRow:
res, ok := r.next()
if !ok {
return res, ok
}
record, err := res.Value()
if err != nil {
return res, true
}
if r.matchIDs != nil {
if _, ok := r.matchIDs[record.StreamID]; !ok {
goto NextRow
}
}
for key, value := range r.matchers {
if getMetadata(record.Metadata, key) != value {
goto NextRow
}
}
for key, filter := range r.filters {
if !filter(key, getMetadata(record.Metadata, key)) {
goto NextRow
}
}
return res, true
}
func getMetadata(md push.LabelsAdapter, key string) string {
for _, l := range md {
if l.Name == key {
return l.Value
}
}
return ""
}
func convertMetadata(md push.LabelsAdapter) labels.Labels {
l := make(labels.Labels, 0, len(md))
for _, label := range md {
l = append(l, labels.Label{Name: label.Name, Value: label.Value})
}
sort.Sort(l)
return l
}
// Reset resets the LogsReader with a new object and section index to read
// from. Reset allows reusing a LogsReader without allocating a new one.
//
// Reset may be called with a nil object and a negative section index to clear
// the LogsReader without needing a new object.
func (r *LogsReader) Reset(obj *Object, sectionIndex int) {
if r.stop != nil {
r.stop()
}
r.obj = obj
r.idx = sectionIndex
r.next = nil
r.stop = nil
clear(r.matchers)
clear(r.filters)
clear(r.matchIDs)
}
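
A usage sketch for LogsReader, combining a stream-ID match with a metadata matcher. matchingRecords is a hypothetical helper name and the trace_id value is only an example.

package example

import (
    "context"
    "errors"
    "io"
    "slices"

    "github.com/grafana/loki/v3/pkg/dataobj"
)

// matchingRecords reads every record from the first logs section that belongs
// to one of the given stream IDs and carries trace_id="123" in its metadata.
func matchingRecords(ctx context.Context, obj *dataobj.Object, streamIDs []int64) ([]dataobj.Record, error) {
    r := dataobj.NewLogsReader(obj, 0)

    if err := r.MatchStreams(slices.Values(streamIDs)); err != nil {
        return nil, err
    }
    if err := r.AddMetadataMatcher(dataobj.MetadataMatcher{Name: "trace_id", Value: "123"}); err != nil {
        return nil, err
    }

    var (
        out []dataobj.Record
        buf = make([]dataobj.Record, 128)
    )
    for {
        n, err := r.Read(ctx, buf)
        out = append(out, buf[:n]...)
        if errors.Is(err, io.EOF) {
            return out, nil
        } else if err != nil {
            return out, err
        }
    }
}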

@ -0,0 +1,183 @@
package dataobj_test
import (
"bytes"
"context"
"errors"
"io"
"slices"
"strings"
"testing"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs"
)
var recordsTestdata = []logs.Record{
{StreamID: 1, Timestamp: unixTime(10), Metadata: nil, Line: "hello"},
{StreamID: 1, Timestamp: unixTime(15), Metadata: metadata("trace_id", "123"), Line: "world"},
{StreamID: 2, Timestamp: unixTime(5), Metadata: nil, Line: "hello again"},
{StreamID: 2, Timestamp: unixTime(20), Metadata: metadata("user", "12"), Line: "world again"},
{StreamID: 3, Timestamp: unixTime(25), Metadata: metadata("user", "14"), Line: "hello one more time"},
{StreamID: 3, Timestamp: unixTime(30), Metadata: metadata("trace_id", "123"), Line: "world one more time"},
}
func metadata(kvps ...string) push.LabelsAdapter {
if len(kvps)%2 != 0 {
panic("metadata: odd number of key-value pairs")
}
m := make(push.LabelsAdapter, len(kvps)/2)
for i := 0; i < len(kvps); i += 2 {
m = append(m, push.LabelAdapter{Name: kvps[i], Value: kvps[i+1]})
}
return m
}
func TestLogsReader(t *testing.T) {
expect := []dataobj.Record{
{1, unixTime(10), labels.FromStrings(), "hello"},
{1, unixTime(15), labels.FromStrings("trace_id", "123"), "world"},
{2, unixTime(5), labels.FromStrings(), "hello again"},
{2, unixTime(20), labels.FromStrings("user", "12"), "world again"},
{3, unixTime(25), labels.FromStrings("user", "14"), "hello one more time"},
{3, unixTime(30), labels.FromStrings("trace_id", "123"), "world one more time"},
}
// Build with many pages but one section.
obj := buildLogsObject(t, logs.Options{
PageSizeHint: 1,
BufferSize: 1,
SectionSize: 1024,
})
md, err := obj.Metadata(context.Background())
require.NoError(t, err)
require.Equal(t, 1, md.LogsSections)
r := dataobj.NewLogsReader(obj, 0)
actual, err := readAllRecords(context.Background(), r)
require.NoError(t, err)
require.Equal(t, expect, actual)
}
func TestLogsReader_MatchStreams(t *testing.T) {
expect := []dataobj.Record{
{1, unixTime(10), labels.FromStrings(), "hello"},
{1, unixTime(15), labels.FromStrings("trace_id", "123"), "world"},
{3, unixTime(25), labels.FromStrings("user", "14"), "hello one more time"},
{3, unixTime(30), labels.FromStrings("trace_id", "123"), "world one more time"},
}
// Build with many pages but one section.
obj := buildLogsObject(t, logs.Options{
PageSizeHint: 1,
BufferSize: 1,
SectionSize: 1024,
})
md, err := obj.Metadata(context.Background())
require.NoError(t, err)
require.Equal(t, 1, md.LogsSections)
r := dataobj.NewLogsReader(obj, 0)
require.NoError(t, r.MatchStreams(slices.Values([]int64{1, 3})))
actual, err := readAllRecords(context.Background(), r)
require.NoError(t, err)
require.Equal(t, expect, actual)
}
func TestLogsReader_AddMetadataMatcher(t *testing.T) {
expect := []dataobj.Record{
{1, unixTime(15), labels.FromStrings("trace_id", "123"), "world"},
{3, unixTime(30), labels.FromStrings("trace_id", "123"), "world one more time"},
}
// Build with many pages but one section.
obj := buildLogsObject(t, logs.Options{
PageSizeHint: 1,
BufferSize: 1,
SectionSize: 1024,
})
md, err := obj.Metadata(context.Background())
require.NoError(t, err)
require.Equal(t, 1, md.LogsSections)
r := dataobj.NewLogsReader(obj, 0)
require.NoError(t, r.AddMetadataMatcher(dataobj.MetadataMatcher{"trace_id", "123"}))
actual, err := readAllRecords(context.Background(), r)
require.NoError(t, err)
require.Equal(t, expect, actual)
}
func TestLogsReader_AddMetadataFilter(t *testing.T) {
expect := []dataobj.Record{
{2, unixTime(20), labels.FromStrings("user", "12"), "world again"},
{3, unixTime(25), labels.FromStrings("user", "14"), "hello one more time"},
}
// Build with many pages but one section.
obj := buildLogsObject(t, logs.Options{
PageSizeHint: 1,
BufferSize: 1,
SectionSize: 1024,
})
md, err := obj.Metadata(context.Background())
require.NoError(t, err)
require.Equal(t, 1, md.LogsSections)
r := dataobj.NewLogsReader(obj, 0)
err = r.AddMetadataFilter("user", func(name, value string) bool {
require.Equal(t, "user", name)
return strings.HasPrefix(value, "1")
})
require.NoError(t, err)
actual, err := readAllRecords(context.Background(), r)
require.NoError(t, err)
require.Equal(t, expect, actual)
}
func buildLogsObject(t *testing.T, opts logs.Options) *dataobj.Object {
t.Helper()
s := logs.New(nil, opts)
for _, rec := range recordsTestdata {
s.Append(rec)
}
var buf bytes.Buffer
enc := encoding.NewEncoder(&buf)
require.NoError(t, s.EncodeTo(enc))
require.NoError(t, enc.Flush())
return dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
}
func readAllRecords(ctx context.Context, r *dataobj.LogsReader) ([]dataobj.Record, error) {
var (
res []dataobj.Record
buf = make([]dataobj.Record, 128)
)
for {
n, err := r.Read(ctx, buf)
if n > 0 {
res = append(res, buf[:n]...)
}
if errors.Is(err, io.EOF) {
return res, nil
} else if err != nil {
return res, err
}
buf = buf[:0]
}
}

@ -1,111 +0,0 @@
package dataobj
import (
"context"
"errors"
"fmt"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams"
"github.com/grafana/loki/v3/pkg/logproto"
)
// reader connects to an object storage bucket and supports basic reading from
// data objects.
//
// reader isn't exposed as a public API because it's insufficient for reading
// at scale; more work is needed to support efficient reads and filtering data.
// At the moment, reader is only used for tests.
type reader struct {
bucket objstore.Bucket
}
func newReader(bucket objstore.Bucket) *reader {
return &reader{bucket: bucket}
}
// Objects returns an iterator over all data objects for the provided tenant.
func (r *reader) Objects(ctx context.Context, tenant string) result.Seq[string] {
tenantPath := fmt.Sprintf("tenant-%s/objects/", tenant)
return result.Iter(func(yield func(string) bool) error {
errIterationStopped := errors.New("iteration stopped")
err := r.bucket.Iter(ctx, tenantPath, func(name string) error {
if !yield(name) {
return errIterationStopped
}
return nil
}, objstore.WithRecursiveIter())
switch {
case errors.Is(err, errIterationStopped):
return nil
default:
return err
}
})
}
// Streams returns an iterator over all [logproto.Stream] entries for the
// provided object. Each emitted stream contains all logs for that stream in
// ascending timestamp order. Streams are emitted in the order they were
// first appended to the data object.
func (r *reader) Streams(ctx context.Context, object string) result.Seq[logproto.Stream] {
return result.Iter(func(yield func(logproto.Stream) bool) error {
dec := encoding.BucketDecoder(r.bucket, object)
streamRecords, err := result.Collect(streams.Iter(ctx, dec))
if err != nil {
return fmt.Errorf("reading streams dataset: %w", err)
}
streamRecordLookup := make(map[int64]streams.Stream, len(streamRecords))
for _, stream := range streamRecords {
streamRecordLookup[stream.ID] = stream
}
var (
lastID int64
batch logproto.Stream
)
for result := range logs.Iter(ctx, dec) {
record, err := result.Value()
if err != nil {
return fmt.Errorf("iterating over logs: %w", err)
}
if lastID != record.StreamID {
if lastID != 0 && !yield(batch) {
return nil
}
streamRecord := streamRecordLookup[record.StreamID]
batch = logproto.Stream{
Labels: streamRecord.Labels.String(),
Hash: streamRecord.Labels.Hash(),
}
lastID = record.StreamID
}
batch.Entries = append(batch.Entries, logproto.Entry{
Timestamp: record.Timestamp,
Line: record.Line,
StructuredMetadata: record.Metadata,
})
}
if len(batch.Entries) > 0 {
if !yield(batch) {
return nil
}
}
return nil
})
}

@ -0,0 +1,248 @@
package dataobj
import (
"context"
"fmt"
"io"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams"
)
// Predicates for reading streams.
type (
// LabelMatcher is a predicate for matching labels in a streams section.
// LabelMatcher predicates assert that a label named Name exists and its
// value is set to Value.
//
// For equality matches, LabelMatcher should always be used; LabelMatchers
// can translate into more efficient filter operations than a [LabelFilter]
// can.
LabelMatcher struct{ Name, Value string }
// LabelFilter is a predicate for matching labels in a streams section.
// LabelFilter predicates return a true value when the combination of the
// provided label name and value should be included in the result.
//
// LabelFilter predicates should be only used for more complex filtering; for
// equality matches, [LabelMatcher]s are more efficient.
LabelFilter func(name, value string) bool
)
// A Stream is an individual stream in a data object.
type Stream struct {
// ID of the stream. Stream IDs are unique across all sections in an object,
// but not across multiple objects.
ID int64
// MinTime and MaxTime denote the range of timestamps across all entries in
// the stream.
MinTime, MaxTime time.Time
// Labels of the stream.
Labels labels.Labels
}
// StreamsReader reads the set of streams from an [Object].
type StreamsReader struct {
obj *Object
idx int
matchers map[string]string
filters map[string]LabelFilter
next func() (result.Result[streams.Stream], bool)
stop func()
}
// NewStreamsReader creates a new StreamsReader that reads from the streams
// section of the given object.
func NewStreamsReader(obj *Object, sectionIndex int) *StreamsReader {
var sr StreamsReader
sr.Reset(obj, sectionIndex)
return &sr
}
// AddLabelMatcher adds a label matcher to the StreamsReader.
// [StreamsReader.Read] will only return streams for which the label matcher
// predicate passes.
//
// AddLabelMatcher may only be called before reading begins or after a call to
// [StreamsReader.Reset].
func (r *StreamsReader) AddLabelMatcher(m LabelMatcher) error {
if r.next != nil {
return fmt.Errorf("cannot add label matcher after reading has started")
}
if r.matchers == nil {
r.matchers = make(map[string]string)
}
r.matchers[m.Name] = m.Value
return nil
}
// AddLabelFilter adds a label filter to the StreamsReader.
// [StreamsReader.Read] will only return streams for which the label filter
// predicate passes. The filter f will be called with the provided key to allow
// the same function to be reused for multiple keys.
//
// AddLabelFilter may only be called before reading begins or after a call to
// [StreamsReader.Reset].
func (r *StreamsReader) AddLabelFilter(key string, f LabelFilter) error {
if r.next != nil {
return fmt.Errorf("cannot add label filter after reading has started")
}
if r.filters == nil {
r.filters = make(map[string]LabelFilter)
}
r.filters[key] = f
return nil
}
// Read reads up to the next len(s) streams from the reader and stores them
// into s. It returns the number of streams read and any error encountered. At
// the end of the stream section, Read returns 0, io.EOF.
func (r *StreamsReader) Read(ctx context.Context, s []Stream) (int, error) {
// TODO(rfratto): The implementation below is the initial, naive approach. It
// lacks a few features that will be needed at scale:
//
// * Read columns/pages in batches of len(s), rather than one row at a time,
//
// * Add page-level filtering based on min/max page values to quickly filter
// out batches of rows without needing to download or decode them.
//
// * Download pages in batches, rather than one at a time.
//
// * Only download/decode non-predicate columns following finding rows that
// match all predicate columns.
//
// * Reuse as much memory as possible from a combination of s and the state
// of StreamsReader.
//
// These details can change internally without changing the API exposed by
// StreamsReader, which is designed to permit efficient use in the future.
if r.obj == nil {
return 0, io.EOF
} else if r.idx < 0 {
return 0, fmt.Errorf("invalid section index %d", r.idx)
}
if r.next == nil {
err := r.initIter(ctx)
if err != nil {
return 0, err
}
}
for i := range s {
res, ok := r.nextMatching()
if !ok {
return i, io.EOF
}
stream, err := res.Value()
if err != nil {
return i, fmt.Errorf("reading stream: %w", err)
}
s[i] = Stream{
ID: stream.ID,
MinTime: stream.MinTimestamp,
MaxTime: stream.MaxTimestamp,
Labels: stream.Labels,
}
}
return len(s), nil
}
func (r *StreamsReader) initIter(ctx context.Context) error {
sec, err := r.findSection(ctx)
if err != nil {
return fmt.Errorf("finding section: %w", err)
}
if r.stop != nil {
r.stop()
}
seq := streams.IterSection(ctx, r.obj.dec.StreamsDecoder(), sec)
r.next, r.stop = result.Pull(seq)
return nil
}
func (r *StreamsReader) findSection(ctx context.Context) (*filemd.SectionInfo, error) {
si, err := r.obj.dec.Sections(ctx)
if err != nil {
return nil, fmt.Errorf("reading sections: %w", err)
}
var n int
for _, s := range si {
if s.Type == filemd.SECTION_TYPE_STREAMS {
if n == r.idx {
return s, nil
}
n++
}
}
return nil, fmt.Errorf("section index %d not found", r.idx)
}
func (r *StreamsReader) nextMatching() (result.Result[streams.Stream], bool) {
if r.next == nil {
return result.Result[streams.Stream]{}, false
}
NextRow:
res, ok := r.next()
if !ok {
return res, ok
}
stream, err := res.Value()
if err != nil {
return res, true
}
for key, value := range r.matchers {
if stream.Labels.Get(key) != value {
goto NextRow
}
}
for key, filter := range r.filters {
if !filter(key, stream.Labels.Get(key)) {
goto NextRow
}
}
return res, true
}
// Reset resets the StreamsReader with a new object and section index to read
// from. Reset allows reusing a StreamsReader without allocating a new one.
//
// Reset may be called with a nil object and a negative section index to clear
// the StreamsReader without needing a new object.
func (r *StreamsReader) Reset(obj *Object, sectionIndex int) {
if r.stop != nil {
r.stop()
}
r.obj = obj
r.idx = sectionIndex
r.next = nil
r.stop = nil
clear(r.matchers)
clear(r.filters)
}
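
A usage sketch for StreamsReader with a label filter; streamIDsForApp is a hypothetical helper name. The collected IDs could then be fed to LogsReader.MatchStreams, as in the logs reader sketch earlier.

package example

import (
    "context"
    "errors"
    "io"
    "strings"

    "github.com/grafana/loki/v3/pkg/dataobj"
)

// streamIDsForApp scans the first streams section and collects the IDs of
// streams whose "app" label starts with the given prefix.
func streamIDsForApp(ctx context.Context, obj *dataobj.Object, prefix string) ([]int64, error) {
    r := dataobj.NewStreamsReader(obj, 0)

    err := r.AddLabelFilter("app", func(name, value string) bool {
        return strings.HasPrefix(value, prefix)
    })
    if err != nil {
        return nil, err
    }

    var (
        ids []int64
        buf = make([]dataobj.Stream, 128)
    )
    for {
        n, err := r.Read(ctx, buf)
        for _, s := range buf[:n] {
            ids = append(ids, s.ID)
        }
        if errors.Is(err, io.EOF) {
            return ids, nil
        } else if err != nil {
            return ids, err
        }
    }
}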

@ -0,0 +1,131 @@
package dataobj_test
import (
"bytes"
"context"
"errors"
"io"
"strings"
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams"
)
var streamsTestdata = []struct {
Labels labels.Labels
Timestamp time.Time
}{
{labels.FromStrings("cluster", "test", "app", "foo"), unixTime(10)},
{labels.FromStrings("cluster", "test", "app", "foo"), unixTime(15)},
{labels.FromStrings("cluster", "test", "app", "bar"), unixTime(5)},
{labels.FromStrings("cluster", "test", "app", "bar"), unixTime(20)},
{labels.FromStrings("cluster", "test", "app", "baz"), unixTime(25)},
{labels.FromStrings("cluster", "test", "app", "baz"), unixTime(30)},
}
func TestStreamsReader(t *testing.T) {
expect := []dataobj.Stream{
{1, unixTime(10), unixTime(15), labels.FromStrings("cluster", "test", "app", "foo")},
{2, unixTime(5), unixTime(20), labels.FromStrings("cluster", "test", "app", "bar")},
{3, unixTime(25), unixTime(30), labels.FromStrings("cluster", "test", "app", "baz")},
}
obj := buildStreamsObject(t, 1) // Many pages
md, err := obj.Metadata(context.Background())
require.NoError(t, err)
require.Equal(t, 1, md.StreamsSections)
r := dataobj.NewStreamsReader(obj, 0)
actual, err := readAllStreams(context.Background(), r)
require.NoError(t, err)
require.Equal(t, expect, actual)
}
func TestStreamsReader_AddLabelMatcher(t *testing.T) {
expect := []dataobj.Stream{
{2, unixTime(5), unixTime(20), labels.FromStrings("cluster", "test", "app", "bar")},
}
obj := buildStreamsObject(t, 1) // Many pages
md, err := obj.Metadata(context.Background())
require.NoError(t, err)
require.Equal(t, 1, md.StreamsSections)
r := dataobj.NewStreamsReader(obj, 0)
require.NoError(t, r.AddLabelMatcher(dataobj.LabelMatcher{Name: "app", Value: "bar"}))
actual, err := readAllStreams(context.Background(), r)
require.NoError(t, err)
require.Equal(t, expect, actual)
}
func TestStreamsReader_AddLabelFilter(t *testing.T) {
expect := []dataobj.Stream{
{2, unixTime(5), unixTime(20), labels.FromStrings("cluster", "test", "app", "bar")},
{3, unixTime(25), unixTime(30), labels.FromStrings("cluster", "test", "app", "baz")},
}
obj := buildStreamsObject(t, 1) // Many pages
md, err := obj.Metadata(context.Background())
require.NoError(t, err)
require.Equal(t, 1, md.StreamsSections)
r := dataobj.NewStreamsReader(obj, 0)
err = r.AddLabelFilter("app", func(key string, value string) bool {
require.Equal(t, "app", key)
return strings.HasPrefix(value, "b")
})
require.NoError(t, err)
actual, err := readAllStreams(context.Background(), r)
require.NoError(t, err)
require.Equal(t, expect, actual)
}
func unixTime(sec int64) time.Time {
return time.Unix(sec, 0).UTC()
}
func buildStreamsObject(t *testing.T, pageSize int) *dataobj.Object {
t.Helper()
s := streams.New(nil, pageSize)
for _, d := range streamsTestdata {
s.Record(d.Labels, d.Timestamp)
}
var buf bytes.Buffer
enc := encoding.NewEncoder(&buf)
require.NoError(t, s.EncodeTo(enc))
require.NoError(t, enc.Flush())
return dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
}
func readAllStreams(ctx context.Context, r *dataobj.StreamsReader) ([]dataobj.Stream, error) {
var (
res []dataobj.Stream
buf = make([]dataobj.Stream, 128)
)
for {
n, err := r.Read(ctx, buf)
if n > 0 {
res = append(res, buf[:n]...)
}
if errors.Is(err, io.EOF) {
return res, nil
} else if err != nil {
return res, err
}
buf = buf[:0]
}
}