chore(engine): make stream/task debug logging configurable (#19842)

Signed-off-by: Robert Fratto <robertfratto@gmail.com>
pull/19793/head
Robert Fratto 2 months ago committed by GitHub
parent 53230deee4
commit 0f0846adec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 9
      docs/sources/shared/configuration.md
  2. 3
      pkg/engine/engine.go
  3. 23
      pkg/engine/internal/workflow/workflow.go
  4. 8
      pkg/logcli/client/file.go
  5. 14
      pkg/logql/limits.go
  6. 8
      pkg/querier/queryrange/roundtrip_test.go
  7. 8
      pkg/querier/testutil/limits.go
  8. 15
      pkg/validation/limits.go

@ -4736,6 +4736,15 @@ otlp_config:
# and all tasks will be scheduled at once.
# CLI flag: -limits.max-scan-task-parallelism
[max_scan_task_parallelism: <int> | default = 0]
# Experimental: Toggles verbose debug logging of tasks in the new query engine.
# CLI flag: -limits.debug-engine-tasks
[debug_engine_tasks: <boolean> | default = false]
# Experimental: Toggles verbose debug logging of data streams in the new query
# engine.
# CLI flag: -limits.debug-engine-streams
[debug_engine_streams: <boolean> | default = false]
```
### local_storage_config

@ -328,6 +328,9 @@ func (e *Engine) buildWorkflow(ctx context.Context, logger log.Logger, physicalP
opts := workflow.Options{
MaxRunningScanTasks: e.limits.MaxScanTaskParallelism(tenantID),
MaxRunningOtherTasks: 0,
DebugTasks: e.limits.DebugEngineTasks(tenantID),
DebugStreams: e.limits.DebugEngineStreams(tenantID),
}
wf, err := workflow.New(opts, logger, tenantID, e.scheduler.inner, physicalPlan)
if err != nil {

@ -18,9 +18,26 @@ import (
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
)
// Options configures a [Workflow].
type Options struct {
MaxRunningScanTasks int
// MaxRunningScanTasks specifies the maximum number of scan tasks that may
// run concurrently within a single workflow. 0 means no limit.
MaxRunningScanTasks int
// MaxRunningOtherTasks specifies the maximum number of non-scan tasks that
// may run concurrently within a single workflow. 0 means no limit.
MaxRunningOtherTasks int
// DebugTasks toggles debug messages for a task. This is very verbose and
// should only be enabled for debugging purposes.
//
// Regardless of the value of DebugTasks, workers still log when
// they start and finish assigned tasks.
DebugTasks bool
// DebugStreams toggles debug messages for data streams. This is very
// verbose and should only be enabled for debugging purposes.
DebugStreams bool
}
// Workflow represents a physical plan that has been partitioned into
@ -253,7 +270,9 @@ func (wf *Workflow) onStreamChange(_ context.Context, stream *Stream, newState S
}
func (wf *Workflow) onTaskChange(ctx context.Context, task *Task, newStatus TaskStatus) {
level.Debug(wf.logger).Log("msg", "task state change", "task_id", task.ULID, "new_state", newStatus.State)
if wf.opts.DebugTasks {
level.Debug(wf.logger).Log("msg", "task state change", "task_id", task.ULID, "new_state", newStatus.State)
}
wf.tasksMut.Lock()
oldState := wf.taskStates[task]

@ -258,6 +258,14 @@ func (l *limiter) MaxScanTaskParallelism(_ string) int {
return 0 // This setting for the v2 execution engine is unused in LogCLI
}
func (l *limiter) DebugEngineTasks(_ string) bool {
return false // This setting for the v2 execution engine is unused in LogCLI
}
func (l *limiter) DebugEngineStreams(_ string) bool {
return false // This setting for the v2 execution engine is unused in LogCLI
}
type querier struct {
r io.Reader
labels labels.Labels

@ -25,6 +25,8 @@ type Limits interface {
// v2 engine limits
MaxScanTaskParallelism(string) int
DebugEngineTasks(string) bool
DebugEngineStreams(string) bool
}
type fakeLimits struct {
@ -34,8 +36,12 @@ type fakeLimits struct {
rangeLimit time.Duration
requiredLabels []string
multiVariantQueryEnable bool
// v2 engine limits
maxScanTaskParallelism int
debugEngineTasks bool
debugEngineStreams bool
}
func (f fakeLimits) MaxQuerySeries(_ context.Context, _ string) int {
@ -65,3 +71,11 @@ func (f fakeLimits) EnableMultiVariantQueries(_ string) bool {
func (f fakeLimits) MaxScanTaskParallelism(_ string) int {
return f.maxScanTaskParallelism
}
func (f fakeLimits) DebugEngineTasks(_ string) bool {
return f.debugEngineTasks
}
func (f fakeLimits) DebugEngineStreams(_ string) bool {
return f.debugEngineStreams
}

@ -1541,6 +1541,14 @@ func (f fakeLimits) MaxScanTaskParallelism(_ string) int {
return 0
}
func (f fakeLimits) DebugEngineTasks(_ string) bool {
return false
}
func (f fakeLimits) DebugEngineStreams(_ string) bool {
return false
}
type ingesterQueryOpts struct {
queryStoreOnly bool
queryIngestersWithin time.Duration

@ -91,3 +91,11 @@ func (m *MockLimits) PersistenceGranularity(_ string) time.Duration {
func (m *MockLimits) MaxScanTaskParallelism(_ string) int {
return 0
}
func (m *MockLimits) DebugEngineStreams(_ string) bool {
return false
}
func (m *MockLimits) DebugEngineTasks(_ string) bool {
return false
}

@ -285,7 +285,10 @@ type Limits struct {
S3SSEKMSEncryptionContext string `yaml:"s3_sse_kms_encryption_context" json:"s3_sse_kms_encryption_context" doc:"nocli|description=S3 server-side encryption KMS encryption context. If unset and the key ID override is set, the encryption context will not be provided to S3. Ignored if the SSE type override is not set."`
// Per tenant limits for the v2 execution engine
MaxScanTaskParallelism int `yaml:"max_scan_task_parallelism" json:"max_scan_task_parallelism"`
MaxScanTaskParallelism int `yaml:"max_scan_task_parallelism" json:"max_scan_task_parallelism"`
DebugEngineTasks bool `yaml:"debug_engine_tasks" json:"debug_engine_tasks"`
DebugEngineStreams bool `yaml:"debug_engine_streams" json:"debug_engine_streams"`
}
type FieldDetectorConfig struct {
@ -528,6 +531,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
)
f.IntVar(&l.MaxScanTaskParallelism, "limits.max-scan-task-parallelism", 0, "Experimental: Controls the amount of scan tasks that can be running in parallel in the new query engine. The default of 0 means unlimited parallelism and all tasks will be scheduled at once.")
f.BoolVar(&l.DebugEngineTasks, "limits.debug-engine-tasks", false, "Experimental: Toggles verbose debug logging of tasks in the new query engine.")
f.BoolVar(&l.DebugEngineStreams, "limits.debug-engine-streams", false, "Experimental: Toggles verbose debug logging of data streams in the new query engine.")
}
// SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels.
@ -1368,6 +1373,14 @@ func (o *Overrides) MaxScanTaskParallelism(userID string) int {
return o.getOverridesForUser(userID).MaxScanTaskParallelism
}
func (o *Overrides) DebugEngineTasks(userID string) bool {
return o.getOverridesForUser(userID).DebugEngineTasks
}
func (o *Overrides) DebugEngineStreams(userID string) bool {
return o.getOverridesForUser(userID).DebugEngineStreams
}
func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits.TenantLimits(userID)

Loading…
Cancel
Save