|
|
|
@ -5,7 +5,6 @@ import ( |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/go-kit/log" |
|
|
|
|
"github.com/go-kit/log/level" |
|
|
|
|
"github.com/pkg/errors" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/dskit/concurrency" |
|
|
|
@ -37,7 +36,6 @@ type processor struct { |
|
|
|
|
|
|
|
|
|
func (p *processor) processTasks(ctx context.Context, tasks []Task) error { |
|
|
|
|
tenant := tasks[0].tenant |
|
|
|
|
level.Info(p.logger).Log("msg", "process tasks", "tenant", tenant, "tasks", len(tasks)) |
|
|
|
|
|
|
|
|
|
for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) { |
|
|
|
|
err := p.processTasksForDay(ctx, tenant, ts, tasks) |
|
|
|
@ -55,7 +53,6 @@ func (p *processor) processTasks(ctx context.Context, tasks []Task) error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *processor) processTasksForDay(ctx context.Context, tenant string, day config.DayTime, tasks []Task) error { |
|
|
|
|
level.Info(p.logger).Log("msg", "process tasks for day", "tenant", tenant, "tasks", len(tasks), "day", day.String()) |
|
|
|
|
var duration time.Duration |
|
|
|
|
|
|
|
|
|
blocksRefs := make([]bloomshipper.BlockRef, 0, len(tasks[0].blocks)*len(tasks)) |
|
|
|
@ -83,7 +80,6 @@ func (p *processor) processTasksForDay(ctx context.Context, tenant string, day c |
|
|
|
|
bloomshipper.WithPool(p.store.Allocator()), |
|
|
|
|
) |
|
|
|
|
duration = time.Since(startBlocks) |
|
|
|
|
level.Debug(p.logger).Log("msg", "fetched blocks", "count", len(refs), "duration", duration, "err", err) |
|
|
|
|
|
|
|
|
|
for _, t := range tasks { |
|
|
|
|
FromContext(t.ctx).AddBlocksFetchTime(duration) |
|
|
|
|