feat(engine/workflow): Add dormant taskTypeCompaction admission lane (#21813)

pull/21730/head^2
Trevor Whitney 4 weeks ago committed by GitHub
parent 49a9dae084
commit 4e9a034ef8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 31
      pkg/engine/internal/workflow/admission_control.go
  2. 22
      pkg/engine/internal/workflow/admission_control_test.go
  3. 18
      pkg/engine/internal/workflow/workflow.go
  4. 5
      pkg/engine/internal/workflow/workflow_test.go

@ -13,6 +13,14 @@ type taskType string
const (
taskTypeScan taskType = "scan"
taskTypeOther taskType = "other"
// taskTypeCompaction is reserved for tasks whose fragments contain
// dataobj-compaction nodes. The lane is wired through admissionControl
// and dispatchTasks but currently dormant: typeFor does not yet
// classify any task as taskTypeCompaction. The lane activates once
// dataobj-compaction physical-plan node types are introduced and
// typeFor is extended to recognize them.
taskTypeCompaction taskType = "compaction"
)
type admissionLane struct {
@ -35,27 +43,32 @@ type admissionControl struct {
mapping map[taskType]*admissionLane
}
func newAdmissionControl(maxScanTasks, maxOtherTasks int64) *admissionControl {
func newAdmissionControl(maxScanTasks, maxOtherTasks, maxCompactionTasks int64) *admissionControl {
if maxScanTasks < 1 {
maxScanTasks = math.MaxInt64
}
if maxOtherTasks < 1 {
maxOtherTasks = math.MaxInt64
}
if maxCompactionTasks < 1 {
maxCompactionTasks = math.MaxInt64
}
return &admissionControl{
mapping: map[taskType]*admissionLane{
taskTypeScan: newAdmissionLane(taskTypeScan, maxScanTasks),
taskTypeOther: newAdmissionLane(taskTypeOther, maxOtherTasks),
taskTypeScan: newAdmissionLane(taskTypeScan, maxScanTasks),
taskTypeOther: newAdmissionLane(taskTypeOther, maxOtherTasks),
taskTypeCompaction: newAdmissionLane(taskTypeCompaction, maxCompactionTasks),
},
}
}
// groupByBucket categorizes a slice of tasks into groups based on their characteristics (scan, other, ...).
// groupByType categorizes a slice of tasks into groups based on their characteristics (scan, other, compaction).
func (ac *admissionControl) groupByType(tasks []*Task) map[taskType][]*Task {
groups := map[taskType][]*Task{
taskTypeScan: make([]*Task, 0, len(tasks)),
taskTypeOther: make([]*Task, 0, len(tasks)),
taskTypeScan: make([]*Task, 0, len(tasks)),
taskTypeOther: make([]*Task, 0, len(tasks)),
taskTypeCompaction: make([]*Task, 0, len(tasks)),
}
for _, t := range tasks {
@ -66,6 +79,12 @@ func (ac *admissionControl) groupByType(tasks []*Task) map[taskType][]*Task {
return groups
}
// typeFor classifies a task by inspecting its fragment.
//
// The taskTypeCompaction return is intentionally absent: it will be
// added once dataobj-compaction physical-plan node types are introduced
// and this function is extended to recognize them. Until then the third
// lane is dormant.
func (ac *admissionControl) typeFor(task *Task) taskType {
if isScanTask(task) {
return taskTypeScan

@ -12,7 +12,7 @@ import (
)
func TestAdmissionControl_getBucket(t *testing.T) {
ac := newAdmissionControl(32, math.MaxInt64)
ac := newAdmissionControl(32, math.MaxInt64, math.MaxInt64)
t.Run("Task without a DataObjScan node is considered an 'other' task", func(t *testing.T) {
fragment := dag.Graph[physical.Node]{}
@ -48,3 +48,23 @@ func TestAdmissionControl_getBucket(t *testing.T) {
require.Equal(t, taskTypeScan, ty)
})
}
// TestAdmissionControl_CompactionLaneWired verifies the dormant
// taskTypeCompaction lane is allocated with the requested capacity, is
// reachable via get(), and appears in the groupByType map even when no tasks
// are provided. The underlying semaphore is library code and is not
// re-tested here.
func TestAdmissionControl_CompactionLaneWired(t *testing.T) {
const compactionCap int64 = 5
ac := newAdmissionControl(8, 8, compactionCap)
lane := ac.get(taskTypeCompaction)
require.NotNil(t, lane)
require.Equal(t, compactionCap, lane.capacity)
groups := ac.groupByType(nil)
require.Contains(t, groups, taskTypeScan)
require.Contains(t, groups, taskTypeOther)
require.Contains(t, groups, taskTypeCompaction)
require.Empty(t, groups[taskTypeCompaction])
}

@ -58,6 +58,16 @@ type Options struct {
// may run concurrently within a single workflow. 0 means no limit.
MaxRunningOtherTasks int
// MaxRunningCompactionTasks specifies the maximum number of compaction
// tasks that may run concurrently within a single workflow. 0 means no
// limit.
//
// The lane is dormant in query workflows (typeFor never classifies a
// query task as compaction); compactor workflows opt in by populating
// this field once the dataobj-compaction physical-plan node types and
// the corresponding typeFor classification land.
MaxRunningCompactionTasks int
// DebugTasks toggles debug messages for a task. This is very verbose and
// should only be enabled for debugging purposes.
//
@ -320,6 +330,7 @@ func (wf *Workflow) dispatchTasks(ctx context.Context, tasks []*Task) error {
wf.admissionControl = newAdmissionControl(
int64(wf.opts.MaxRunningScanTasks),
int64(wf.opts.MaxRunningOtherTasks),
int64(wf.opts.MaxRunningCompactionTasks),
)
// this span captures the time spent waiting for all tasks to be admitted
@ -335,9 +346,16 @@ func (wf *Workflow) dispatchTasks(ctx context.Context, tasks []*Task) error {
region.Record(xcap.StatTaskCount.Observe(int64(len(tasks))))
groups := wf.admissionControl.groupByType(tasks)
// taskTypeCompaction is appended last because the loop is sequential
// (each lane is fully drained before the next): a populated Compaction
// lane should never delay Scan dispatch. The lane is currently dormant
// — typeFor does not classify any task as compaction — so this slot is
// always empty for query workflows. It will be populated once the
// dataobj-compaction node types and typeFor classification land.
for _, taskType := range []taskType{
taskTypeOther,
taskTypeScan,
taskTypeCompaction,
} {
lane := wf.admissionControl.get(taskType)
tasks := groups[taskType]

@ -299,8 +299,9 @@ func TestAdmissionControl(t *testing.T) {
opts := Options{
Tenant: "tenant",
MaxRunningScanTasks: 32, // less than numScanTasks
MaxRunningOtherTasks: 0, // unlimited
MaxRunningScanTasks: 32, // less than numScanTasks
MaxRunningOtherTasks: 0, // unlimited
MaxRunningCompactionTasks: 0, // unlimited; lane is dormant
}
wf, err := New(opts, log.NewNopLogger(), fr, physicalPlan)
require.NoError(t, err, "workflow should construct properly")

Loading…
Cancel
Save