@@ -122,6 +122,8 @@ func (p *Planner) stopping(_ error) error {
 }
 
 func (p *Planner) running(ctx context.Context) error {
+	go p.trackInflightRequests(ctx)
+
 	// run once at beginning
 	if err := p.runOne(ctx); err != nil {
 		level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
@@ -130,9 +132,6 @@ func (p *Planner) running(ctx context.Context) error {
 	planningTicker := time.NewTicker(p.cfg.PlanningInterval)
 	defer planningTicker.Stop()
 
-	inflightTasksTicker := time.NewTicker(250 * time.Millisecond)
-	defer inflightTasksTicker.Stop()
-
 	for {
 		select {
 		case <-ctx.Done():
@@ -149,6 +148,19 @@ func (p *Planner) running(ctx context.Context) error {
 			if err := p.runOne(ctx); err != nil {
 				level.Error(p.logger).Log("msg", "bloom build iteration failed", "err", err)
 			}
+		}
+	}
+}
+
+func (p *Planner) trackInflightRequests(ctx context.Context) {
+	inflightTasksTicker := time.NewTicker(250 * time.Millisecond)
+	defer inflightTasksTicker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			// We just return. Error handling and logging is done in the main loop (running method).
+			return
 
 		case <-inflightTasksTicker.C:
 			inflight := p.totalPendingTasks()
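
The hunk above moves in-flight sampling out of the main planning loop: `running` now only reacts to `planningTicker`, while the new `trackInflightRequests` goroutine samples `totalPendingTasks()` every 250ms and simply returns when the context is cancelled, leaving error handling and logging to `running`. Below is a minimal, self-contained sketch of that ticker-driven sampling pattern; all names in it are illustrative stand-ins, not taken from the Loki code base.

```go
// Sketch of a ticker-driven sampler in the style of trackInflightRequests.
// sampler, pending, and observe are hypothetical names for illustration only.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

type sampler struct {
	pending atomic.Int64 // stands in for Planner.totalPendingTasks()
}

// run samples the pending count on every tick until the context is cancelled.
// Like trackInflightRequests, it just returns on ctx.Done(); the caller owns error handling.
func (s *sampler) run(ctx context.Context, interval time.Duration, observe func(float64)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			observe(float64(s.pending.Load()))
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	s := &sampler{}
	s.pending.Store(3)

	// In the planner this feeds a Prometheus metric; here we just print the samples.
	s.run(ctx, 250*time.Millisecond, func(v float64) { fmt.Println("inflight:", v) })
}
```
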
@@ -223,6 +235,7 @@ func (p *Planner) runOne(ctx context.Context) error {
 			tenantTableEnqueuedTasks++
 		}
 
+		p.metrics.tenantTasksPlanned.WithLabelValues(tt.tenant).Add(float64(tenantTableEnqueuedTasks))
 		tasksResultForTenantTable[tt] = tenantTableTaskResults{
 			tasksToWait:   tenantTableEnqueuedTasks,
 			originalMetas: existingMetas,
@@ -489,6 +502,12 @@ func (p *Planner) loadTenantWork(
 			tenantTableWork[table][tenant] = bounds
 
+			// Reset progress tracking metrics for this tenant
+			// NOTE(salvacorts): We will reset them multiple times for the same tenant, for each table, but it's not a big deal.
+			// Alternatively, we can use a Counter instead of a Gauge, but I think a Gauge is easier to reason about.
+			p.metrics.tenantTasksPlanned.WithLabelValues(tenant).Set(0)
+			p.metrics.tenantTasksCompleted.WithLabelValues(tenant).Set(0)
+
 			level.Debug(p.logger).Log("msg", "loading work for tenant", "table", table, "tenant", tenant, "splitFactor", splitFactor)
 		}
 
 		if err := tenants.Err(); err != nil {
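
Together with the `tenantTasksPlanned.Add(...)` call in `runOne` and the `tenantTasksCompleted.Inc()` call in `BuilderLoop`, these resets turn the two gauges into per-iteration progress indicators: completed over planned, per tenant. The metric definitions themselves are not part of this diff; the sketch below shows one plausible way such per-tenant gauge vectors could be declared. Metric names, help strings, and registration are assumptions, not the actual Loki definitions.

```go
// Hypothetical declaration of per-tenant planned/completed gauges.
// The real definitions live outside this diff; names here are assumed.
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type Metrics struct {
	TenantTasksPlanned   *prometheus.GaugeVec
	TenantTasksCompleted *prometheus.GaugeVec
}

func New(reg prometheus.Registerer) *Metrics {
	return &Metrics{
		TenantTasksPlanned: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "bloom_planner_tenant_tasks_planned", // assumed name
			Help: "Tasks planned for a tenant in the current planning iteration.",
		}, []string{"tenant"}),
		TenantTasksCompleted: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "bloom_planner_tenant_tasks_completed", // assumed name
			Help: "Tasks completed for a tenant in the current planning iteration.",
		}, []string{"tenant"}),
	}
}
```

Because both gauges are reset to 0 when a tenant's work is loaded, a dashboard can read completion as a ratio within the current iteration; a Counter would only ever grow across iterations, which is the trade-off the NOTE in the hunk refers to.
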
@@ -804,6 +823,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
 				"retries", task.timesEnqueued.Load(),
 			)
 			p.removePendingTask(task)
+			p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant).Inc()
 
 			// Send the result back to the task. The channel is buffered, so this should not block.
 			task.resultsChannel <- result
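
The non-blocking send noted in the comment relies on `task.resultsChannel` being created with enough buffer capacity for the single result. A minimal illustration of that property follows; the `task` and `result` types are stand-ins, not the planner's real types.

```go
// Why a send on a buffered results channel does not block: with capacity 1,
// the single result fits in the buffer even if the receiver is not ready yet.
package main

import "fmt"

type result struct{ err error }

type task struct {
	resultsChannel chan *result // assumed to be created with capacity 1 by the enqueuer
}

func main() {
	t := task{resultsChannel: make(chan *result, 1)}

	// Producer side (the planner): the send completes immediately because the buffer has room.
	t.resultsChannel <- &result{err: nil}
	fmt.Println("result sent without blocking")

	// Consumer side (whoever enqueued the task) picks the result up later.
	r := <-t.resultsChannel
	fmt.Println("received result, err:", r.err)
}
```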