feat(compactor HS): store index updates per processed segment in the manifest to the object storage (#18212)

pull/18230/head
Sandeep Sukhani 10 months ago committed by GitHub
parent c167800b3f
commit 8007e22db5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      pkg/compactor/deletion/deletion_manifest_builder.go
  2. 2
      pkg/compactor/deletion/deletion_manifest_builder_test.go
  3. 82
      pkg/compactor/deletion/job_builder.go
  4. 105
      pkg/compactor/deletion/job_builder_test.go
  5. 12
      pkg/compactor/jobqueue/queue.go
  6. 4
      pkg/compactor/jobqueue/queue_test.go

@ -229,9 +229,13 @@ func (d *deletionManifestBuilder) flushCurrentBatch(ctx context.Context) error {
d.segmentsCount++
d.overallChunksCount += d.currentSegmentChunksCount
d.currentSegmentChunksCount = 0
return d.deleteStoreClient.PutObject(ctx, d.buildObjectKey(fmt.Sprintf("%d.json", d.segmentsCount)), strings.NewReader(unsafeGetString(batchJSON)))
return d.deleteStoreClient.PutObject(ctx, d.buildObjectKey(fmt.Sprintf("%d.json", d.segmentsCount-1)), strings.NewReader(unsafeGetString(batchJSON)))
}
// buildObjectKey returns the object storage key for the given filename,
// placed under this manifest's creation-time directory.
// Reuses path() so the directory prefix is built in exactly one place.
func (d *deletionManifestBuilder) buildObjectKey(filename string) string {
	return path.Join(d.path(), filename)
}
// path returns the object storage directory for this manifest, derived from
// its creation timestamp in nanoseconds.
func (d *deletionManifestBuilder) path() string {
	return fmt.Sprintf("%d", d.creationTime.UnixNano())
}

@ -586,7 +586,7 @@ func TestDeletionManifestBuilder(t *testing.T) {
require.Equal(t, tc.expectedManifest, manifest)
for i := 0; i < tc.expectedManifest.SegmentsCount; i++ {
reader, _, err := builder.deleteStoreClient.GetObject(context.Background(), builder.buildObjectKey(fmt.Sprintf("%d.json", i+1)))
reader, _, err := builder.deleteStoreClient.GetObject(context.Background(), builder.buildObjectKey(fmt.Sprintf("%d.json", i)))
require.NoError(t, err)
segmentJSON, err := io.ReadAll(reader)

@ -1,6 +1,7 @@
package deletion
import (
"bytes"
"context"
"encoding/json"
"fmt"
@ -10,13 +11,17 @@ import (
"time"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/grafana/loki/v3/pkg/compactor/client/grpc"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
const maxChunksPerJob = 1000
const (
maxChunksPerJob = 1000
indexUpdatesFilenameSuffix = `-index-updates.json`
)
type deletionJob struct {
TableName string `json:"table_name"`
@ -37,11 +42,14 @@ type JobBuilder struct {
// Current manifest being processed
currentManifest manifestJobs
currentManifestMtx sync.RWMutex
currSegmentIndexUpdates *indexUpdates
}
// NewJobBuilder creates a JobBuilder that reads deletion manifests from the
// given object store and accumulates per-segment index updates.
// Note: the scraped diff left a duplicated deleteStoreClient field in the
// struct literal, which is invalid Go; the literal below keeps each field once.
func NewJobBuilder(deleteStoreClient client.ObjectClient) *JobBuilder {
	return &JobBuilder{
		deleteStoreClient:       deleteStoreClient,
		currSegmentIndexUpdates: &indexUpdates{},
	}
}
@ -102,7 +110,7 @@ func (b *JobBuilder) processManifest(ctx context.Context, manifestPath string, j
b.currentManifestMtx.Unlock()
// Process segments sequentially
for segmentNum := 1; ctx.Err() == nil && segmentNum <= manifest.SegmentsCount; segmentNum++ {
for segmentNum := 0; ctx.Err() == nil && segmentNum < manifest.SegmentsCount; segmentNum++ {
level.Info(util_log.Logger).Log("msg", "starting segment processing",
"manifest", manifestPath,
"segment", segmentNum)
@ -128,6 +136,8 @@ func (b *JobBuilder) processManifest(ctx context.Context, manifestPath string, j
b.currentManifest.jobsInProgress = make(map[string]struct{})
b.currentManifestMtx.Unlock()
b.currSegmentIndexUpdates.reset(segment.TableName)
// Process each chunks group (same deletion query)
for i, group := range segment.ChunksGroups {
// Check if we should stop processing this manifest
@ -145,6 +155,11 @@ func (b *JobBuilder) processManifest(ctx context.Context, manifestPath string, j
return err
}
// update the index updates for the current table
if err := b.uploadIndexUpdateForCurrentSegment(ctx, path.Join(manifestPath, fmt.Sprintf("%d%s", segmentNum, indexUpdatesFilenameSuffix))); err != nil {
return errors.Wrap(err, "failed to upload index updates")
}
// Delete the processed segment
if err := b.deleteStoreClient.DeleteObject(ctx, segmentPath); err != nil {
level.Warn(util_log.Logger).Log("msg", "failed to delete processed segment",
@ -161,6 +176,16 @@ func (b *JobBuilder) processManifest(ctx context.Context, manifestPath string, j
return nil
}
// uploadIndexUpdateForCurrentSegment uploads the index updates accumulated for
// the currently processed segment to the object storage at objectPath.
// The parameter is renamed from "path" to avoid shadowing the imported
// "path" package used elsewhere in this file (e.g. buildObjectKey).
func (b *JobBuilder) uploadIndexUpdateForCurrentSegment(ctx context.Context, objectPath string) error {
	indexUpdatesJSON, err := b.currSegmentIndexUpdates.encode()
	if err != nil {
		return err
	}

	return b.deleteStoreClient.PutObject(ctx, objectPath, bytes.NewReader(indexUpdatesJSON))
}
func (b *JobBuilder) waitForSegmentCompletion(ctx context.Context) error {
for {
b.currentManifestMtx.RLock()
@ -171,6 +196,7 @@ func (b *JobBuilder) waitForSegmentCompletion(ctx context.Context) error {
b.currentManifestMtx.RUnlock()
select {
// ToDo(Sandeep): use timeout config(when introduced) to wait for segment to finish only upto the job timeout.
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
@ -266,22 +292,32 @@ func (b *JobBuilder) createJobsForChunksGroup(ctx context.Context, tableName, us
}
// OnJobResponse implements jobqueue.Builder interface
func (b *JobBuilder) OnJobResponse(response *grpc.JobResult) {
func (b *JobBuilder) OnJobResponse(response *grpc.JobResult) error {
b.currentManifestMtx.Lock()
defer b.currentManifestMtx.Unlock()
if _, ok := b.currentManifest.jobsInProgress[response.JobId]; !ok {
return
return nil
}
// Check for job failure
if response.Error != "" {
util_log.Logger.Log("msg", "job failed", "job_id", response.JobId, "error", response.Error)
b.currentManifest.cancel()
return
return nil
}
var jobResult JobResult
err := json.Unmarshal(response.Result, &jobResult)
if err != nil {
b.currentManifest.cancel()
return err
}
b.currSegmentIndexUpdates.addUpdates(jobResult)
delete(b.currentManifest.jobsInProgress, response.JobId)
return nil
}
func (b *JobBuilder) getSegment(ctx context.Context, segmentPath string) (*segment, error) {
@ -298,3 +334,37 @@ func (b *JobBuilder) getSegment(ctx context.Context, segmentPath string) (*segme
return &segment, nil
}
// indexUpdates collects updates to be made to the index for the segment in-process.
// All exported fields are serialized by encode(); mtx guards every mutation and read.
type indexUpdates struct {
	TableName string

	mtx             sync.Mutex
	ChunksToDelete  []string // List of chunks to be deleted from object storage and removed from the index of the current table
	ChunksToDeIndex []string // List of chunks only to be removed from the index of the current table
	ChunksToIndex   []Chunk  // List of chunks to be indexed in the current table
}

// reset clears the accumulated updates and points the collector at tableName.
// It truncates the slices in place (keeping their backing arrays) to avoid
// re-allocations across segments. Locks mtx for consistency with addUpdates
// and encode, which may run concurrently from job-response handling.
func (i *indexUpdates) reset(tableName string) {
	i.mtx.Lock()
	defer i.mtx.Unlock()

	i.TableName = tableName
	i.ChunksToDelete = i.ChunksToDelete[:0]
	i.ChunksToDeIndex = i.ChunksToDeIndex[:0]
	i.ChunksToIndex = i.ChunksToIndex[:0]
}

// addUpdates merges the given job result into the accumulated updates.
func (i *indexUpdates) addUpdates(result JobResult) {
	i.mtx.Lock()
	defer i.mtx.Unlock()

	i.ChunksToDelete = append(i.ChunksToDelete, result.ChunksToDelete...)
	i.ChunksToDeIndex = append(i.ChunksToDeIndex, result.ChunksToDeIndex...)
	i.ChunksToIndex = append(i.ChunksToIndex, result.ChunksToIndex...)
}

// encode returns the JSON encoding of the accumulated updates.
func (i *indexUpdates) encode() ([]byte, error) {
	i.mtx.Lock()
	defer i.mtx.Unlock()

	return json.Marshal(i)
}

@ -4,6 +4,9 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"path/filepath"
"testing"
"github.com/prometheus/common/model"
@ -18,17 +21,20 @@ func TestJobBuilder_buildJobs(t *testing.T) {
now := model.Now()
for _, tc := range []struct {
name string
setupManifest func(client client.ObjectClient)
expectedJobs []grpc.Job
name string
setupManifest func(client client.ObjectClient) string
expectedJobs []grpc.Job
expectedIndexUpdates map[string][]byte
}{
{
name: "no manifests in storage",
setupManifest: func(_ client.ObjectClient) {},
name: "no manifests in storage",
setupManifest: func(_ client.ObjectClient) string {
return ""
},
},
{
name: "one manifest in storage with less than maxChunksPerJob",
setupManifest: func(client client.ObjectClient) {
setupManifest: func(client client.ObjectClient) string {
deleteRequestBatch := newDeleteRequestBatch(nil)
deleteRequestBatch.addDeleteRequest(&DeleteRequest{
UserID: user1,
@ -46,6 +52,8 @@ func TestJobBuilder_buildJobs(t *testing.T) {
}))
require.NoError(t, manifestBuilder.Finish(context.Background()))
return manifestBuilder.path()
},
expectedJobs: []grpc.Job{
{
@ -66,10 +74,13 @@ func TestJobBuilder_buildJobs(t *testing.T) {
}),
},
},
expectedIndexUpdates: map[string][]byte{
fmt.Sprintf("%d%s", 0, indexUpdatesFilenameSuffix): buildIndexUpdates(t, table1, 0, 1),
},
},
{
name: "one manifest in storage with more than maxChunksPerJob",
setupManifest: func(client client.ObjectClient) {
setupManifest: func(client client.ObjectClient) string {
deleteRequestBatch := newDeleteRequestBatch(nil)
deleteRequestBatch.addDeleteRequest(&DeleteRequest{
UserID: user1,
@ -87,6 +98,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
}))
require.NoError(t, manifestBuilder.Finish(context.Background()))
return manifestBuilder.path()
},
expectedJobs: []grpc.Job{
{
@ -124,10 +136,13 @@ func TestJobBuilder_buildJobs(t *testing.T) {
}),
},
},
expectedIndexUpdates: map[string][]byte{
fmt.Sprintf("%d%s", 0, indexUpdatesFilenameSuffix): buildIndexUpdates(t, table1, 0, 2),
},
},
{
name: "one manifest in storage with multiple groups",
setupManifest: func(client client.ObjectClient) {
setupManifest: func(client client.ObjectClient) string {
deleteRequestBatch := newDeleteRequestBatch(nil)
deleteRequestBatch.addDeleteRequest(&DeleteRequest{
UserID: user1,
@ -153,6 +168,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
}))
require.NoError(t, manifestBuilder.Finish(context.Background()))
return manifestBuilder.path()
},
expectedJobs: []grpc.Job{
{
@ -199,10 +215,13 @@ func TestJobBuilder_buildJobs(t *testing.T) {
}),
},
},
expectedIndexUpdates: map[string][]byte{
fmt.Sprintf("%d%s", 0, indexUpdatesFilenameSuffix): buildIndexUpdates(t, table1, 0, 2),
},
},
{
name: "one manifest in storage with multiple segments due to multiple tables",
setupManifest: func(client client.ObjectClient) {
setupManifest: func(client client.ObjectClient) string {
deleteRequestBatch := newDeleteRequestBatch(nil)
deleteRequestBatch.addDeleteRequest(&DeleteRequest{
UserID: user1,
@ -226,6 +245,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
}))
require.NoError(t, manifestBuilder.Finish(context.Background()))
return manifestBuilder.path()
},
expectedJobs: []grpc.Job{
{
@ -263,6 +283,10 @@ func TestJobBuilder_buildJobs(t *testing.T) {
}),
},
},
expectedIndexUpdates: map[string][]byte{
fmt.Sprintf("%d%s", 0, indexUpdatesFilenameSuffix): buildIndexUpdates(t, table1, 0, 1),
fmt.Sprintf("%d%s", 1, indexUpdatesFilenameSuffix): buildIndexUpdates(t, table2, 1, 1),
},
},
} {
t.Run(tc.name, func(t *testing.T) {
@ -270,19 +294,23 @@ func TestJobBuilder_buildJobs(t *testing.T) {
Directory: t.TempDir(),
})
require.NoError(t, err)
tc.setupManifest(objectClient)
manifestPath := tc.setupManifest(objectClient)
builder := NewJobBuilder(objectClient)
jobsChan := make(chan *grpc.Job)
var jobsBuilt []grpc.Job
go func() {
cnt := 0
for job := range jobsChan {
jobsBuilt = append(jobsBuilt, *job)
builder.OnJobResponse(&grpc.JobResult{
err := builder.OnJobResponse(&grpc.JobResult{
JobId: job.Id,
JobType: job.Type,
Result: mustMarshal(t, buildDeletionJobResult(cnt)),
})
require.NoError(t, err)
cnt++
}
}()
@ -291,6 +319,17 @@ func TestJobBuilder_buildJobs(t *testing.T) {
require.Equal(t, len(tc.expectedJobs), len(jobsBuilt))
require.Equal(t, tc.expectedJobs, jobsBuilt)
for filename, expectedIndexUpdate := range tc.expectedIndexUpdates {
readCloser, _, err := objectClient.GetObject(context.Background(), filepath.Join(manifestPath, filename))
require.NoError(t, err)
indexUpdateJSON, err := io.ReadAll(readCloser)
require.NoError(t, err)
require.NoError(t, readCloser.Close())
require.Equal(t, expectedIndexUpdate, indexUpdateJSON)
}
})
}
}
@ -298,10 +337,12 @@ func TestJobBuilder_buildJobs(t *testing.T) {
func TestJobBuilder_ProcessManifest(t *testing.T) {
for _, tc := range []struct {
name string
jobResult []byte
jobProcessingError string
}{
{
name: "all jobs succeeded",
name: "all jobs succeeded",
jobResult: []byte(`{}`),
}, {
name: "job failure should fail the manifest processing",
jobProcessingError: "job processing failed",
@ -339,17 +380,19 @@ func TestJobBuilder_ProcessManifest(t *testing.T) {
}
segmentData, err := json.Marshal(segment)
require.NoError(t, err)
err = objectClient.PutObject(context.Background(), "test-manifest/1.json", bytes.NewReader(segmentData))
err = objectClient.PutObject(context.Background(), "test-manifest/0.json", bytes.NewReader(segmentData))
require.NoError(t, err)
jobsChan := make(chan *grpc.Job)
go func() {
for job := range jobsChan {
builder.OnJobResponse(&grpc.JobResult{
err := builder.OnJobResponse(&grpc.JobResult{
JobId: job.Id,
JobType: job.Type,
Result: tc.jobResult,
Error: tc.jobProcessingError,
})
require.NoError(t, err)
}
}()
@ -371,3 +414,37 @@ func mustMarshalPayload(job *deletionJob) []byte {
return payload
}
// buildDeletionJobResult returns a deterministic JobResult derived from
// jobCounter, used to simulate job execution results in tests.
func buildDeletionJobResult(jobCounter int) JobResult {
	chunk := Chunk{
		From:        model.Time(jobCounter),
		Through:     model.Time(jobCounter),
		Fingerprint: uint64(jobCounter),
		Checksum:    uint32(jobCounter),
		KB:          uint32(jobCounter),
		Entries:     uint32(jobCounter),
	}

	return JobResult{
		ChunksToDelete:  []string{fmt.Sprintf("%d-d", jobCounter)},
		ChunksToDeIndex: []string{fmt.Sprintf("%d-i", jobCounter)},
		ChunksToIndex:   []Chunk{chunk},
	}
}
// buildIndexUpdates returns the expected JSON-encoded index updates produced
// by totalJobs consecutive jobs starting at jobIndexStart for tableName.
func buildIndexUpdates(t *testing.T, tableName string, jobIndexStart, totalJobs int) []byte {
	updates := indexUpdates{
		TableName: tableName,
	}
	for jobIdx := jobIndexStart; jobIdx < jobIndexStart+totalJobs; jobIdx++ {
		updates.addUpdates(buildDeletionJobResult(jobIdx))
	}

	encoded, err := updates.encode()
	require.NoError(t, err)
	return encoded
}

@ -27,7 +27,7 @@ type Builder interface {
BuildJobs(ctx context.Context, jobsChan chan<- *grpc.Job)
// OnJobResponse reports back the response of the job execution.
OnJobResponse(response *grpc.JobResult)
OnJobResponse(response *grpc.JobResult) error
}
// Queue implements the job queue service
@ -259,7 +259,15 @@ func (q *Queue) reportJobResult(result *grpc.JobResult) error {
"job_type", result.JobType,
)
}
q.builders[result.JobType].OnJobResponse(result)
if err := q.builders[result.JobType].OnJobResponse(result); err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to process job response",
"job_id", result.JobId,
"job_type", result.JobType,
"error", err,
)
return err
}
// Remove the job from processing jobs
delete(q.processingJobs, result.JobId)

@ -24,12 +24,14 @@ type mockBuilder struct {
jobsFailed atomic.Int32
}
func (m *mockBuilder) OnJobResponse(res *compactor_grpc.JobResult) {
func (m *mockBuilder) OnJobResponse(res *compactor_grpc.JobResult) error {
if res.Error != "" {
m.jobsFailed.Inc()
} else {
m.jobsSucceeded.Inc()
}
return nil
}
func (m *mockBuilder) BuildJobs(ctx context.Context, jobsChan chan<- *compactor_grpc.Job) {

Loading…
Cancel
Save