chore(dataobj-consumer): Sort logs object-wide (#19231)

Sorting logs globally (object-wide per tenant) removes overlapping time ranges of sections in the objects.

Sections contain logs from multiple streams, and the ingest lag of streams may vary. This means that although logs of individual sections are sorted by timestamp, the overall sorting of logs is not guaranteed. Therefore sections are sorted with a k-way merge (SortMerge), which does over-query data in case the query would reach its result limit early.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/19254/head
Christian Haudum 7 months ago committed by GitHub
parent 37eddabac7
commit a3017f2b13
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 92
      pkg/dataobj/consumer/logsobj/builder.go
  2. 14
      pkg/dataobj/consumer/logsobj/builder_metrics.go
  3. 90
      pkg/dataobj/consumer/logsobj/builder_test.go
  4. 90
      pkg/dataobj/consumer/logsobj/sort.go
  5. 23
      pkg/dataobj/consumer/partition_processor.go
  6. 4
      pkg/dataobj/consumer/partition_processor_test.go
  7. 22
      pkg/dataobj/dataobj.go
  8. 2
      pkg/dataobj/internal/dataset/page_reader.go
  9. 18
      pkg/dataobj/internal/result/result.go
  10. 63
      pkg/dataobj/sections/logs/builder.go
  11. 23
      pkg/dataobj/sections/logs/iter.go
  12. 2
      pkg/dataobj/sections/logs/iter_test.go
  13. 2
      pkg/dataobj/sections/logs/row_reader.go
  14. 59
      pkg/dataobj/sections/logs/table_merge.go
  15. 20
      pkg/dataobj/sections/streams/builder.go
  16. 100
      tools/dataobj-sort/main.go

@ -8,6 +8,7 @@ import (
"fmt"
"io"
"github.com/facette/natsort"
"github.com/grafana/dskit/flagext"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
@ -378,6 +379,97 @@ func (b *Builder) Flush() (*dataobj.Object, io.Closer, error) {
return obj, closer, err
}
// CopyAndSort takes an existing [dataobj.Object] and rewrites the logs sections so the logs are sorted object-wide.
// The order of the sections is deterministic. For each tenant, first come the streams sections in the order of the old object
// and second come the new, rewritten logs sections. Tenants are sorted in natural order.
func (b *Builder) CopyAndSort(obj *dataobj.Object) (*dataobj.Object, io.Closer, error) {
dur := prometheus.NewTimer(b.metrics.sortDurationSeconds)
defer dur.ObserveDuration()
ctx := context.Background()
sb := streams.NewBuilder(b.metrics.streams, int(b.cfg.TargetPageSize), b.cfg.MaxPageRows)
lb := logs.NewBuilder(b.metrics.logs, logs.BuilderOptions{
PageSizeHint: int(b.cfg.TargetPageSize),
PageMaxRowCount: b.cfg.MaxPageRows,
BufferSize: int(b.cfg.BufferSize),
StripeMergeLimit: b.cfg.SectionStripeMergeLimit,
AppendStrategy: logs.AppendOrdered,
})
// Sort the set of tenants so the new object has a deterministic order of sections.
tenants := obj.Tenants()
natsort.Sort(tenants)
for _, tenant := range tenants {
for _, sec := range obj.Sections().Filter(func(s *dataobj.Section) bool { return streams.CheckSection(s) && s.Tenant == tenant }) {
sb.Reset()
sb.SetTenant(sec.Tenant)
// Copy section into new builder. This is *very* inefficient at the moment!
// TODO(chaudum): Create implementation of SectionBuilder interface that can copy entire ranges from a SectionReader.
section, err := streams.Open(ctx, sec)
if err != nil {
return nil, nil, fmt.Errorf("failed to open streams section: %w", err)
}
iter := streams.IterSection(ctx, section)
for res := range iter {
val, err := res.Value()
if err != nil {
return nil, nil, err
}
sb.AppendValue(val)
}
if err := b.builder.Append(sb); err != nil {
return nil, nil, err
}
}
var sections []*dataobj.Section
for _, sec := range obj.Sections().Filter(func(s *dataobj.Section) bool { return logs.CheckSection(s) && s.Tenant == tenant }) {
sections = append(sections, sec)
}
if len(sections) == 0 {
return nil, nil, fmt.Errorf("no logs sections found for tenant: %v", tenant)
}
// TODO(chaudum): Handle special case len(sections) == 1
lb.Reset()
lb.SetTenant(tenant)
iter, err := sortMergeIterator(ctx, sections)
if err != nil {
return nil, nil, fmt.Errorf("creating sort iterator: %w", err)
}
for rec := range iter {
val, err := rec.Value()
if err != nil {
return nil, nil, err
}
lb.Append(val)
// If our logs section has gotten big enough, we want to flush it to the encoder and start a new section.
if lb.UncompressedSize() > int(b.cfg.TargetSectionSize) {
if err := b.builder.Append(lb); err != nil {
return nil, nil, err
}
lb.Reset()
lb.SetTenant(tenant)
}
}
// Append the final section with the remaining logs
if err := b.builder.Append(lb); err != nil {
return nil, nil, err
}
}
return b.builder.Flush()
}
func (b *Builder) observeObject(ctx context.Context, obj *dataobj.Object) error {
var errs []error

@ -25,6 +25,8 @@ type builderMetrics struct {
sizeEstimate prometheus.Gauge
builtSize prometheus.Histogram
sortDurationSeconds prometheus.Histogram
}
// newBuilderMetrics creates a new set of [builderMetrics] for instrumenting
@ -103,6 +105,18 @@ func newBuilderMetrics() *builderMetrics {
Help: "Total number of flush failures.",
}),
sortDurationSeconds: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Subsystem: "dataobj",
Name: "sort_duration_seconds",
Help: "Time taken sorting logs object-wide after flushing.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 0,
}),
}
}

@ -3,6 +3,8 @@ package logsobj
import (
"context"
"errors"
"fmt"
"math"
"strings"
"testing"
"time"
@ -11,6 +13,8 @@ import (
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/logproto"
@ -19,7 +23,7 @@ import (
var testBuilderConfig = BuilderConfig{
TargetPageSize: 2048,
TargetObjectSize: 1 << 20, // 1 MiB
TargetSectionSize: 1 << 19, // 512 KiB
TargetSectionSize: 8 << 10, // 8 KiB
BufferSize: 2048 * 8,
@ -128,3 +132,87 @@ func TestBuilder_Append(t *testing.T) {
require.Equal(t, tenant, section.Tenant)
}
}
func TestBuilder_CopyAndSort(t *testing.T) {
builder, _ := NewBuilder(testBuilderConfig, nil)
now := time.Date(2025, time.September, 17, 0, 0, 0, 0, time.UTC)
numRows := 16 // 16 rows with 1KiB each line and 8KiB section size ~> 2 logs sections per tenant
for _, tenant := range []string{"tenant-a", "tenant-b", "tenant-c"} {
for i := range numRows {
err := builder.Append(tenant, logproto.Stream{
Labels: `{cluster="test",app="foo"}`,
Entries: []push.Entry{{
Timestamp: now.Add(time.Duration(i%8) * time.Second),
Line: strings.Repeat("a", 1024), // 1KiB log line
}},
})
require.NoError(t, err)
}
}
obj1, closer1, err := builder.Flush()
require.NoError(t, err)
defer closer1.Close()
newBuilder, _ := NewBuilder(testBuilderConfig, nil)
obj2, closer2, err := newBuilder.CopyAndSort(obj1)
require.NoError(t, err)
defer closer2.Close()
for i, obj := range []*dataobj.Object{obj1, obj2} {
t.Log(" === dataobj", i)
t.Log("Size: ", obj.Size())
t.Log("Tenants:", obj.Tenants())
for i, section := range obj.Sections() {
t.Log("Section:", i, section.Tenant, section.Type.String())
}
}
require.Equal(
t,
obj1.Sections().Count(streams.CheckSection),
obj2.Sections().Count(streams.CheckSection),
"objects have different amount of streams sections",
)
require.Equal(
t,
obj1.Sections().Count(logs.CheckSection),
obj2.Sections().Count(logs.CheckSection),
"objects have different amount of logs sections",
)
// Assert DESC timestamp ordering across sections of a tenant
for _, tenant := range []string{"tenant-a", "tenant-b", "tenant-c"} {
prevTs := time.Unix(0, math.MaxInt64)
for _, sec := range obj2.Sections().Filter(func(s *dataobj.Section) bool {
return logs.CheckSection(s) && s.Tenant == tenant
}) {
for res := range iterLogsSection(t, sec) {
val, _ := res.Value()
require.LessOrEqual(t, val.Timestamp, prevTs)
prevTs = val.Timestamp
}
}
}
}
func iterLogsSection(t *testing.T, section *dataobj.Section) result.Seq[logs.Record] {
t.Helper()
ctx := t.Context()
return result.Iter(func(yield func(logs.Record) bool) error {
logsSection, err := logs.Open(ctx, section)
if err != nil {
return fmt.Errorf("opening section: %w", err)
}
for result := range logs.IterSection(ctx, logsSection) {
if result.Err() != nil || !yield(result.MustValue()) {
return result.Err()
}
}
return nil
})
}

@ -0,0 +1,90 @@
package logsobj
import (
"context"
"fmt"
"math"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/util/loser"
)
// sortMergeIterator returns an iterator that performs a k-way merge of records from multiple logs sections.
// It requires that the input sections are sorted sorted by the same order.
func sortMergeIterator(ctx context.Context, sections []*dataobj.Section) (result.Seq[logs.Record], error) {
sequences := make([]*sectionSequence, 0, len(sections))
for _, s := range sections {
sec, err := logs.Open(ctx, s)
if err != nil {
return nil, fmt.Errorf("failed to open logs section: %w", err)
}
ds, err := logs.MakeColumnarDataset(sec)
if err != nil {
return nil, fmt.Errorf("creating columnar dataset: %w", err)
}
columns, err := result.Collect(ds.ListColumns(ctx))
if err != nil {
return nil, err
}
r := dataset.NewReader(dataset.ReaderOptions{
Dataset: ds,
Columns: columns,
Prefetch: true,
})
sequences = append(sequences, &sectionSequence{
section: sec,
DatasetSequence: logs.NewDatasetSequence(r, 8<<10),
})
}
maxValue := result.Value(dataset.Row{
Index: math.MaxInt,
Values: []dataset.Value{
dataset.Int64Value(math.MaxInt64), // StreamID
dataset.Int64Value(math.MinInt64), // Timestamp
},
})
tree := loser.New(sequences, maxValue, sectionSequenceAt, rowResultLess, sectionSequenceClose)
return result.Iter(
func(yield func(logs.Record) bool) error {
defer tree.Close()
for tree.Next() {
seq := tree.Winner()
row, err := sectionSequenceAt(seq).Value()
if err != nil {
return err
}
var record logs.Record
err = logs.DecodeRow(seq.section.Columns(), row, &record, nil)
if err != nil || !yield(record) {
return err
}
}
return nil
}), nil
}
type sectionSequence struct {
logs.DatasetSequence
section *logs.Section
}
var _ loser.Sequence = (*sectionSequence)(nil)
func sectionSequenceAt(seq *sectionSequence) result.Result[dataset.Row] { return seq.At() }
func sectionSequenceClose(seq *sectionSequence) { seq.Close() }
func rowResultLess(a, b result.Result[dataset.Row]) bool {
return result.Compare(a, b, logs.CompareRows) < 0
}

@ -315,6 +315,12 @@ func (p *partitionProcessor) flush() error {
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
return err
}
obj, closer, err = p.sort(obj, closer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to sort dataobj", "err", err)
return err
}
defer closer.Close()
objectPath, err := p.uploader.Upload(p.ctx, obj)
@ -334,6 +340,23 @@ func (p *partitionProcessor) flush() error {
return nil
}
func (p *partitionProcessor) sort(obj *dataobj.Object, closer io.Closer) (*dataobj.Object, io.Closer, error) {
defer closer.Close()
start := time.Now()
defer func() {
level.Debug(p.logger).Log("msg", "partition processor sorted logs object-wide", "duration", time.Since(start))
}()
// Create a new object builder but do not register metrics!
builder, err := logsobj.NewBuilder(p.builderCfg, p.scratchStore)
if err != nil {
return nil, nil, err
}
return builder.CopyAndSort(obj)
}
// commits the offset of the last record processed. It should be called after
// each successful flush to avoid duplicate data in consecutive data objects.
func (p *partitionProcessor) commit() error {

@ -23,6 +23,7 @@ import (
var testBuilderConfig = logsobj.BuilderConfig{
TargetPageSize: 2048,
MaxPageRows: 10,
TargetObjectSize: 1 << 22, // 4 MiB
TargetSectionSize: 1 << 22, // 4 MiB
BufferSize: 2048 * 8,
@ -297,7 +298,8 @@ func TestPartitionProcessor_ProcessRecord(t *testing.T) {
require.Equal(t, clock.Now(), p.lastModified)
}
func newTestPartitionProcessor(_ *testing.T, clock quartz.Clock) *partitionProcessor {
func newTestPartitionProcessor(t *testing.T, clock quartz.Clock) *partitionProcessor {
t.Helper()
p := newPartitionProcessor(
context.Background(),
&kgo.Client{},

@ -91,6 +91,7 @@ type Object struct {
metadata *filemd.Metadata
sections []*Section
tenants []string
}
// FromBucket opens an Object from the given storage bucket and path.
@ -130,22 +131,31 @@ func (o *Object) init(ctx context.Context) error {
return fmt.Errorf("reading metadata: %w", err)
}
readSections := make([]*Section, 0, len(metadata.Sections))
sections := make([]*Section, 0, len(metadata.Sections))
tenants := make(map[string]struct{})
for i, sec := range metadata.Sections {
typ, err := getSectionType(metadata, sec)
if err != nil {
return fmt.Errorf("getting section %d type: %w", i, err)
}
readSections = append(readSections, &Section{
tenant := metadata.Dictionary[sec.TenantRef]
sections = append(sections, &Section{
Type: typ,
Reader: o.dec.SectionReader(metadata, sec, sec.ExtensionData),
Tenant: metadata.Dictionary[sec.TenantRef],
Tenant: tenant,
})
tenants[tenant] = struct{}{}
}
o.metadata = metadata
o.sections = readSections
o.sections = sections
o.tenants = make([]string, 0, len(tenants))
for tenant := range tenants {
o.tenants = append(o.tenants, tenant)
}
return nil
}
@ -156,6 +166,10 @@ func (o *Object) Size() int64 { return o.size }
// returned sections must not be mutated.
func (o *Object) Sections() Sections { return o.sections }
// Tenant returns the list of tenant that have sections in the Object. The slice of
// returned tenants must not be mutated.
func (o *Object) Tenants() []string { return o.tenants }
// Reader returns a reader for the entire raw data object.
func (o *Object) Reader(ctx context.Context) (io.ReadCloser, error) {
return o.rr.Read(ctx)

@ -124,7 +124,7 @@ func (pr *pageReader) read(v []Value) (n int, err error) {
if err != nil {
return n, err
} else if valuesCount != presentCount {
return n, fmt.Errorf("unexpected number of values: %d", valuesCount)
return n, fmt.Errorf("unexpected number of values: %d, expected: %d", valuesCount, presentCount)
}
}

@ -99,3 +99,21 @@ func Collect[V any](seq Seq[V]) ([]V, error) {
}
return vals, errors.Join(errs...)
}
type valueCompareFunc[T any] func(T, T) int
func Compare[T any](a, b Result[T], cmp valueCompareFunc[T]) int {
var (
aVal, aErr = a.Value()
bVal, bErr = b.Value()
)
// Put errors first so we return errors early.
if aErr != nil {
return -1
} else if bErr != nil {
return 1
}
return cmp(aVal, bVal)
}

@ -25,6 +25,13 @@ type Record struct {
Line []byte
}
type AppendStrategy int
const (
AppendUnordered = iota
AppendOrdered
)
// BuilderOptions configures the behavior of the logs section.
type BuilderOptions struct {
// PageSizeHint is the size of pages to use when encoding the logs section.
@ -43,6 +50,11 @@ type BuilderOptions struct {
// increase time spent merging. Higher values of StripeMergeLimit increase
// memory overhead but reduce time spent merging.
StripeMergeLimit int
// AppendStrategy is allowed to control how the builder creates the section.
// When appending logs to the section in strict sort order, the [AppendOrdered] can be used to avoid
// creating and sorting of stripes.
AppendStrategy AppendStrategy
}
// Builder accumulate a set of [Record]s within a data object.
@ -106,15 +118,21 @@ func (b *Builder) Type() dataobj.SectionType { return sectionType }
// Append adds a new entry to b.
func (b *Builder) Append(entry Record) {
b.metrics.appendsTotal.Inc()
b.metrics.recordCount.Inc()
b.records = append(b.records, entry)
b.recordsSize += recordSize(entry)
if b.recordsSize >= b.opts.BufferSize {
b.flushRecords()
// Shortcut for when logs are appending in strict sort order.
// We skip building temporarily compressed stripes in favour of a speed
// with a single pass compression of all records.
if b.opts.AppendStrategy == AppendOrdered {
return
}
b.metrics.recordCount.Inc()
if b.recordsSize >= b.opts.BufferSize {
b.flushRecords(zstd.SpeedFastest)
}
}
func recordSize(record Record) int {
@ -130,16 +148,23 @@ func recordSize(record Record) int {
return size
}
func (b *Builder) flushRecords() {
func (b *Builder) flushRecords(encLevel zstd.EncoderLevel) {
if len(b.records) == 0 {
return
}
// We can panic in case flushRecords is called multiple times before flushing a section
// when using the [AppendOrdered] strategy, because that should not happen and is
// considered a programming error.
if b.opts.AppendStrategy == AppendOrdered && len(b.stripes) > 0 {
panic("must not call flushRecords multiple times for a single section when using AppendOrdered strategy")
}
// Our stripes are intermediate tables that don't need to have the best
// compression. To maintain high throughput on appends, we use the fastest
// compression for a stripe. Better compression is then used for sections.
compressionOpts := dataset.CompressionOptions{
Zstd: []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedFastest)},
Zstd: []zstd.EOption{zstd.WithEncoderLevel(encLevel)},
}
stripe := buildTable(&b.stripeBuffer, b.opts.PageSizeHint, b.opts.PageMaxRowCount, compressionOpts, b.records)
@ -172,6 +197,20 @@ func (b *Builder) flushSection() *table {
return section
}
func (b *Builder) flushSectionOrdered() *table {
b.flushRecords(zstd.SpeedDefault)
if len(b.stripes) == 0 {
return nil
}
section := b.stripes[0]
b.stripes = sliceclear.Clear(b.stripes)
b.stripesCompressedSize = 0
b.stripesUncompressedSize = 0
return section
}
// UncompressedSize returns the current uncompressed size of the logs section
// in bytes.
func (b *Builder) UncompressedSize() int {
@ -200,10 +239,16 @@ func (b *Builder) Flush(w dataobj.SectionWriter) (n int64, err error) {
timer := prometheus.NewTimer(b.metrics.encodeSeconds)
defer timer.ObserveDuration()
// Flush any remaining buffered data.
b.flushRecords()
var section *table
if b.opts.AppendStrategy == AppendOrdered {
// Flush buffered data all at once
section = b.flushSectionOrdered()
} else {
// Flush any remaining buffered data.
b.flushRecords(zstd.SpeedFastest)
section = b.flushSection()
}
section := b.flushSection()
if section == nil {
return 0, nil
}
@ -224,7 +269,7 @@ func (b *Builder) Flush(w dataobj.SectionWriter) (n int64, err error) {
// The first two columns of each row are *always* stream ID and timestamp.
//
// TODO(ashwanth): Find a safer way to do this. Same as [compareRows]
// TODO(ashwanth): Find a safer way to do this. Same as [CompareRows]
logsEnc.SetSortInfo(&datasetmd_v2.SortInfo{
ColumnSorts: []*datasetmd_v2.SortInfo_ColumnSort{
{

@ -41,10 +41,9 @@ func Iter(ctx context.Context, obj *dataobj.Object) result.Seq[Record] {
func IterSection(ctx context.Context, section *Section) result.Seq[Record] {
return result.Iter(func(yield func(Record) bool) error {
columnarSection := section.inner
dset, err := columnar.MakeDataset(columnarSection, columnarSection.Columns())
dset, err := MakeColumnarDataset(section)
if err != nil {
return fmt.Errorf("creating columns dataset: %w", err)
return fmt.Errorf("creating columnar dataset: %w", err)
}
columns, err := result.Collect(dset.ListColumns(ctx))
@ -70,7 +69,7 @@ func IterSection(ctx context.Context, section *Section) result.Seq[Record] {
}
for _, row := range rows[:n] {
err := decodeRow(section.Columns(), row, &record, nil)
err := DecodeRow(section.Columns(), row, &record, nil)
if err != nil || !yield(record) {
return err
}
@ -79,13 +78,23 @@ func IterSection(ctx context.Context, section *Section) result.Seq[Record] {
})
}
// decodeRow decodes a record from a [dataset.Row], using the provided columns
// ColumnarDataset is the exported type alias of the internal [columnar.Dataset].
type ColumnarDataset = columnar.Dataset
// MakeColumnarDataset returns the dataset from a section and a set of columns.
// It returns an error if not all columns are from the provided section.
func MakeColumnarDataset(section *Section) (*ColumnarDataset, error) {
columnarSection := section.inner
return columnar.MakeDataset(columnarSection, columnarSection.Columns())
}
// DecodeRow decodes a record from a [dataset.Row], using the provided columns
// to determine the column type. The list of columns must match the columns
// used to create the row.
//
// The sym argument is used for reusing metadata strings between calls to
// decodeRow. If sym is nil, metadata strings are always allocated.
func decodeRow(columns []*Column, row dataset.Row, record *Record, sym *symbolizer.Symbolizer) error {
// DecodeRow. If sym is nil, metadata strings are always allocated.
func DecodeRow(columns []*Column, row dataset.Row, record *Record, sym *symbolizer.Symbolizer) error {
labelBuilder := labelpool.Get()
defer labelpool.Put(labelBuilder)

@ -119,7 +119,7 @@ func TestDecode(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
record := Record{}
err := decodeRow(tt.columns, tt.row, &record, nil)
err := DecodeRow(tt.columns, tt.row, &record, nil)
if tt.wantErr {
require.Error(t, err)
return

@ -101,7 +101,7 @@ func (r *RowReader) Read(ctx context.Context, s []Record) (int, error) {
}
for i := range r.buf[:n] {
err := decodeRow(r.sec.Columns(), r.buf[i], &s[i], r.symbols)
err := DecodeRow(r.sec.Columns(), r.buf[i], &s[i], r.symbols)
if err != nil {
return i, fmt.Errorf("decoding record: %w", err)
}

@ -78,10 +78,8 @@ func mergeTables(buf *tableBuffer, pageSize, pageRowCount int, compressionOpts d
})
tableSequences = append(tableSequences, &tableSequence{
columns: dsetColumns,
r: r,
buf: make([]dataset.Row, 128), // Read 128 values at a time.
columns: dsetColumns,
DatasetSequence: NewDatasetSequence(r, 128),
})
}
@ -95,13 +93,13 @@ func mergeTables(buf *tableBuffer, pageSize, pageRowCount int, compressionOpts d
var rows int
tree := loser.New(tableSequences, maxValue, tableSequenceValue, rowResultLess, tableSequenceStop)
tree := loser.New(tableSequences, maxValue, tableSequenceAt, rowResultLess, tableSequenceClose)
defer tree.Close()
for tree.Next() {
seq := tree.Winner()
row, err := tableSequenceValue(seq).Value()
row, err := tableSequenceAt(seq).Value()
if err != nil {
return nil, err
}
@ -136,9 +134,24 @@ func mergeTables(buf *tableBuffer, pageSize, pageRowCount int, compressionOpts d
}
type tableSequence struct {
curValue result.Result[dataset.Row]
DatasetSequence
columns []dataset.Column
}
var _ loser.Sequence = (*tableSequence)(nil)
func tableSequenceAt(seq *tableSequence) result.Result[dataset.Row] { return seq.At() }
func tableSequenceClose(seq *tableSequence) { seq.Close() }
func NewDatasetSequence(r *dataset.Reader, bufferSize int) DatasetSequence {
return DatasetSequence{
r: r,
buf: make([]dataset.Row, bufferSize),
}
}
type DatasetSequence struct {
curValue result.Result[dataset.Row]
r *dataset.Reader
@ -147,9 +160,7 @@ type tableSequence struct {
size int // Number of valid values in buf
}
var _ loser.Sequence = (*tableSequence)(nil)
func (seq *tableSequence) Next() bool {
func (seq *DatasetSequence) Next() bool {
if seq.off < seq.size {
seq.curValue = result.Value(seq.buf[seq.off])
seq.off++
@ -175,31 +186,23 @@ ReadBatch:
return true
}
func tableSequenceValue(seq *tableSequence) result.Result[dataset.Row] { return seq.curValue }
func (seq *DatasetSequence) At() result.Result[dataset.Row] {
return seq.curValue
}
func tableSequenceStop(seq *tableSequence) { _ = seq.r.Close() }
func (seq *DatasetSequence) Close() {
_ = seq.r.Close()
}
func rowResultLess(a, b result.Result[dataset.Row]) bool {
var (
aRow, aErr = a.Value()
bRow, bErr = b.Value()
)
// Put errors first so we return errors early.
if aErr != nil {
return true
} else if bErr != nil {
return false
}
return compareRows(aRow, bRow) < 0
return result.Compare(a, b, CompareRows) < 0
}
// compareRows compares two rows by their first two columns. compareRows panics
// CompareRows compares two rows by their first two columns. CompareRows panics
// if a or b doesn't have at least two columns, if the first column isn't a
// int64-encoded stream ID, or if the second column isn't an int64-encoded
// timestamp.
func compareRows(a, b dataset.Row) int {
func CompareRows(a, b dataset.Row) int {
// The first two columns of each row are *always* stream ID and timestamp.
//
// TODO(rfratto): Can we find a safer way of doing this?

@ -145,6 +145,26 @@ func (b *Builder) observeRecord(ts time.Time) {
}
}
// AppendValue may only be used for copying streams from an existing section.
func (b *Builder) AppendValue(val Stream) {
newStream := streamPool.Get().(*Stream)
newStream.Reset()
newStream.ID = val.ID
newStream.MinTimestamp, newStream.MaxTimestamp = val.MinTimestamp, val.MaxTimestamp
newStream.UncompressedSize = val.UncompressedSize
newStream.Labels = val.Labels
newStream.Rows = val.Rows
newStream.Labels.Range(func(l labels.Label) {
b.currentLabelsSize += len(l.Value)
})
hash := labels.StableHash(newStream.Labels)
b.lookup[hash] = append(b.lookup[hash], newStream)
b.ordered = append(b.ordered, newStream)
}
// EstimatedSize returns the estimated size of the Streams section in bytes.
func (b *Builder) EstimatedSize() int {
// Since columns are only built when encoding, we can't use

@ -0,0 +1,100 @@
package main
import (
"context"
"io"
"log"
"os"
"time"
gokitlog "github.com/go-kit/log"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/scratch"
)
func main() {
args := os.Args[1:]
if len(args) < 1 {
log.Fatal("requires at least 1 argument: dataobj")
}
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
fp, err := os.Open(args[0])
if err != nil {
log.Fatal(err)
}
defer fp.Close()
fi, err := fp.Stat()
if err != nil {
log.Fatal(err)
}
orig, err := dataobj.FromReaderAt(fp, fi.Size())
if err != nil {
log.Fatal(err)
}
cfg := logsobj.BuilderConfig{
TargetPageSize: 64 << 10,
MaxPageRows: 1000,
TargetObjectSize: 512 << 20,
TargetSectionSize: 512 << 20,
BufferSize: 16 << 20,
SectionStripeMergeLimit: 8,
}
scr, err := scratch.NewFilesystem(gokitlog.NewNopLogger(), os.TempDir())
if err != nil {
log.Fatal(err)
}
b, err := logsobj.NewBuilder(cfg, scr)
if err != nil {
log.Fatal(err)
}
start := time.Now()
sortedObj, closer, err := b.CopyAndSort(orig)
duration := time.Since(start)
if err != nil {
log.Fatal(err)
}
defer closer.Close()
log.Printf("Took %s\n", duration)
log.Println("== ORIIGNAL DATAOBJ")
for _, s := range sortedObj.Sections() {
log.Println(" ", s.Type.String(), s.Tenant)
}
log.Println("== SORTED DATAOBJ")
for _, s := range sortedObj.Sections() {
log.Println(" ", s.Type.String(), s.Tenant)
}
fw, err := os.CreateTemp("", fi.Name()+"-sorted")
if err != nil {
log.Fatal(err)
}
defer fw.Close()
reader, err := sortedObj.Reader(ctx)
if err != nil {
log.Fatal(err)
}
defer reader.Close()
start = time.Now()
// Copy the sorted data from reader to the output file
written, err := io.Copy(fw, reader)
duration = time.Since(start)
if err != nil {
log.Fatal(err)
}
log.Printf("Written %d bytes to %s in %s\n", written, fw.Name(), duration)
}
Loading…
Cancel
Save