chore(engine): Sort entries in dataobj sections by timestamp descending (#18367)

Sorting sections by `timestamp DESC` first and `streamID ASC` second allows for more efficient reading (by section) for BACKWARD log queries.

- This change breaks FORWARD log queries, which is acceptable for the time being. Support for them will be added again later, when columnar reading of the dataobj is introduced.
- This change renders existing dataobj in storage incompatible.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/18423/head
Christian Haudum 10 months ago committed by GitHub
parent fa20a3f7e5
commit d9f7d785ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 48
      pkg/dataobj/querier/store_test.go
  2. 28
      pkg/dataobj/sections/logs/builder_test.go
  3. 8
      pkg/dataobj/sections/logs/reader_test.go
  4. 4
      pkg/dataobj/sections/logs/table_build.go
  5. 8
      pkg/dataobj/sections/logs/table_merge.go
  6. 13
      pkg/dataobj/sections/logs/table_test.go
  7. 4
      pkg/engine/engine.go
  8. 62
      pkg/engine/executor/dataobjscan.go
  9. 42
      pkg/engine/executor/dataobjscan_test.go
  10. 2
      pkg/engine/executor/executor.go
  11. 64
      pkg/engine/executor/sortmerge.go
  12. 3
      pkg/engine/planner/logical/planner.go
  13. 6
      pkg/engine/planner/logical/planner_test.go
  14. 18
      pkg/engine/planner/physical/planner.go
  15. 29
      pkg/engine/planner/physical/planner_test.go
  16. 4
      pkg/logql/bench/bench_test.go
  17. 1
      pkg/logql/bench/store_dataobj_v2_engine.go
  18. 2
      pkg/querier/http_test.go
  19. 3
      pkg/querier/querier_test.go

@ -134,14 +134,10 @@ func TestStore_SelectSamples(t *testing.T) {
end: now.Add(time.Hour),
shards: []string{"0_of_2"},
want: []sampleWithLabels{
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(8 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(18 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(38 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(40 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(12 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(42 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}},
@ -160,10 +156,14 @@ func TestStore_SelectSamples(t *testing.T) {
end: now.Add(time.Hour),
shards: []string{"1_of_2"},
want: []sampleWithLabels{
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(12 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(42 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(8 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(18 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(38 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(40 * time.Second).UnixNano(), Value: 1}},
},
},
{
@ -315,14 +315,10 @@ func TestStore_SelectLogs(t *testing.T) {
limit: 100,
direction: logproto.FORWARD,
want: []entryWithLabels{
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"}},
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"}},
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(38 * time.Second), Line: "bar7"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"}},
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"}},
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"}},
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"}},
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(42 * time.Second), Line: "baz4"}},
{Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}},
{Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}},
@ -343,10 +339,14 @@ func TestStore_SelectLogs(t *testing.T) {
limit: 100,
direction: logproto.FORWARD,
want: []entryWithLabels{
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"}},
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"}},
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"}},
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(42 * time.Second), Line: "baz4"}},
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"}},
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"}},
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(38 * time.Second), Line: "bar7"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"}},
},
},
{

@ -15,6 +15,12 @@ import (
func Test(t *testing.T) {
records := []logs.Record{
{
StreamID: 2,
Timestamp: time.Unix(10, 0),
Metadata: []labels.Label{{Name: "cluster", Value: "test"}, {Name: "app", Value: "foo"}},
Line: []byte("foo bar"),
},
{
StreamID: 1,
Timestamp: time.Unix(10, 0),
@ -27,12 +33,6 @@ func Test(t *testing.T) {
Metadata: []labels.Label{{Name: "cluster", Value: "test"}, {Name: "app", Value: "bar"}},
Line: []byte("goodbye world"),
},
{
StreamID: 1,
Timestamp: time.Unix(5, 0),
Metadata: []labels.Label{{Name: "cluster", Value: "test"}, {Name: "app", Value: "foo"}},
Line: []byte("foo bar"),
},
}
opts := logs.BuilderOptions{
@ -49,14 +49,14 @@ func Test(t *testing.T) {
buf, err := buildObject(tracker)
require.NoError(t, err)
// The order of records should be sorted by stream ID then timestamp, and all
// The order of records should be sorted by timestamp DESC then stream ID, and all
// metadata should be sorted by key then value.
expect := []logs.Record{
{
StreamID: 1,
Timestamp: time.Unix(5, 0),
Metadata: []labels.Label{{Name: "app", Value: "foo"}, {Name: "cluster", Value: "test"}},
Line: []byte("foo bar"),
StreamID: 2,
Timestamp: time.Unix(100, 0),
Metadata: []labels.Label{{Name: "app", Value: "bar"}, {Name: "cluster", Value: "test"}},
Line: []byte("goodbye world"),
},
{
StreamID: 1,
@ -66,9 +66,9 @@ func Test(t *testing.T) {
},
{
StreamID: 2,
Timestamp: time.Unix(100, 0),
Metadata: []labels.Label{{Name: "app", Value: "bar"}, {Name: "cluster", Value: "test"}},
Line: []byte("goodbye world"),
Timestamp: time.Unix(10, 0),
Metadata: []labels.Label{{Name: "app", Value: "foo"}, {Name: "cluster", Value: "test"}},
Line: []byte("foo bar"),
},
}

@ -26,10 +26,10 @@ func TestReader(t *testing.T) {
defer alloc.AssertSize(t, 0)
sec := buildSection(t, []logs.Record{
{StreamID: 1, Timestamp: unixTime(10), Metadata: nil, Line: []byte("hello, world!")},
{StreamID: 1, Timestamp: unixTime(20), Metadata: labels.FromStrings("trace_id", "abcdef"), Line: []byte("goodbye, world!")},
{StreamID: 2, Timestamp: unixTime(30), Metadata: labels.FromStrings("trace_id", "123456"), Line: []byte("foo bar")},
{StreamID: 2, Timestamp: unixTime(40), Metadata: labels.FromStrings("trace_id", "789012"), Line: []byte("baz qux")},
{StreamID: 2, Timestamp: unixTime(30), Metadata: labels.FromStrings("trace_id", "123456"), Line: []byte("foo bar")},
{StreamID: 1, Timestamp: unixTime(20), Metadata: labels.FromStrings("trace_id", "abcdef"), Line: []byte("goodbye, world!")},
{StreamID: 1, Timestamp: unixTime(10), Metadata: nil, Line: []byte("hello, world!")},
})
var (
@ -71,8 +71,8 @@ func TestReader(t *testing.T) {
})
expect := arrowtest.Rows{
{"stream_id.int64": int64(1), "trace_id.metadata.binary": []byte("abcdef"), "message.binary": []byte("goodbye, world!")},
{"stream_id.int64": int64(2), "trace_id.metadata.binary": []byte("123456"), "message.binary": []byte("foo bar")},
{"stream_id.int64": int64(1), "trace_id.metadata.binary": []byte("abcdef"), "message.binary": []byte("goodbye, world!")},
}
actualTable, err := readTable(context.Background(), r)

@ -48,9 +48,9 @@ func buildTable(buf *tableBuffer, pageSize int, compressionOpts dataset.Compress
// sortRecords sorts the set of records by timestamp (descending) and then by stream ID (ascending).
func sortRecords(records []Record) {
slices.SortFunc(records, func(a, b Record) int {
if res := cmp.Compare(a.StreamID, b.StreamID); res != 0 {
if res := b.Timestamp.Compare(a.Timestamp); res != 0 {
return res
}
return a.Timestamp.Compare(b.Timestamp)
return cmp.Compare(a.StreamID, b.StreamID)
})
}

@ -86,8 +86,8 @@ func mergeTables(buf *tableBuffer, pageSize int, compressionOpts dataset.Compres
maxValue := result.Value(dataset.Row{
Index: math.MaxInt,
Values: []dataset.Value{
dataset.Int64Value(math.MaxInt64),
dataset.Int64Value(math.MaxInt64),
dataset.Int64Value(math.MaxInt64), // StreamID
dataset.Int64Value(math.MinInt64), // Timestamp
},
})
@ -209,8 +209,8 @@ func compareRows(a, b dataset.Row) int {
bTimestamp = b.Values[1].Int64()
)
if res := cmp.Compare(aStreamID, bStreamID); res != 0 {
if res := cmp.Compare(bTimestamp, aTimestamp); res != 0 {
return res
}
return cmp.Compare(aTimestamp, bTimestamp)
return cmp.Compare(aStreamID, bStreamID)
}

@ -44,21 +44,22 @@ func initBuffer(buf *tableBuffer) {
func Test_mergeTables(t *testing.T) {
var buf tableBuffer
// tables need to be sorted by Timestamp DESC and StreamID ASC
var (
tableA = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{
{StreamID: 1, Timestamp: time.Unix(1, 0), Line: []byte("hello")},
{StreamID: 2, Timestamp: time.Unix(2, 0), Line: []byte("are")},
{StreamID: 3, Timestamp: time.Unix(3, 0), Line: []byte("goodbye")},
{StreamID: 3, Timestamp: time.Unix(3, 0), Line: []byte("hello")},
{StreamID: 2, Timestamp: time.Unix(2, 0), Line: []byte("how")},
{StreamID: 1, Timestamp: time.Unix(1, 0), Line: []byte("you")},
})
tableB = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{
{StreamID: 1, Timestamp: time.Unix(2, 0), Line: []byte("world")},
{StreamID: 3, Timestamp: time.Unix(1, 0), Line: []byte("you")},
{StreamID: 3, Timestamp: time.Unix(1, 0), Line: []byte("goodbye")},
})
tableC = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{
{StreamID: 2, Timestamp: time.Unix(1, 0), Line: []byte("how")},
{StreamID: 3, Timestamp: time.Unix(2, 0), Line: []byte("doing?")},
{StreamID: 3, Timestamp: time.Unix(2, 0), Line: []byte("are")},
{StreamID: 2, Timestamp: time.Unix(1, 0), Line: []byte("doing?")},
})
)

@ -36,6 +36,10 @@ func New(opts logql.EngineOpts, bucket objstore.Bucket, limits logql.Limits, reg
ms = metastore.NewObjectMetastore(bucket, logger)
}
if opts.BatchSize <= 0 {
panic(fmt.Sprintf("invalid batch size for query engine. must be greater than 0, got %d", opts.BatchSize))
}
return &QueryEngine{
logger: logger,
metrics: newMetrics(reg),

@ -19,7 +19,6 @@ import (
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
"github.com/grafana/loki/v3/pkg/util/topk"
)
type dataobjScan struct {
@ -29,6 +28,7 @@ type dataobjScan struct {
initialized bool
reader *logs.RowReader
streams map[int64]labels.Labels
records []logs.Record
state state
}
@ -46,6 +46,8 @@ type dataobjScanOptions struct {
Direction physical.SortOrder // Order of timestamps to return (ASC=Forward, DESC=Backward)
Limit uint32 // A limit on the number of rows to return (0=unlimited).
batchSize int64 // The buffer size for reading rows, derived from the engine batch size.
}
var _ Pipeline = (*dataobjScan)(nil)
@ -55,6 +57,10 @@ var _ Pipeline = (*dataobjScan)(nil)
// returned record are ordered by timestamp in the direction specified by
// opts.Direction.
func newDataobjScanPipeline(ctx context.Context, opts dataobjScanOptions) *dataobjScan {
	// Ascending timestamp order is not supported by this scan. The direction of
	// a log query is validated in the logical planner, so an ASC request that
	// reaches this point is treated as a programming error and it is
	// acceptable to panic.
	if ascending := opts.Direction == physical.ASC; ascending {
		panic("sorting by timestamp ASC is not supported by DataObjScan")
	}

	pipeline := &dataobjScan{ctx: ctx, opts: opts}
	return pipeline
}
@ -78,6 +84,8 @@ func (s *dataobjScan) init() error {
return nil
}
s.records = make([]logs.Record, 0, s.opts.batchSize)
if err := s.initStreams(); err != nil {
return fmt.Errorf("initializing streams: %w", err)
}
@ -130,7 +138,7 @@ func (s *dataobjScan) initStreams() error {
var sr streams.RowReader
defer sr.Close()
streamsBuf := make([]streams.Stream, 512)
streamsBuf := make([]streams.Stream, s.opts.batchSize)
// Initialize entries in the map so we can do a presence test in the loop
// below.
@ -187,43 +195,26 @@ func (s *dataobjScan) initStreams() error {
// from the data. It returns an error upon encountering an error while reading
// one of the sections.
func (s *dataobjScan) read() (arrow.Record, error) {
// Since [physical.DataObjScan] requires that:
//
// * Records are ordered by timestamp, and
// * Records from the same dataobjScan do not overlap in time
//
// we *must* read the entire section before creating a record, as the
// sections in the dataobj itself are not already sorted by timestamp (though
// we only need to keep up to Limit rows in memory).
heap := topk.Heap[logs.Record]{
Limit: int(s.opts.Limit),
Less: s.getLessFunc(s.opts.Direction),
}
var (
n int // number of rows yielded by the datobj reader
err error // error yielded by the dataobj reader
)
var gotData bool
// Keep reading from the dataobj until it yields at least one row, so that the parent does not have to repeat the call itself.
for n == 0 {
// Reset buffer
s.records = s.records[:s.opts.batchSize]
for {
buf := make([]logs.Record, 1024) // do not re-use buffer
n, err := s.reader.Read(s.ctx, buf)
n, err = s.reader.Read(s.ctx, s.records)
if n == 0 && errors.Is(err, io.EOF) {
break
return nil, EOF
} else if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}
gotData = true
for _, rec := range buf[:n] {
heap.Push(rec)
}
}
s.records = s.records[:n]
if !gotData {
return nil, EOF
}
projections, err := s.effectiveProjections(&heap)
projections, err := s.effectiveProjections(s.records)
if err != nil {
return nil, fmt.Errorf("getting effective projections: %w", err)
}
@ -237,10 +228,7 @@ func (s *dataobjScan) read() (arrow.Record, error) {
rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
defer rb.Release()
records := heap.PopAll()
slices.Reverse(records)
for _, record := range records {
for _, record := range s.records {
for i := 0; i < schema.NumFields(); i++ {
field, builder := rb.Schema().Field(i), rb.Field(i)
s.appendToBuilder(builder, &field, &record)
@ -302,7 +290,7 @@ func (s *dataobjScan) getLessFunc(direction physical.SortOrder) func(a, b logs.R
// * Log message
//
// effectiveProjections does not mutate records.
func (s *dataobjScan) effectiveProjections(h *topk.Heap[logs.Record]) ([]physical.ColumnExpression, error) {
func (s *dataobjScan) effectiveProjections(records []logs.Record) ([]physical.ColumnExpression, error) {
if len(s.opts.Projections) > 0 {
return s.opts.Projections, nil
}
@ -324,7 +312,7 @@ func (s *dataobjScan) effectiveProjections(h *topk.Heap[logs.Record]) ([]physica
}
}
for rec := range h.Range() {
for _, rec := range records {
stream, ok := s.streams[rec.StreamID]
if !ok {
// If we hit this, there's a problem with either initStreams (we missed a

@ -64,8 +64,9 @@ func Test_dataobjScan(t *testing.T) {
StreamIDs: []int64{1, 2}, // All streams
Section: 0, // First section.
Projections: nil, // All columns
Direction: physical.ASC,
Direction: physical.DESC,
Limit: 0, // No limit
batchSize: 512,
})
expectFields := []arrow.Field{
@ -77,10 +78,10 @@ func Test_dataobjScan(t *testing.T) {
{Name: "message", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinMessage, Nullable: true},
}
expectCSV := `prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world
prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:03,goodbye world
expectCSV := `prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world
prod,loki,aaaa-bbbb-cccc-dddd,NULL,1970-01-01 00:00:05,hello world
prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world`
prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:03,goodbye world
prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
@ -98,8 +99,9 @@ prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world`
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "timestamp", Type: types.ColumnTypeBuiltin}},
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeLabel}},
},
Direction: physical.ASC,
Direction: physical.DESC,
Limit: 0, // No limit
batchSize: 512,
})
expectFields := []arrow.Field{
@ -107,10 +109,10 @@ prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world`
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
}
expectCSV := `1970-01-01 00:00:02,prod
1970-01-01 00:00:03,prod
expectCSV := `1970-01-01 00:00:10,prod
1970-01-01 00:00:05,prod
1970-01-01 00:00:10,prod`
1970-01-01 00:00:03,prod
1970-01-01 00:00:02,prod`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
@ -129,8 +131,9 @@ prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world`
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeAmbiguous}},
},
Direction: physical.ASC,
Direction: physical.DESC,
Limit: 0, // No limit
batchSize: 512,
})
expectFields := []arrow.Field{
@ -192,8 +195,9 @@ func Test_dataobjScan_DuplicateColumns(t *testing.T) {
StreamIDs: []int64{1, 2, 3}, // All streams
Section: 0, // First section.
Projections: nil, // All columns
Direction: physical.ASC,
Direction: physical.DESC,
Limit: 0, // No limit
batchSize: 512,
})
expectFields := []arrow.Field{
@ -209,9 +213,9 @@ func Test_dataobjScan_DuplicateColumns(t *testing.T) {
{Name: "message", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinMessage, Nullable: true},
}
expectCSV := `prod,NULL,pod-1,loki,NULL,override,1970-01-01 00:00:01,message 1
expectCSV := `prod,namespace-2,NULL,loki,NULL,NULL,1970-01-01 00:00:03,message 3
prod,NULL,NULL,loki,namespace-1,NULL,1970-01-01 00:00:02,message 2
prod,namespace-2,NULL,loki,NULL,NULL,1970-01-01 00:00:03,message 3`
prod,NULL,pod-1,loki,NULL,override,1970-01-01 00:00:01,message 1`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
@ -228,8 +232,9 @@ prod,namespace-2,NULL,loki,NULL,NULL,1970-01-01 00:00:03,message 3`
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "pod", Type: types.ColumnTypeAmbiguous}},
},
Direction: physical.ASC,
Direction: physical.DESC,
Limit: 0, // No limit
batchSize: 512,
})
expectFields := []arrow.Field{
@ -237,9 +242,9 @@ prod,namespace-2,NULL,loki,NULL,NULL,1970-01-01 00:00:03,message 3`
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
}
expectCSV := `pod-1,override
expectCSV := `NULL,NULL
NULL,NULL
NULL,NULL`
pod-1,override`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
@ -256,8 +261,9 @@ NULL,NULL`
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "namespace", Type: types.ColumnTypeAmbiguous}},
},
Direction: physical.ASC,
Direction: physical.DESC,
Limit: 0, // No limit
batchSize: 512,
})
expectFields := []arrow.Field{
@ -265,9 +271,9 @@ NULL,NULL`
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
}
expectCSV := `NULL,NULL
expectCSV := `namespace-2,NULL
NULL,namespace-1
namespace-2,NULL`
NULL,NULL`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)

@ -97,6 +97,8 @@ func (c *Context) executeDataObjScan(ctx context.Context, node *physical.DataObj
Direction: node.Direction,
Limit: node.Limit,
batchSize: c.batchSize,
})
}

@ -3,6 +3,7 @@ package executor
import (
"errors"
"fmt"
"slices"
"sort"
"github.com/apache/arrow-go/v18/arrow"
@ -11,14 +12,16 @@ import (
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
)
type compareFunc[T comparable] func(a, b T) bool
// NewSortMergePipeline returns a new pipeline that merges already sorted inputs into a single output.
func NewSortMergePipeline(inputs []Pipeline, order physical.SortOrder, column physical.ColumnExpression, evaluator expressionEvaluator) (*KWayMerge, error) {
var compare func(a, b int64) bool
var lessFunc func(a, b int64) bool
switch order {
case physical.ASC:
compare = func(a, b int64) bool { return a <= b }
lessFunc = func(a, b int64) bool { return a <= b }
case physical.DESC:
compare = func(a, b int64) bool { return a >= b }
lessFunc = func(a, b int64) bool { return a >= b }
default:
return nil, fmt.Errorf("invalid sort order %v", order)
}
@ -26,7 +29,7 @@ func NewSortMergePipeline(inputs []Pipeline, order physical.SortOrder, column ph
return &KWayMerge{
inputs: inputs,
columnEval: evaluator.newFunc(column),
compare: compare,
compare: lessFunc,
}, nil
}
@ -42,7 +45,7 @@ type KWayMerge struct {
exhausted []bool
offsets []int64
columnEval evalFunc
compare func(a, b int64) bool
compare compareFunc[int64]
}
var _ Pipeline = (*KWayMerge)(nil)
@ -101,42 +104,44 @@ func (p *KWayMerge) init() {
// Find the largest offset in the starting record whose value is still less than the value of the runner-up record from the previous step.
// Return the slice of that record using the two offsets, and update the stored offset of the returned record for the next call to Read.
func (p *KWayMerge) read() error {
start:
// Release previous batch
if p.state.batch != nil {
p.state.batch.Release()
}
timestamps := make([]int64, 0, len(p.inputs))
batchIndexes := make([]int, 0, len(p.inputs))
inputIndexes := make([]int, 0, len(p.inputs))
loop:
for i := range len(p.inputs) {
// Skip exhausted inputs
if p.exhausted[i] {
continue
continue loop
}
// Load next batch if it hasn't been loaded yet, or if current one is already fully consumed
if p.batches[i] == nil || p.offsets[i] == p.batches[i].NumRows() {
// Read another batch as long as the input yields zero-length batches.
for p.batches[i] == nil || p.offsets[i] == p.batches[i].NumRows() {
// Reset offset
p.offsets[i] = 0
// Read from input
err := p.inputs[i].Read()
if err != nil {
if errors.Is(err, EOF) {
p.exhausted[i] = true
continue
p.batches[i] = nil // remove reference to arrow.Record from slice
continue loop
}
return err
}
p.offsets[i] = 0
// It is safe to use the value from the Value() call, because the error is already checked after the Read() call.
// In case the input is exhausted (reached EOF), the return value is `nil`, however, since the flag `p.exhausted[i]` is set, the value will never be read.
p.batches[i], _ = p.inputs[i].Value()
}
// Prevent out-of-bounds error: `p.inputs[i].Read()` returned a batch with 0 rows, and therefore does not have a value at offset `p.offsets[i]`.
// However, since the call did not return EOF, the next read may return rows again, so we only skip without marking the input as exhausted.
if p.batches[i].NumRows() == 0 {
continue
}
// Fetch timestamp value at current offset
col, err := p.columnEval(p.batches[i])
if err != nil {
@ -149,27 +154,30 @@ func (p *KWayMerge) read() error {
ts := tsCol.Value(int(p.offsets[i]))
// Populate slices for sorting
batchIndexes = append(batchIndexes, i)
inputIndexes = append(inputIndexes, i)
timestamps = append(timestamps, int64(ts))
}
// Pipeline is exhausted if no more input batches are available
if len(batchIndexes) == 0 {
if !slices.Contains(p.exhausted, false) {
p.state = Exhausted
return p.state.err
}
if len(inputIndexes) == 0 {
goto start
}
// If there is only a single remaining batch, return the remaining record
if len(batchIndexes) == 1 {
j := batchIndexes[0]
if len(inputIndexes) == 1 {
j := inputIndexes[0]
start := p.offsets[j]
end := p.batches[j].NumRows()
// check against empty batch
if start > end || end == 0 {
p.state = successState(p.batches[j])
p.offsets[j] = end
return nil
// check against empty last batch
if start >= end || end == 0 {
p.state = Exhausted
return p.state.err
}
p.state = successState(p.batches[j].NewSlice(start, end))
@ -177,10 +185,10 @@ func (p *KWayMerge) read() error {
return nil
}
sortIndexesByTimestamps(batchIndexes, timestamps, p.compare)
sortIndexesByTimestamps(inputIndexes, timestamps, p.compare)
// Return the slice of the current record
j := batchIndexes[0]
j := inputIndexes[0]
// Fetch timestamp value at current offset
col, err := p.columnEval(p.batches[j])
@ -215,7 +223,7 @@ func (p *KWayMerge) read() error {
return nil
}
func sortIndexesByTimestamps(indexes []int, timestamps []int64, lessFn func(a, b int64) bool) {
func sortIndexesByTimestamps(indexes []int, timestamps []int64, lessFn compareFunc[int64]) {
if len(indexes) != len(timestamps) {
panic("lengths of indexes and timestamps must match")
}

@ -103,6 +103,9 @@ func buildPlanForLogQuery(expr syntax.LogSelectorExpr, params logql.Params, isMe
if !isMetricQuery {
// SORT -> SortMerge
direction := params.Direction()
if direction == logproto.FORWARD {
return nil, fmt.Errorf("forward search log queries are not supported: %w", errUnimplemented)
}
ascending := direction == logproto.FORWARD
builder = builder.Sort(*timestampColumnRef(), ascending, false)
}

@ -83,7 +83,7 @@ func TestConvertAST_Success(t *testing.T) {
statement: `{cluster="prod", namespace=~"loki-.*"} | foo="bar" or bar="baz" |= "metric.go" |= "foo" or "bar" !~ "(a|b|c)" `,
start: 3600,
end: 7200,
direction: logproto.FORWARD,
direction: logproto.BACKWARD, // ASC is not supported
limit: 1000,
}
logicalPlan, err := BuildPlan(q)
@ -94,7 +94,7 @@ func TestConvertAST_Success(t *testing.T) {
%2 = MATCH_RE label.namespace "loki-.*"
%3 = AND %1 %2
%4 = MAKETABLE [selector=%3, shard=0_of_1]
%5 = SORT %4 [column=builtin.timestamp, asc=true, nulls_first=false]
%5 = SORT %4 [column=builtin.timestamp, asc=false, nulls_first=false]
%6 = GTE builtin.timestamp 1970-01-01T01:00:00Z
%7 = SELECT %5 [predicate=%6]
%8 = LT builtin.timestamp 1970-01-01T02:00:00Z
@ -243,7 +243,7 @@ func TestCanExecuteQuery(t *testing.T) {
statement: tt.statement,
start: 1000,
end: 2000,
direction: logproto.FORWARD,
direction: logproto.BACKWARD,
limit: 1000,
}

@ -5,6 +5,7 @@ import (
"fmt"
"time"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/logical"
)
@ -165,16 +166,27 @@ func (p *Planner) processMakeTable(lp *logical.MakeTable, ctx *Context) ([]Node,
nodes := make([]Node, 0, len(objects))
for i := range objects {
node := &SortMerge{
Column: newColumnExpr(types.ColumnNameBuiltinTimestamp, types.ColumnTypeBuiltin),
Order: ctx.direction, // apply direction from previously visited Sort node
}
p.plan.addNode(node)
for _, section := range sections[i] {
node := &DataObjScan{
scan := &DataObjScan{
Location: objects[i],
StreamIDs: streams[i],
Section: section,
Direction: ctx.direction, // apply direction from previously visited Sort node
}
p.plan.addNode(node)
nodes = append(nodes, node)
p.plan.addNode(scan)
if err := p.plan.addEdge(Edge{Parent: node, Child: scan}); err != nil {
return nil, err
}
}
nodes = append(nodes, node)
}
return nodes, nil
}

@ -109,26 +109,30 @@ func TestMockCatalog(t *testing.T) {
}
func locations(t *testing.T, nodes []Node) []string {
func locations(t *testing.T, plan *Plan, nodes []Node) []string {
res := make([]string, 0, len(nodes))
for _, n := range nodes {
obj, ok := n.(*DataObjScan)
if !ok {
t.Fatalf("failed to cast Node to DataObjScan, got %T", n)
for _, scan := range plan.Children(n) {
obj, ok := scan.(*DataObjScan)
if !ok {
t.Fatalf("failed to cast Node to DataObjScan, got %T", n)
}
res = append(res, string(obj.Location))
}
res = append(res, string(obj.Location))
}
return res
}
func sections(t *testing.T, nodes []Node) [][]int {
func sections(t *testing.T, plan *Plan, nodes []Node) [][]int {
res := make([][]int, 0, len(nodes))
for _, n := range nodes {
obj, ok := n.(*DataObjScan)
if !ok {
t.Fatalf("failed to cast Node to DataObjScan, got %T", n)
for _, scan := range plan.Children(n) {
obj, ok := scan.(*DataObjScan)
if !ok {
t.Fatalf("failed to cast Node to DataObjScan, got %T", n)
}
res = append(res, []int{obj.Section})
}
res = append(res, []int{obj.Section})
}
return res
}
@ -204,9 +208,8 @@ func TestPlanner_ConvertMaketable(t *testing.T) {
planner.reset()
nodes, err := planner.processMakeTable(relation, NewContext(time.Now(), time.Now()))
require.NoError(t, err)
require.Equal(t, tt.expPaths, locations(t, nodes))
require.Equal(t, tt.expSections, sections(t, nodes))
require.Equal(t, tt.expPaths, locations(t, planner.plan, nodes))
require.Equal(t, tt.expSections, sections(t, planner.plan, nodes))
})
}
}

@ -297,13 +297,13 @@ func BenchmarkLogQL(b *testing.B) {
// Run benchmarks for both storage types
for _, storeType := range allStores {
engine, config := setupBenchmarkWithStore(b, storeType)
ctx := user.InjectOrgID(context.Background(), testTenant)
// Generate test cases using the loaded config
cases := config.GenerateTestCases()
for _, c := range cases {
b.Run(fmt.Sprintf("query=%s/kind=%s/store=%s", c.Name(), c.Kind(), storeType), func(b *testing.B) {
ctx := user.InjectOrgID(context.Background(), testTenant)
params, err := logql.NewLiteralParams(
c.Query,
c.Start,
@ -323,6 +323,8 @@ func BenchmarkLogQL(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
r, err := q.Exec(ctx)
require.NoError(b, err)
b.ReportMetric(float64(r.Statistics.Summary.TotalLinesProcessed), "linesProcessed")

@ -41,6 +41,7 @@ func NewDataObjV2EngineStore(dataDir string, tenantID string) (*DataObjV2EngineS
// Default EngineOpts. Adjust if specific configurations are needed.
engineOpts := logql.EngineOpts{
EnableV2Engine: true,
BatchSize: 512,
}
// Instantiate the new engine

@ -418,7 +418,7 @@ func setupAPI(t *testing.T, querier *querierMock, enableMetricAggregation bool)
limits, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
api := NewQuerierAPI(Config{}, querier, limits, nil, nil, log.NewNopLogger())
api := NewQuerierAPI(mockQuerierConfig(), querier, limits, nil, nil, log.NewNopLogger())
return api
}

@ -90,6 +90,9 @@ func TestQuerier_Label_QueryTimeoutConfigFlag(t *testing.T) {
// mockQuerierConfig returns a querier Config for tests, with a one-minute
// TailMaxDuration and an engine batch size of 1.
// NOTE(review): the batch size is presumably set because the query engine
// rejects non-positive batch sizes — confirm against the engine constructor.
func mockQuerierConfig() Config {
	engineOpts := logql.EngineOpts{BatchSize: 1}
	cfg := Config{
		Engine:          engineOpts,
		TailMaxDuration: time.Minute,
	}
	return cfg
}

Loading…
Cancel
Save