feat: Remove task_pruning_cache_fetch_batch_size in favor of existing memcached batching (#21593)

pull/21547/head^2
Salva Corts 2 months ago committed by GitHub
parent ad2a80e914
commit 24ca0e4a2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 5
      docs/sources/shared/configuration.md
  2. 16
      pkg/engine/engine.go
  3. 6
      pkg/engine/internal/workflow/workflow.go
  4. 32
      pkg/engine/internal/workflow/workflow_planner.go
  5. 80
      pkg/engine/internal/workflow/workflow_planner_test.go

@ -371,11 +371,6 @@ query_engine:
# CLI flag: -query-engine.task-results-cache.prune-empty-cached-tasks
[prune_empty_cached_tasks: <boolean> | default = false]
# Experimental: Maximum number of cache keys fetched per batch when pruning
# empty cached tasks at plan time. 0 means unlimited (single batch).
# CLI flag: -query-engine.task-results-cache.task-pruning-cache-fetch-batch-size
[task_pruning_cache_fetch_batch_size: <int> | default = 5000]
# Experimental: Number of worker threads to spawn. Each worker thread runs one
# task at a time. 0 means to use GOMAXPROCS value.
# CLI flag: -query-engine.worker-threads

@ -100,7 +100,6 @@ type TaskCacheConfig struct {
TaskResultMaxCacheableSize flagext.Bytes `yaml:"task_result_max_cacheable_size" category:"experimental"`
DataObjScanResultMaxCacheableSize flagext.Bytes `yaml:"dataobjscan_result_max_cacheable_size" category:"experimental"`
PruneEmptyCachedTasks bool `yaml:"prune_empty_cached_tasks" category:"experimental"`
TaskPruningCacheFetchBatchSize int `yaml:"task_pruning_cache_fetch_batch_size" category:"experimental"`
}
// RegisterFlagsWithPrefix registers flags for TaskCacheConfig with the given prefix.
@ -112,8 +111,6 @@ func (cfg *TaskCacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS
"Experimental: Maximum size for a DataObjScan result to be cacheable. 0 means only empty responses are cached.")
f.BoolVar(&cfg.PruneEmptyCachedTasks, prefix+"prune-empty-cached-tasks", false,
"Experimental: When enabled, the scheduler checks cached results at plan time and prunes tasks whose cached result is known to be empty.")
f.IntVar(&cfg.TaskPruningCacheFetchBatchSize, prefix+"task-pruning-cache-fetch-batch-size", 5000,
"Experimental: Maximum number of cache keys fetched per batch when pruning empty cached tasks at plan time. 0 means unlimited (single batch).")
}
// Params holds parameters for constructing a new [Engine].
@ -514,13 +511,12 @@ func (e *Engine) buildWorkflow(ctx context.Context, tenantID string, logger log.
MaxRunningScanTasks: maxRunningScanTasks,
MaxRunningOtherTasks: 0,
CacheEnabled: cacheEnabled,
MaxTaskCacheSize: uint64(e.cfg.Executor.TaskResultsCache.TaskResultMaxCacheableSize),
MaxDataObjScanCacheSize: uint64(e.cfg.Executor.TaskResultsCache.DataObjScanResultMaxCacheableSize),
CacheCompression: e.cfg.Executor.TaskResultsCache.Compression,
PruneEmptyCachedTasks: e.cfg.Executor.TaskResultsCache.PruneEmptyCachedTasks,
TaskPruningCacheFetchBatchSize: e.cfg.Executor.TaskResultsCache.TaskPruningCacheFetchBatchSize,
TaskCacheRegistry: e.taskCaches,
CacheEnabled: cacheEnabled,
MaxTaskCacheSize: uint64(e.cfg.Executor.TaskResultsCache.TaskResultMaxCacheableSize),
MaxDataObjScanCacheSize: uint64(e.cfg.Executor.TaskResultsCache.DataObjScanResultMaxCacheableSize),
CacheCompression: e.cfg.Executor.TaskResultsCache.Compression,
PruneEmptyCachedTasks: e.cfg.Executor.TaskResultsCache.PruneEmptyCachedTasks,
TaskCacheRegistry: e.taskCaches,
DebugTasks: e.limits.DebugEngineTasks(tenantID),
DebugStreams: e.limits.DebugEngineStreams(tenantID),

@ -84,11 +84,6 @@ type Options struct {
// result are eliminated at plan time before any work is dispatched.
PruneEmptyCachedTasks bool
// TaskPruningCacheFetchBatchSize is the maximum number of cache keys fetched per
// batch when pruning empty cached tasks at plan time. 0 means all keys
// are fetched in a single call per cache name.
TaskPruningCacheFetchBatchSize int
// TaskCacheRegistry is the registry of cache backends used at plan time to
// prune tasks whose cached result is known to be empty.
TaskCacheRegistry executor.TaskCacheRegistry
@ -139,7 +134,6 @@ func New(opts Options, logger log.Logger, runner Runner, plan *physical.Plan) (*
compression: opts.CacheCompression,
registry: opts.TaskCacheRegistry,
pruneEmptyCachedTasks: opts.PruneEmptyCachedTasks,
eliminationBatchSize: opts.TaskPruningCacheFetchBatchSize,
}, logger)
if err != nil {
return nil, err

@ -14,7 +14,6 @@ import (
"github.com/grafana/loki/v3/pkg/engine/internal/executor"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/util/dag"
iterv2 "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
)
@ -36,7 +35,6 @@ type cacheParams struct {
compression string
registry executor.TaskCacheRegistry
pruneEmptyCachedTasks bool
eliminationBatchSize int
}
// planWorkflow partitions a physical plan into a graph of tasks.
@ -89,7 +87,7 @@ func planWorkflow(tenantID string, plan *physical.Plan, cacheOpts cacheParams, l
return dag.Graph[*Task]{}, fmt.Errorf("injecting DataObjScan caching: %w", err)
}
if cacheOpts.pruneEmptyCachedTasks {
if err := eliminateEmptyCachedTasks(planner, cacheOpts.registry, cacheOpts.eliminationBatchSize, logger); err != nil {
if err := eliminateEmptyCachedTasks(planner, cacheOpts.registry, logger); err != nil {
return dag.Graph[*Task]{}, fmt.Errorf("eliminating empty cached tasks: %w", err)
}
}
@ -124,13 +122,12 @@ func optimize(t *Task) {
// task-level cache entry or its DataObjScan-level cache entry is an empty hit,
// since zero scan rows guarantee zero rows from any operators above.
//
// Keys are fetched in batches of batchSize per cache name (0 = single batch).
// Cache fetch errors are non-fatal: the error is logged and the batch is skipped.
func eliminateEmptyCachedTasks(p *planner, caches executor.TaskCacheRegistry, batchSize int, logger log.Logger) error {
func eliminateEmptyCachedTasks(p *planner, caches executor.TaskCacheRegistry, logger log.Logger) error {
start := time.Now()
keyToTask, backends, taskCount := collectCacheKeys(p, caches, logger)
toEliminate := findEmptyTasks(context.Background(), keyToTask, backends, batchSize, logger)
toEliminate := findEmptyTasks(context.Background(), keyToTask, backends, logger)
// NOTE: tasks cannot be eliminated inside the walk or fetch loops since
// dag.Graph.Eliminate uses slices.DeleteFunc which zeroes the tail of the
@ -210,7 +207,6 @@ func findEmptyTasks(
ctx context.Context,
keyToTask map[physical.TaskCacheName]map[string]*Task,
backends map[physical.TaskCacheName]cache.Cache,
batchSize int,
logger log.Logger,
) []*Task {
toEliminate := make(map[*Task]struct{})
@ -233,22 +229,20 @@ func findEmptyTasks(
}
}
for batch := range iterv2.Batches(keys, batchSize) {
found, bufs, _, err := c.Fetch(ctx, batch)
found, bufs, _, err := c.Fetch(ctx, keys)
if err != nil {
level.Error(logger).Log("msg", "cache fetch failed during task elimination", "err", err)
continue
}
for i, key := range found {
dec, err := executor.NewCacheEntryDecoder(bufs[i])
if err != nil {
level.Error(logger).Log("msg", "cache fetch failed during task elimination", "err", err)
level.Error(logger).Log("msg", "cache entry decoding failed during task elimination", "err", err)
continue
}
for i, key := range found {
dec, err := executor.NewCacheEntryDecoder(bufs[i])
if err != nil {
level.Error(logger).Log("msg", "cache entry decoding failed during task elimination", "err", err)
continue
}
if dec.Len() == 0 {
toEliminate[keyMap[key]] = struct{}{}
}
if dec.Len() == 0 {
toEliminate[keyMap[key]] = struct{}{}
}
}
}

@ -1050,86 +1050,6 @@ func (m *workflowMockCache) Stop() {}
func (m *workflowMockCache) GetCacheType() stats.CacheType { return stats.TaskResultCache }
// countingCache wraps workflowMockCache and counts the number of Fetch calls.
// Tests read fetchCalls after planning to check how many fetches were issued.
type countingCache struct {
// Embedded mock cache; Fetch is overridden on countingCache and delegates to it.
*workflowMockCache
// fetchCalls is incremented once per call to Fetch.
fetchCalls int
}
// Fetch records that a fetch happened and then forwards the request to the
// embedded workflowMockCache, returning its results unchanged.
func (c *countingCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
	c.fetchCalls++
	found, bufs, missing, err = c.workflowMockCache.Fetch(ctx, keys)
	return found, bufs, missing, err
}
// Test_eliminateEmptyCachedTasks_batching plans the same workflow with several
// elimination batch sizes and checks that the number of cache Fetch calls
// matches the batching behaviour expected for each size.
func Test_eliminateEmptyCachedTasks_batching(t *testing.T) {
	// Physical plan with 4 leaf tasks (one per DataObjScan location a–d).
	// All task-level cache keys return empty, so every leaf (and the root) is eliminated.
	var physicalGraph dag.Graph[physical.Node]

	// Nodes are added top-down: aggregation, parallelize, range aggregation, scans.
	vecAgg := physicalGraph.Add(&physical.VectorAggregation{Operation: types.VectorAggregationTypeSum})
	parallelize := physicalGraph.Add(&physical.Parallelize{})
	rangeAgg := physicalGraph.Add(&physical.RangeAggregation{
		Operation: types.RangeAggregationTypeCount,
		Start:     time.Unix(5, 0).UTC(),
		End:       time.Unix(45, 0).UTC(),
	})
	scanSet := physicalGraph.Add(&physical.ScanSet{
		Targets: []*physical.ScanTarget{
			{
				Type: physical.ScanTypeDataObject,
				DataObject: &physical.DataObjScan{
					Location:     "a",
					MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()},
				},
			},
			{
				Type: physical.ScanTypeDataObject,
				DataObject: &physical.DataObjScan{
					Location:     "b",
					MaxTimeRange: physical.TimeRange{Start: time.Unix(20, 0).UTC(), End: time.Unix(60, 0).UTC()},
				},
			},
			{
				Type: physical.ScanTypeDataObject,
				DataObject: &physical.DataObjScan{
					Location:     "c",
					MaxTimeRange: physical.TimeRange{Start: time.Unix(30, 0).UTC(), End: time.Unix(70, 0).UTC()},
				},
			},
			{
				Type: physical.ScanTypeDataObject,
				DataObject: &physical.DataObjScan{
					Location:     "d",
					MaxTimeRange: physical.TimeRange{Start: time.Unix(40, 0).UTC(), End: time.Unix(80, 0).UTC()},
				},
			},
		},
	})

	// Wire the nodes into a single chain; AddEdge errors are intentionally
	// ignored, as in the original fixture.
	edges := []dag.Edge[physical.Node]{
		{Parent: vecAgg, Child: parallelize},
		{Parent: parallelize, Child: rangeAgg},
		{Parent: rangeAgg, Child: scanSet},
	}
	for _, edge := range edges {
		_ = physicalGraph.AddEdge(edge)
	}

	physicalPlan := physical.FromGraph(physicalGraph)
	physicalPlan, err := physical.WrapWithBatching(physicalPlan, 500)
	require.NoError(t, err)

	type testCase struct {
		name               string
		batchSize          int
		expectedFetchCalls int
	}
	cases := []testCase{
		{name: "unlimited (batchSize=0)", batchSize: 0, expectedFetchCalls: 1},
		{name: "batchSize=1", batchSize: 1, expectedFetchCalls: 4},
		{name: "batchSize=2", batchSize: 2, expectedFetchCalls: 2},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// A fresh counting cache per subtest keeps the call counts independent.
			mc := &countingCache{workflowMockCache: newWorkflowMockCache()}
			registry := executor.NewTestTaskCacheRegistry(map[physical.TaskCacheName]cache.Cache{
				physical.TaskCacheLogsScanRangeAggr: mc,
			})

			_, err := planWorkflow("", physicalPlan, cacheParams{
				enabled:               true,
				taskCacheMaxSizeBytes: 1 << 20,
				pruneEmptyCachedTasks: true,
				eliminationBatchSize:  tc.batchSize,
				registry:              registry,
			}, log.NewNopLogger())
			require.NoError(t, err)
			require.Equal(t, tc.expectedFetchCalls, mc.fetchCalls)
		})
	}
}
// emptyResultPayload returns a non-nil, zero-length byte slice, which is
// treated as an empty cached result.
func emptyResultPayload() []byte {
	return make([]byte, 0)
}

Loading…
Cancel
Save