From 24ca0e4a2f72c87e07309b033ab8ff4b9c1902a4 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 17 Apr 2026 12:33:50 +0200 Subject: [PATCH] feat: Remove task_pruning_cache_fetch_batch_size in favor of existing memcached batching (#21593) --- docs/sources/shared/configuration.md | 5 -- pkg/engine/engine.go | 16 ++-- pkg/engine/internal/workflow/workflow.go | 6 -- .../internal/workflow/workflow_planner.go | 32 +++----- .../workflow/workflow_planner_test.go | 80 ------------------- 5 files changed, 19 insertions(+), 120 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 22836b03fa..c7f911e865 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -371,11 +371,6 @@ query_engine: # CLI flag: -query-engine.task-results-cache.prune-empty-cached-tasks [prune_empty_cached_tasks: | default = false] - # Experimental: Maximum number of cache keys fetched per batch when pruning - # empty cached tasks at plan time. 0 means unlimited (single batch). - # CLI flag: -query-engine.task-results-cache.task-pruning-cache-fetch-batch-size - [task_pruning_cache_fetch_batch_size: | default = 5000] - # Experimental: Number of worker threads to spawn. Each worker thread runs one # task at a time. 0 means to use GOMAXPROCS value. # CLI flag: -query-engine.worker-threads diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 2ab068a684..f8e2bfd4ea 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -100,7 +100,6 @@ type TaskCacheConfig struct { TaskResultMaxCacheableSize flagext.Bytes `yaml:"task_result_max_cacheable_size" category:"experimental"` DataObjScanResultMaxCacheableSize flagext.Bytes `yaml:"dataobjscan_result_max_cacheable_size" category:"experimental"` PruneEmptyCachedTasks bool `yaml:"prune_empty_cached_tasks" category:"experimental"` - TaskPruningCacheFetchBatchSize int `yaml:"task_pruning_cache_fetch_batch_size" category:"experimental"` } // RegisterFlagsWithPrefix registers flags for TaskCacheConfig with the given prefix. @@ -112,8 +111,6 @@ func (cfg *TaskCacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS "Experimental: Maximum size for a DataObjScan result to be cacheable. 0 means only empty responses are cached.") f.BoolVar(&cfg.PruneEmptyCachedTasks, prefix+"prune-empty-cached-tasks", false, "Experimental: When enabled, the scheduler checks cached results at plan time and prunes tasks whose cached result is known to be empty.") - f.IntVar(&cfg.TaskPruningCacheFetchBatchSize, prefix+"task-pruning-cache-fetch-batch-size", 5000, - "Experimental: Maximum number of cache keys fetched per batch when pruning empty cached tasks at plan time. 0 means unlimited (single batch).") } // Params holds parameters for constructing a new [Engine]. @@ -514,13 +511,12 @@ func (e *Engine) buildWorkflow(ctx context.Context, tenantID string, logger log. MaxRunningScanTasks: maxRunningScanTasks, MaxRunningOtherTasks: 0, - CacheEnabled: cacheEnabled, - MaxTaskCacheSize: uint64(e.cfg.Executor.TaskResultsCache.TaskResultMaxCacheableSize), - MaxDataObjScanCacheSize: uint64(e.cfg.Executor.TaskResultsCache.DataObjScanResultMaxCacheableSize), - CacheCompression: e.cfg.Executor.TaskResultsCache.Compression, - PruneEmptyCachedTasks: e.cfg.Executor.TaskResultsCache.PruneEmptyCachedTasks, - TaskPruningCacheFetchBatchSize: e.cfg.Executor.TaskResultsCache.TaskPruningCacheFetchBatchSize, - TaskCacheRegistry: e.taskCaches, + CacheEnabled: cacheEnabled, + MaxTaskCacheSize: uint64(e.cfg.Executor.TaskResultsCache.TaskResultMaxCacheableSize), + MaxDataObjScanCacheSize: uint64(e.cfg.Executor.TaskResultsCache.DataObjScanResultMaxCacheableSize), + CacheCompression: e.cfg.Executor.TaskResultsCache.Compression, + PruneEmptyCachedTasks: e.cfg.Executor.TaskResultsCache.PruneEmptyCachedTasks, + TaskCacheRegistry: e.taskCaches, DebugTasks: e.limits.DebugEngineTasks(tenantID), DebugStreams: e.limits.DebugEngineStreams(tenantID), diff --git a/pkg/engine/internal/workflow/workflow.go b/pkg/engine/internal/workflow/workflow.go index e84749e24e..eaf10be8f9 100644 --- a/pkg/engine/internal/workflow/workflow.go +++ b/pkg/engine/internal/workflow/workflow.go @@ -84,11 +84,6 @@ type Options struct { // result are eliminated at plan time before any work is dispatched. PruneEmptyCachedTasks bool - // TaskPruningCacheFetchBatchSize is the maximum number of cache keys fetched per - // batch when pruning empty cached tasks at plan time. 0 means all keys - // are fetched in a single call per cache name. - TaskPruningCacheFetchBatchSize int - // TaskCacheRegistry is the registry of cache backends used at plan time to // prune tasks whose cached result is known to be empty. TaskCacheRegistry executor.TaskCacheRegistry @@ -139,7 +134,6 @@ func New(opts Options, logger log.Logger, runner Runner, plan *physical.Plan) (* compression: opts.CacheCompression, registry: opts.TaskCacheRegistry, pruneEmptyCachedTasks: opts.PruneEmptyCachedTasks, - eliminationBatchSize: opts.TaskPruningCacheFetchBatchSize, }, logger) if err != nil { return nil, err diff --git a/pkg/engine/internal/workflow/workflow_planner.go b/pkg/engine/internal/workflow/workflow_planner.go index 67ad92a2e1..b9cbc18f38 100644 --- a/pkg/engine/internal/workflow/workflow_planner.go +++ b/pkg/engine/internal/workflow/workflow_planner.go @@ -14,7 +14,6 @@ import ( "github.com/grafana/loki/v3/pkg/engine/internal/executor" "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" "github.com/grafana/loki/v3/pkg/engine/internal/util/dag" - iterv2 "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" ) @@ -36,7 +35,6 @@ type cacheParams struct { compression string registry executor.TaskCacheRegistry pruneEmptyCachedTasks bool - eliminationBatchSize int } // planWorkflow partitions a physical plan into a graph of tasks. @@ -89,7 +87,7 @@ func planWorkflow(tenantID string, plan *physical.Plan, cacheOpts cacheParams, l return dag.Graph[*Task]{}, fmt.Errorf("injecting DataObjScan caching: %w", err) } if cacheOpts.pruneEmptyCachedTasks { - if err := eliminateEmptyCachedTasks(planner, cacheOpts.registry, cacheOpts.eliminationBatchSize, logger); err != nil { + if err := eliminateEmptyCachedTasks(planner, cacheOpts.registry, logger); err != nil { return dag.Graph[*Task]{}, fmt.Errorf("eliminating empty cached tasks: %w", err) } } @@ -124,13 +122,12 @@ func optimize(t *Task) { // task-level cache entry or its DataObjScan-level cache entry is an empty hit, // since zero scan rows guarantee zero rows from any operators above. // -// Keys are fetched in batches of batchSize per cache name (0 = single batch). // Cache fetch errors are non-fatal: a warning is logged and the batch is skipped. -func eliminateEmptyCachedTasks(p *planner, caches executor.TaskCacheRegistry, batchSize int, logger log.Logger) error { +func eliminateEmptyCachedTasks(p *planner, caches executor.TaskCacheRegistry, logger log.Logger) error { start := time.Now() keyToTask, backends, taskCount := collectCacheKeys(p, caches, logger) - toEliminate := findEmptyTasks(context.Background(), keyToTask, backends, batchSize, logger) + toEliminate := findEmptyTasks(context.Background(), keyToTask, backends, logger) // NOTE: tasks cannot be eliminated inside the walk or fetch loops since // dag.Graph.Eliminate uses slices.DeleteFunc which zeroes the tail of the @@ -210,7 +207,6 @@ func findEmptyTasks( ctx context.Context, keyToTask map[physical.TaskCacheName]map[string]*Task, backends map[physical.TaskCacheName]cache.Cache, - batchSize int, logger log.Logger, ) []*Task { toEliminate := make(map[*Task]struct{}) @@ -233,22 +229,20 @@ func findEmptyTasks( } } - for batch := range iterv2.Batches(keys, batchSize) { - found, bufs, _, err := c.Fetch(ctx, batch) + found, bufs, _, err := c.Fetch(ctx, keys) + if err != nil { + level.Error(logger).Log("msg", "cache fetch failed during task elimination", "err", err) + continue + } + for i, key := range found { + dec, err := executor.NewCacheEntryDecoder(bufs[i]) if err != nil { - level.Error(logger).Log("msg", "cache fetch failed during task elimination", "err", err) + level.Error(logger).Log("msg", "cache entry decoding failed during task elimination", "err", err) continue } - for i, key := range found { - dec, err := executor.NewCacheEntryDecoder(bufs[i]) - if err != nil { - level.Error(logger).Log("msg", "cache entry decoding failed during task elimination", "err", err) - continue - } - if dec.Len() == 0 { - toEliminate[keyMap[key]] = struct{}{} - } + if dec.Len() == 0 { + toEliminate[keyMap[key]] = struct{}{} } } } diff --git a/pkg/engine/internal/workflow/workflow_planner_test.go b/pkg/engine/internal/workflow/workflow_planner_test.go index f3e43c7b9d..2f454a88ed 100644 --- a/pkg/engine/internal/workflow/workflow_planner_test.go +++ b/pkg/engine/internal/workflow/workflow_planner_test.go @@ -1050,86 +1050,6 @@ func (m *workflowMockCache) Stop() {} func (m *workflowMockCache) GetCacheType() stats.CacheType { return stats.TaskResultCache } -// countingCache wraps workflowMockCache and counts the number of Fetch calls. -type countingCache struct { - *workflowMockCache - fetchCalls int -} - -func (c *countingCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) { - c.fetchCalls++ - return c.workflowMockCache.Fetch(ctx, keys) -} - -// Test_eliminateEmptyCachedTasks_batching verifies that the number of Fetch -// calls matches the expected batching behaviour for different batch sizes. -func Test_eliminateEmptyCachedTasks_batching(t *testing.T) { - // Physical plan with 4 leaf tasks (one per DataObjScan location a–d). - // All task-level cache keys return empty, so every leaf (and the root) is eliminated. - var physicalGraph dag.Graph[physical.Node] - var ( - vecAgg = physicalGraph.Add(&physical.VectorAggregation{Operation: types.VectorAggregationTypeSum}) - parallelize = physicalGraph.Add(&physical.Parallelize{}) - rangeAgg = physicalGraph.Add(&physical.RangeAggregation{ - Operation: types.RangeAggregationTypeCount, - Start: time.Unix(5, 0).UTC(), - End: time.Unix(45, 0).UTC(), - }) - scanSet = physicalGraph.Add(&physical.ScanSet{ - Targets: []*physical.ScanTarget{ - {Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{ - Location: "a", - MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()}, - }}, - {Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{ - Location: "b", - MaxTimeRange: physical.TimeRange{Start: time.Unix(20, 0).UTC(), End: time.Unix(60, 0).UTC()}, - }}, - {Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{ - Location: "c", - MaxTimeRange: physical.TimeRange{Start: time.Unix(30, 0).UTC(), End: time.Unix(70, 0).UTC()}, - }}, - {Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{ - Location: "d", - MaxTimeRange: physical.TimeRange{Start: time.Unix(40, 0).UTC(), End: time.Unix(80, 0).UTC()}, - }}, - }, - }) - ) - _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vecAgg, Child: parallelize}) - _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parallelize, Child: rangeAgg}) - _ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scanSet}) - physicalPlan := physical.FromGraph(physicalGraph) - physicalPlan, err := physical.WrapWithBatching(physicalPlan, 500) - require.NoError(t, err) - - for _, tc := range []struct { - name string - batchSize int - expectedFetchCalls int - }{ - {name: "unlimited (batchSize=0)", batchSize: 0, expectedFetchCalls: 1}, - {name: "batchSize=1", batchSize: 1, expectedFetchCalls: 4}, - {name: "batchSize=2", batchSize: 2, expectedFetchCalls: 2}, - } { - t.Run(tc.name, func(t *testing.T) { - mc := &countingCache{workflowMockCache: newWorkflowMockCache()} - - _, err := planWorkflow("", physicalPlan, cacheParams{ - enabled: true, - taskCacheMaxSizeBytes: 1 << 20, - pruneEmptyCachedTasks: true, - eliminationBatchSize: tc.batchSize, - registry: executor.NewTestTaskCacheRegistry(map[physical.TaskCacheName]cache.Cache{ - physical.TaskCacheLogsScanRangeAggr: mc, - }), - }, log.NewNopLogger()) - require.NoError(t, err) - require.Equal(t, tc.expectedFetchCalls, mc.fetchCalls) - }) - } -} - // emptyResultPayload returns a zero-length buffer which is treated as an empty cached result. func emptyResultPayload() []byte { return []byte{} }