From 4e9a034ef8f76df1807fac1fdbe2b3903ddc7f67 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Mon, 11 May 2026 14:29:05 -0600 Subject: [PATCH] feat(engine/workflow): Add dormant taskTypeCompaction admission lane (#21813) --- .../internal/workflow/admission_control.go | 31 +++++++++++++++---- .../workflow/admission_control_test.go | 22 ++++++++++++- pkg/engine/internal/workflow/workflow.go | 18 +++++++++++ pkg/engine/internal/workflow/workflow_test.go | 5 +-- 4 files changed, 67 insertions(+), 9 deletions(-) diff --git a/pkg/engine/internal/workflow/admission_control.go b/pkg/engine/internal/workflow/admission_control.go index a47207aabd..08c31a2f3d 100644 --- a/pkg/engine/internal/workflow/admission_control.go +++ b/pkg/engine/internal/workflow/admission_control.go @@ -13,6 +13,14 @@ type taskType string const ( taskTypeScan taskType = "scan" taskTypeOther taskType = "other" + + // taskTypeCompaction is reserved for tasks whose fragments contain + // dataobj-compaction nodes. The lane is wired through admissionControl + // and dispatchTasks but currently dormant: typeFor does not yet + // classify any task as taskTypeCompaction. The lane activates once + // dataobj-compaction physical-plan node types are introduced and + // typeFor is extended to recognize them. 
+ taskTypeCompaction taskType = "compaction" ) type admissionLane struct { @@ -35,27 +43,32 @@ type admissionControl struct { mapping map[taskType]*admissionLane } -func newAdmissionControl(maxScanTasks, maxOtherTasks int64) *admissionControl { +func newAdmissionControl(maxScanTasks, maxOtherTasks, maxCompactionTasks int64) *admissionControl { if maxScanTasks < 1 { maxScanTasks = math.MaxInt64 } if maxOtherTasks < 1 { maxOtherTasks = math.MaxInt64 } + if maxCompactionTasks < 1 { + maxCompactionTasks = math.MaxInt64 + } return &admissionControl{ mapping: map[taskType]*admissionLane{ - taskTypeScan: newAdmissionLane(taskTypeScan, maxScanTasks), - taskTypeOther: newAdmissionLane(taskTypeOther, maxOtherTasks), + taskTypeScan: newAdmissionLane(taskTypeScan, maxScanTasks), + taskTypeOther: newAdmissionLane(taskTypeOther, maxOtherTasks), + taskTypeCompaction: newAdmissionLane(taskTypeCompaction, maxCompactionTasks), }, } } -// groupByBucket categorizes a slice of tasks into groups based on their characteristics (scan, other, ...). +// groupByType categorizes a slice of tasks into groups based on their characteristics (scan, other, compaction). func (ac *admissionControl) groupByType(tasks []*Task) map[taskType][]*Task { groups := map[taskType][]*Task{ - taskTypeScan: make([]*Task, 0, len(tasks)), - taskTypeOther: make([]*Task, 0, len(tasks)), + taskTypeScan: make([]*Task, 0, len(tasks)), + taskTypeOther: make([]*Task, 0, len(tasks)), + taskTypeCompaction: make([]*Task, 0, len(tasks)), } for _, t := range tasks { @@ -66,6 +79,12 @@ func (ac *admissionControl) groupByType(tasks []*Task) map[taskType][]*Task { return groups } +// typeFor classifies a task by inspecting its fragment. +// +// The taskTypeCompaction return is intentionally absent: it will be +// added once dataobj-compaction physical-plan node types are introduced +// and this function is extended to recognize them. Until then the third +// lane is dormant. 
func (ac *admissionControl) typeFor(task *Task) taskType { if isScanTask(task) { return taskTypeScan diff --git a/pkg/engine/internal/workflow/admission_control_test.go b/pkg/engine/internal/workflow/admission_control_test.go index 631e002c6f..6c8d47b4ae 100644 --- a/pkg/engine/internal/workflow/admission_control_test.go +++ b/pkg/engine/internal/workflow/admission_control_test.go @@ -12,7 +12,7 @@ import ( ) func TestAdmissionControl_getBucket(t *testing.T) { - ac := newAdmissionControl(32, math.MaxInt64) + ac := newAdmissionControl(32, math.MaxInt64, math.MaxInt64) t.Run("Task without a DataObjScan node is considered an 'other' task", func(t *testing.T) { fragment := dag.Graph[physical.Node]{} @@ -48,3 +48,23 @@ func TestAdmissionControl_getBucket(t *testing.T) { require.Equal(t, taskTypeScan, ty) }) } + +// TestAdmissionControl_CompactionLaneWired verifies the dormant +// taskTypeCompaction lane is allocated with the requested capacity, is +// reachable via get(), and appears in the groupByType map even when no tasks +// are provided. The underlying semaphore is library code and is not +// re-tested here. +func TestAdmissionControl_CompactionLaneWired(t *testing.T) { + const compactionCap int64 = 5 + ac := newAdmissionControl(8, 8, compactionCap) + + lane := ac.get(taskTypeCompaction) + require.NotNil(t, lane) + require.Equal(t, compactionCap, lane.capacity) + + groups := ac.groupByType(nil) + require.Contains(t, groups, taskTypeScan) + require.Contains(t, groups, taskTypeOther) + require.Contains(t, groups, taskTypeCompaction) + require.Empty(t, groups[taskTypeCompaction]) +} diff --git a/pkg/engine/internal/workflow/workflow.go b/pkg/engine/internal/workflow/workflow.go index 36b4b3f426..b0607e5845 100644 --- a/pkg/engine/internal/workflow/workflow.go +++ b/pkg/engine/internal/workflow/workflow.go @@ -58,6 +58,16 @@ type Options struct { // may run concurrently within a single workflow. 0 means no limit. 
MaxRunningOtherTasks int + // MaxRunningCompactionTasks specifies the maximum number of compaction + // tasks that may run concurrently within a single workflow. 0 means no + // limit. + // + // The lane is dormant in query workflows (typeFor never classifies a + // query task as compaction); compactor workflows opt in by populating + // this field once the dataobj-compaction physical-plan node types and + // the corresponding typeFor classification land. + MaxRunningCompactionTasks int + // DebugTasks toggles debug messages for a task. This is very verbose and // should only be enabled for debugging purposes. // @@ -320,6 +330,7 @@ func (wf *Workflow) dispatchTasks(ctx context.Context, tasks []*Task) error { wf.admissionControl = newAdmissionControl( int64(wf.opts.MaxRunningScanTasks), int64(wf.opts.MaxRunningOtherTasks), + int64(wf.opts.MaxRunningCompactionTasks), ) // this span captures the time spent waiting for all tasks to be admitted @@ -335,9 +346,16 @@ func (wf *Workflow) dispatchTasks(ctx context.Context, tasks []*Task) error { region.Record(xcap.StatTaskCount.Observe(int64(len(tasks)))) groups := wf.admissionControl.groupByType(tasks) + // taskTypeCompaction is appended last because the loop is sequential + // (each lane is fully drained before the next): a populated Compaction + // lane should never delay Other or Scan dispatch. The lane is currently dormant + // — typeFor does not classify any task as compaction — so this slot is + // always empty for query workflows. It will be populated once the + // dataobj-compaction node types and typeFor classification land.
for _, taskType := range []taskType{ taskTypeOther, taskTypeScan, + taskTypeCompaction, } { lane := wf.admissionControl.get(taskType) tasks := groups[taskType] diff --git a/pkg/engine/internal/workflow/workflow_test.go b/pkg/engine/internal/workflow/workflow_test.go index aaca74f3fb..1444438ce0 100644 --- a/pkg/engine/internal/workflow/workflow_test.go +++ b/pkg/engine/internal/workflow/workflow_test.go @@ -299,8 +299,9 @@ func TestAdmissionControl(t *testing.T) { opts := Options{ Tenant: "tenant", - MaxRunningScanTasks: 32, // less than numScanTasks - MaxRunningOtherTasks: 0, // unlimited + MaxRunningScanTasks: 32, // less than numScanTasks + MaxRunningOtherTasks: 0, // unlimited + MaxRunningCompactionTasks: 0, // unlimited; lane is dormant } wf, err := New(opts, log.NewNopLogger(), fr, physicalPlan) require.NoError(t, err, "workflow should construct properly")