|
|
|
|
@ -168,13 +168,13 @@ func (b *JobBuilder) processManifest(ctx context.Context, manifest *manifest, ma |
|
|
|
|
b.currSegmentStorageUpdates.reset(segment.TableName, segment.UserID) |
|
|
|
|
|
|
|
|
|
// Process each chunks group (same deletion query)
|
|
|
|
|
for i, group := range segment.ChunksGroups { |
|
|
|
|
for _, group := range segment.ChunksGroups { |
|
|
|
|
// Check if we should stop processing this manifest
|
|
|
|
|
if ctx.Err() != nil { |
|
|
|
|
return ctx.Err() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := b.createJobsForChunksGroup(ctx, segment.TableName, segment.UserID, fmt.Sprintf("%d", i), group, jobsChan); err != nil { |
|
|
|
|
if err := b.createJobsForChunksGroup(ctx, segment.TableName, segment.UserID, group, jobsChan); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -282,7 +282,7 @@ func (b *JobBuilder) readManifest(ctx context.Context, manifestPath string) (*ma |
|
|
|
|
return &m, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *JobBuilder) createJobsForChunksGroup(ctx context.Context, tableName, userID, groupID string, group ChunksGroup, jobsChan chan<- *grpc.Job) error { |
|
|
|
|
func (b *JobBuilder) createJobsForChunksGroup(ctx context.Context, tableName, userID string, group ChunksGroup, jobsChan chan<- *grpc.Job) error { |
|
|
|
|
for labels, chunks := range group.Chunks { |
|
|
|
|
// Split chunks into groups of maxChunksPerJob
|
|
|
|
|
for i := 0; i < len(chunks); i += maxChunksPerJob { |
|
|
|
|
|