From 5f3300a0848380bb901c44148ffdc43703fc4717 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Fri, 27 Jun 2025 10:26:03 +0530 Subject: [PATCH] chore(bench_test): correctly integrate new query engine + misc changes (#18172) --- pkg/engine/engine.go | 2 +- pkg/logql/bench/bench_test.go | 185 ++++++++++++--------- pkg/logql/bench/generator_query.go | 20 ++- pkg/logql/bench/store_dataobj_v2_engine.go | 74 --------- 4 files changed, 118 insertions(+), 163 deletions(-) diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 00591e2247..b6b6775a1c 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -97,6 +97,7 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo ) t = time.Now() // start stopwatch for physical planning + statsCtx, ctx := stats.NewContext(ctx) executionContext := physical.NewContext(ctx, e.metastore, params.Start(), params.End()) planner := physical.NewPlanner(executionContext) plan, err := planner.Build(logicalPlan) @@ -137,7 +138,6 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo return builder.empty(), err } - statsCtx := stats.FromContext(ctx) builder.setStats(statsCtx.Result(time.Since(start), 0, builder.len())) e.metrics.subqueries.WithLabelValues(statusSuccess).Inc() diff --git a/pkg/logql/bench/bench_test.go b/pkg/logql/bench/bench_test.go index 055d448edd..84415bb360 100644 --- a/pkg/logql/bench/bench_test.go +++ b/pkg/logql/bench/bench_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/dustin/go-humanize" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/user" @@ -41,7 +42,7 @@ var allStores = []string{StoreDataObj, StoreDataObjV2Engine, StoreChunk} // setupBenchmarkWithStore sets up the benchmark environment with the specified store type // and returns the necessary components -func setupBenchmarkWithStore(tb testing.TB, storeType string) (*logql.QueryEngine, *GeneratorConfig) { +func setupBenchmarkWithStore(tb testing.TB, storeType string) (logql.Engine, *GeneratorConfig) { tb.Helper() entries, err := os.ReadDir(DefaultDataDir) if err != nil || len(entries) == 0 { @@ -61,10 +62,8 @@ func setupBenchmarkWithStore(tb testing.TB, storeType string) (*logql.QueryEngin if err != nil { tb.Fatal(err) } - querier, err = store.Querier() - if err != nil { - tb.Fatal(err) - } + + return store.engine, config case StoreDataObj: store, err := NewDataObjStore(DefaultDataDir, testTenant) if err != nil { @@ -105,7 +104,7 @@ func TestStorageEquality(t *testing.T) { type store struct { Name string Cases []TestCase - Engine *logql.QueryEngine + Engine logql.Engine } generateStore := func(name string) *store { @@ -138,54 +137,72 @@ func TestStorageEquality(t *testing.T) { } for _, baseCase := range baseStore.Cases { - t.Run(baseCase.Name(), func(t *testing.T) { - defer func() { - if t.Failed() { - t.Logf("Re-run just this test with -test.run='%s'", testNameRegex(t.Name())) - } - }() - - t.Logf("Query information:\n%s", baseCase.Description()) + for _, store := range stores { + if store == baseStore { + continue + } - params, err := logql.NewLiteralParams( - baseCase.Query, - baseCase.Start, - baseCase.End, - baseCase.Step, - 0, - baseCase.Direction, - 1000, - nil, - nil, - ) - require.NoError(t, err) + t.Run(fmt.Sprintf("query=%s/kind=%s/store=%s", baseCase.Name(), baseCase.Kind(), store.Name), func(t *testing.T) { + defer func() { + if t.Failed() { + t.Logf("Re-run just this test with -test.run='%s'", testNameRegex(t.Name())) + } + }() - expected, err := baseStore.Engine.Query(params).Exec(ctx) - require.NoError(t, err) + t.Logf("Query information:\n%s", baseCase.Description()) - // Find matching test case in other stores and then compare results. - for _, store := range stores { - if store == baseStore { - continue - } + params, err := logql.NewLiteralParams( + baseCase.Query, + baseCase.Start, + baseCase.End, + baseCase.Step, + 0, + baseCase.Direction, + 1000, + nil, + nil, + ) + require.NoError(t, err) + + expected, err := baseStore.Engine.Query(params).Exec(ctx) + require.NoError(t, err) + + t.Logf(`Summary stats: store=%s lines_processed=%d, entries_returned=%d, bytes_processed=%s, execution_time_in_secs=%d, bytes_processed_per_sec=%s`, + baseStore.Name, + expected.Statistics.Summary.TotalLinesProcessed, + expected.Statistics.Summary.TotalEntriesReturned, + humanize.Bytes(uint64(expected.Statistics.Summary.TotalBytesProcessed)), + uint64(expected.Statistics.Summary.ExecTime), + humanize.Bytes(uint64(expected.Statistics.Summary.BytesProcessedPerSecond)), + ) + // Find matching test case in other stores and then compare results. idx := slices.IndexFunc(store.Cases, func(tc TestCase) bool { return tc == baseCase }) if idx == -1 { t.Logf("Store %s missing test case %s", store.Name, baseCase.Name()) - continue + return } actual, err := store.Engine.Query(params).Exec(ctx) if err != nil && errors.Is(err, errStoreUnimplemented) { t.Logf("Store %s does not implement test case %s", store.Name, baseCase.Name()) - continue + return } else if assert.NoError(t, err) { + t.Logf(`Summary stats: store=%s lines_processed=%d, entries_returned=%d, bytes_processed=%s, execution_time_in_secs=%d, bytes_processed_per_sec=%s`, + store.Name, + actual.Statistics.Summary.TotalLinesProcessed, + actual.Statistics.Summary.TotalEntriesReturned, + humanize.Bytes(uint64(actual.Statistics.Summary.TotalBytesProcessed)), + uint64(actual.Statistics.Summary.ExecTime), + humanize.Bytes(uint64(actual.Statistics.Summary.BytesProcessedPerSecond)), + ) + assert.Equal(t, expected.Data, actual.Data, "store %q results do not match base store %q", store.Name, baseStore.Name) } - } - }) + }) + } } } @@ -204,7 +221,8 @@ func testNameRegex(name string) string { func TestLogQLQueries(t *testing.T) { // We keep this test for debugging even though it's too slow for now. t.Skip("Too slow for now.") - engine, config := setupBenchmarkWithStore(t, StoreDataObjV2Engine) + store := StoreDataObjV2Engine + engine, config := setupBenchmarkWithStore(t, store) ctx := user.InjectOrgID(context.Background(), testTenant) // Generate test cases @@ -222,53 +240,56 @@ func TestLogQLQueries(t *testing.T) { // } for _, c := range cases { - // Uncomment this to run only log queries - // if c.Kind() != "log" { - // continue - // } - if _, exists := uniqueQueries[c.Query]; exists { - continue - } - uniqueQueries[c.Query] = struct{}{} + t.Run(fmt.Sprintf("query=%s/kind=%s/store=%s", c.Name(), c.Kind(), store), func(t *testing.T) { + + // Uncomment this to run only log queries + // if c.Kind() != "log" { + // continue + // } + if _, exists := uniqueQueries[c.Query]; exists { + t.Skip("skipping duplicate query: " + c.Query) + } + uniqueQueries[c.Query] = struct{}{} - t.Log(c.Description()) - params, err := logql.NewLiteralParams( - c.Query, - c.Start, - c.Start.Add(5*time.Minute), - 1*time.Minute, - 0, - c.Direction, - 1000, - nil, - nil, - ) - require.NoError(t, err) - - q := engine.Query(params) - res, err := q.Exec(ctx) - require.NoError(t, err) - require.Equal(t, logqlmodel.ValueTypeStreams, string(res.Data.Type())) - xs := res.Data.(logqlmodel.Streams) - require.Greater(t, len(xs), 0, "no streams returned") - - if testing.Verbose() { - // Log the result type and some basic stats - t.Logf("Result Type: %s", res.Data.Type()) - switch v := res.Data.(type) { - case promql.Vector: - t.Logf("Number of Samples: %d", len(v)) - if len(v) > 0 { - t.Logf("First Sample: %+v", v[0]) - } - case promql.Matrix: - t.Logf("Number of Series: %d", len(v)) - if len(v) > 0 { - t.Logf("First Series: %+v", v[0]) + t.Log(c.Description()) + params, err := logql.NewLiteralParams( + c.Query, + c.Start, + c.Start.Add(5*time.Minute), + 1*time.Minute, + 0, + c.Direction, + 1000, + nil, + nil, + ) + require.NoError(t, err) + + q := engine.Query(params) + res, err := q.Exec(ctx) + require.NoError(t, err) + require.Equal(t, logqlmodel.ValueTypeStreams, string(res.Data.Type())) + xs := res.Data.(logqlmodel.Streams) + require.Greater(t, len(xs), 0, "no streams returned") + + if testing.Verbose() { + // Log the result type and some basic stats + t.Logf("Result Type: %s", res.Data.Type()) + switch v := res.Data.(type) { + case promql.Vector: + t.Logf("Number of Samples: %d", len(v)) + if len(v) > 0 { + t.Logf("First Sample: %+v", v[0]) + } + case promql.Matrix: + t.Logf("Number of Series: %d", len(v)) + if len(v) > 0 { + t.Logf("First Series: %+v", v[0]) + } } + t.Log("----------------------------------------") } - t.Log("----------------------------------------") - } + }) } } diff --git a/pkg/logql/bench/generator_query.go b/pkg/logql/bench/generator_query.go index b4b921aae3..cc0becbf87 100644 --- a/pkg/logql/bench/generator_query.go +++ b/pkg/logql/bench/generator_query.go @@ -170,13 +170,19 @@ func (c *GeneratorConfig) GenerateTestCases() []TestCase { addBidirectional(selector+` | logfmt | level="error" | detected_level="error"`, c.StartTime, end) // Metric queries with structured metadata - baseMetricQuery := fmt.Sprintf(`rate(%s | detected_level=~"error|warn" [5m])`, selector) + baseRangeAggregationQueries := []string{ + fmt.Sprintf(`count_over_time(%s[5m])`, selector), + fmt.Sprintf(`count_over_time(%s | detected_level=~"error|warn" [5m])`, selector), + fmt.Sprintf(`rate(%s | detected_level=~"error|warn" [5m])`, selector), + } // Single dimension aggregations - dimensions := []string{"pod", "namespace", "env"} + dimensions := []string{"pod", "namespace", "env", "detected_level"} for _, dim := range dimensions { - query := fmt.Sprintf(`sum by (%s) (%s)`, dim, baseMetricQuery) - addMetricQuery(query, c.StartTime.Add(5*time.Minute), end, step) + for _, baseMetricQuery := range baseRangeAggregationQueries { + query := fmt.Sprintf(`sum by (%s) (%s)`, dim, baseMetricQuery) + addMetricQuery(query, c.StartTime.Add(5*time.Minute), end, step) + } } // Two dimension aggregations @@ -185,8 +191,10 @@ func (c *GeneratorConfig) GenerateTestCases() []TestCase { {"env", "component"}, } for _, dims := range twoDimCombos { - query := fmt.Sprintf(`sum by (%s, %s) (%s)`, dims[0], dims[1], baseMetricQuery) - addMetricQuery(query, c.StartTime.Add(5*time.Minute), end, step) + for _, baseMetricQuery := range baseRangeAggregationQueries { + query := fmt.Sprintf(`sum by (%s, %s) (%s)`, dims[0], dims[1], baseMetricQuery) + addMetricQuery(query, c.StartTime.Add(5*time.Minute), end, step) + } } // Error rates by severity diff --git a/pkg/logql/bench/store_dataobj_v2_engine.go b/pkg/logql/bench/store_dataobj_v2_engine.go index 945b68f54b..b5feb7ac98 100644 --- a/pkg/logql/bench/store_dataobj_v2_engine.go +++ b/pkg/logql/bench/store_dataobj_v2_engine.go @@ -1,7 +1,6 @@ package bench import ( - "context" "errors" "fmt" "path/filepath" @@ -10,10 +9,7 @@ import ( "github.com/thanos-io/objstore/providers/filesystem" "github.com/grafana/loki/v3/pkg/engine" - "github.com/grafana/loki/v3/pkg/iter" - "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" - "github.com/grafana/loki/v3/pkg/logqlmodel" ) var errStoreUnimplemented = errors.New("store does not implement this operation") @@ -61,73 +57,3 @@ func NewDataObjV2EngineStore(dataDir string, tenantID string) (*DataObjV2EngineS dataDir: dataDir, }, nil } - -// Querier returns a logql.Querier for the DataObjV2EngineStore. -func (s *DataObjV2EngineStore) Querier() (logql.Querier, error) { - return &dataObjV2EngineQuerier{ - engine: s.engine, - tenantID: s.tenantID, // Pass tenantID if SelectLogs needs it for context - }, nil -} - -type dataObjV2EngineQuerier struct { - engine logql.Engine - tenantID string -} - -// SelectLogs implements logql.Querier. -func (q *dataObjV2EngineQuerier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { - // Construct logql.Params from logql.SelectLogParams - // The logql.SelectLogParams.Query is the full LogQL query string. - logqlParams, err := logql.NewLiteralParams( - params.QueryRequest.Plan.String(), // Assuming this is the correct way to get the full query string - params.Start, - params.End, - 0, - 0, - params.Direction, - params.Limit, - nil, - nil, - ) - if err != nil { - return nil, fmt.Errorf("DataObjV2EngineStore failed to create literal params: %w", err) - } - - // Inject tenantID into context if required by the engine. - // This is a common pattern. - // ctx = user.InjectOrgID(ctx, q.tenantID) // If using dskit/user for tenant context - - // Execute query - compiledQuery := q.engine.Query(logqlParams) - result, err := compiledQuery.Exec(ctx) - if err != nil && errors.Is(err, engine.ErrNotSupported) { - return nil, errors.Join(errStoreUnimplemented, err) - } else if err != nil { - return nil, fmt.Errorf("DataObjV2EngineStore query execution failed: %w", err) - } - - // Convert result (logqlmodel.Streams) to iter.EntryIterator - switch data := result.Data.(type) { - case logqlmodel.Streams: - return newStreamsEntryIterator(data, params.Direction), nil // Pass direction - default: - return nil, fmt.Errorf("DataObjV2EngineStore: unexpected result type for SelectLogs: %T", result.Data) - } -} - -// SelectSamples implements logql.Querier. -func (q *dataObjV2EngineQuerier) SelectSamples(_ context.Context, _ logql.SelectSampleParams) (iter.SampleIterator, error) { - return nil, errStoreUnimplemented -} - -// newStreamsEntryIterator creates a sorted entry iterator from multiple logqlmodel.Streams. -func newStreamsEntryIterator(streams logqlmodel.Streams, direction logproto.Direction) iter.EntryIterator { - iterators := make([]iter.EntryIterator, 0, len(streams)) - for _, stream := range streams { - if len(stream.Entries) > 0 { - iterators = append(iterators, iter.NewStreamIterator(stream)) - } - } - return iter.NewSortEntryIterator(iterators, direction) -}