diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 81a1c76857..6f3b8de15e 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -4736,6 +4736,15 @@ otlp_config: # and all tasks will be scheduled at once. # CLI flag: -limits.max-scan-task-parallelism [max_scan_task_parallelism: | default = 0] + +# Experimental: Toggles verbose debug logging of tasks in the new query engine. +# CLI flag: -limits.debug-engine-tasks +[debug_engine_tasks: | default = false] + +# Experimental: Toggles verbose debug logging of data streams in the new query +# engine. +# CLI flag: -limits.debug-engine-streams +[debug_engine_streams: | default = false] ``` ### local_storage_config diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index e0bc215f51..25d0e19598 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -328,6 +328,9 @@ func (e *Engine) buildWorkflow(ctx context.Context, logger log.Logger, physicalP opts := workflow.Options{ MaxRunningScanTasks: e.limits.MaxScanTaskParallelism(tenantID), MaxRunningOtherTasks: 0, + + DebugTasks: e.limits.DebugEngineTasks(tenantID), + DebugStreams: e.limits.DebugEngineStreams(tenantID), } wf, err := workflow.New(opts, logger, tenantID, e.scheduler.inner, physicalPlan) if err != nil { diff --git a/pkg/engine/internal/workflow/workflow.go b/pkg/engine/internal/workflow/workflow.go index 97d3439d5a..f4911b08af 100644 --- a/pkg/engine/internal/workflow/workflow.go +++ b/pkg/engine/internal/workflow/workflow.go @@ -18,9 +18,26 @@ import ( "github.com/grafana/loki/v3/pkg/logqlmodel/stats" ) +// Options configures a [Workflow]. type Options struct { - MaxRunningScanTasks int + // MaxRunningScanTasks specifies the maximum number of scan tasks that may + // run concurrently within a single workflow. 0 means no limit. + MaxRunningScanTasks int + + // MaxRunningOtherTasks specifies the maximum number of non-scan tasks that + // may run concurrently within a single workflow. 0 means no limit. MaxRunningOtherTasks int + + // DebugTasks toggles debug messages for a task. This is very verbose and + // should only be enabled for debugging purposes. + // + // Regardless of the value of DebugTasks, workers still log when + // they start and finish assigned tasks. + DebugTasks bool + + // DebugStreams toggles debug messages for data streams. This is very + // verbose and should only be enabled for debugging purposes. + DebugStreams bool } // Workflow represents a physical plan that has been partitioned into @@ -253,7 +270,9 @@ func (wf *Workflow) onStreamChange(_ context.Context, stream *Stream, newState S } func (wf *Workflow) onTaskChange(ctx context.Context, task *Task, newStatus TaskStatus) { - level.Debug(wf.logger).Log("msg", "task state change", "task_id", task.ULID, "new_state", newStatus.State) + if wf.opts.DebugTasks { + level.Debug(wf.logger).Log("msg", "task state change", "task_id", task.ULID, "new_state", newStatus.State) + } wf.tasksMut.Lock() oldState := wf.taskStates[task] diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index 9a6ec0ca28..86f5aba4ee 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -258,6 +258,14 @@ func (l *limiter) MaxScanTaskParallelism(_ string) int { return 0 // This setting for the v2 execution engine is unused in LogCLI } +func (l *limiter) DebugEngineTasks(_ string) bool { + return false // This setting for the v2 execution engine is unused in LogCLI +} + +func (l *limiter) DebugEngineStreams(_ string) bool { + return false // This setting for the v2 execution engine is unused in LogCLI +} + type querier struct { r io.Reader labels labels.Labels diff --git a/pkg/logql/limits.go b/pkg/logql/limits.go index af34574b7b..838a88d19a 100644 --- a/pkg/logql/limits.go +++ b/pkg/logql/limits.go @@ -25,6 +25,8 @@ type Limits interface { // v2 engine limits MaxScanTaskParallelism(string) int + DebugEngineTasks(string) bool + DebugEngineStreams(string) bool } type fakeLimits struct { @@ -34,8 +36,12 @@ type fakeLimits struct { rangeLimit time.Duration requiredLabels []string multiVariantQueryEnable bool + // v2 engine limits + maxScanTaskParallelism int + debugEngineTasks bool + debugEngineStreams bool } func (f fakeLimits) MaxQuerySeries(_ context.Context, _ string) int { @@ -65,3 +71,11 @@ func (f fakeLimits) EnableMultiVariantQueries(_ string) bool { func (f fakeLimits) MaxScanTaskParallelism(_ string) int { return f.maxScanTaskParallelism } + +func (f fakeLimits) DebugEngineTasks(_ string) bool { + return f.debugEngineTasks +} + +func (f fakeLimits) DebugEngineStreams(_ string) bool { + return f.debugEngineStreams +} diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index ffc46ac47f..59741b29f7 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -1541,6 +1541,14 @@ func (f fakeLimits) MaxScanTaskParallelism(_ string) int { return 0 } +func (f fakeLimits) DebugEngineTasks(_ string) bool { + return false +} + +func (f fakeLimits) DebugEngineStreams(_ string) bool { + return false +} + type ingesterQueryOpts struct { queryStoreOnly bool queryIngestersWithin time.Duration diff --git a/pkg/querier/testutil/limits.go b/pkg/querier/testutil/limits.go index 2a25792154..04dcddffa4 100644 --- a/pkg/querier/testutil/limits.go +++ b/pkg/querier/testutil/limits.go @@ -91,3 +91,11 @@ func (m *MockLimits) PersistenceGranularity(_ string) time.Duration { func (m *MockLimits) MaxScanTaskParallelism(_ string) int { return 0 } + +func (m *MockLimits) DebugEngineStreams(_ string) bool { + return false +} + +func (m *MockLimits) DebugEngineTasks(_ string) bool { + return false +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 31882ab66c..263a86cdff 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -285,7 +285,10 @@ type Limits struct { S3SSEKMSEncryptionContext string `yaml:"s3_sse_kms_encryption_context" json:"s3_sse_kms_encryption_context" doc:"nocli|description=S3 server-side encryption KMS encryption context. If unset and the key ID override is set, the encryption context will not be provided to S3. Ignored if the SSE type override is not set."` // Per tenant limits for the v2 execution engine - MaxScanTaskParallelism int `yaml:"max_scan_task_parallelism" json:"max_scan_task_parallelism"` + + MaxScanTaskParallelism int `yaml:"max_scan_task_parallelism" json:"max_scan_task_parallelism"` + DebugEngineTasks bool `yaml:"debug_engine_tasks" json:"debug_engine_tasks"` + DebugEngineStreams bool `yaml:"debug_engine_streams" json:"debug_engine_streams"` } type FieldDetectorConfig struct { @@ -528,6 +531,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { ) f.IntVar(&l.MaxScanTaskParallelism, "limits.max-scan-task-parallelism", 0, "Experimental: Controls the amount of scan tasks that can be running in parallel in the new query engine. The default of 0 means unlimited parallelism and all tasks will be scheduled at once.") + f.BoolVar(&l.DebugEngineTasks, "limits.debug-engine-tasks", false, "Experimental: Toggles verbose debug logging of tasks in the new query engine.") + f.BoolVar(&l.DebugEngineStreams, "limits.debug-engine-streams", false, "Experimental: Toggles verbose debug logging of data streams in the new query engine.") } // SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels. @@ -1368,6 +1373,14 @@ func (o *Overrides) MaxScanTaskParallelism(userID string) int { return o.getOverridesForUser(userID).MaxScanTaskParallelism } +func (o *Overrides) DebugEngineTasks(userID string) bool { + return o.getOverridesForUser(userID).DebugEngineTasks +} + +func (o *Overrides) DebugEngineStreams(userID string) bool { + return o.getOverridesForUser(userID).DebugEngineStreams +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits.TenantLimits(userID)