From 267f4dc0ecc9f35b759846afc2bead79994e30f3 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 29 May 2025 17:38:52 +0530 Subject: [PATCH] feat(compactor HS): add job queue and deletion job builder (#17843) --- .../deletion/delete_request_batch.go | 2 +- .../deletion/deletion_manifest_builder.go | 17 +- .../deletion_manifest_builder_test.go | 53 +- pkg/compactor/deletion/job_builder.go | 306 ++++ pkg/compactor/deletion/job_builder_test.go | 253 +++ pkg/compactor/jobqueue/queue.go | 261 +++ pkg/compactor/jobqueue/queue.pb.go | 1610 +++++++++++++++++ pkg/compactor/jobqueue/queue.proto | 48 + pkg/compactor/jobqueue/queue_test.go | 238 +++ pkg/compactor/retention/retention.go | 9 +- pkg/compactor/retention/retention_test.go | 6 +- pkg/compactor/retention/util.go | 4 + pkg/compactor/retention/util_test.go | 6 +- pkg/compactor/testutil.go | 2 +- .../boltdb/compactor/compacted_index.go | 2 +- .../boltdb/compactor/compacted_index_test.go | 6 +- .../indexshipper/boltdb/compactor/iterator.go | 6 +- .../boltdb/compactor/iterator_test.go | 2 +- .../shipper/indexshipper/tsdb/compactor.go | 6 +- .../indexshipper/tsdb/compactor_test.go | 2 +- pkg/tool/audit/audit.go | 4 +- pkg/tool/audit/audit_test.go | 10 +- tools/tsdb/tsdb-map/main.go | 2 +- 23 files changed, 2797 insertions(+), 58 deletions(-) create mode 100644 pkg/compactor/deletion/job_builder.go create mode 100644 pkg/compactor/deletion/job_builder_test.go create mode 100644 pkg/compactor/jobqueue/queue.go create mode 100644 pkg/compactor/jobqueue/queue.pb.go create mode 100644 pkg/compactor/jobqueue/queue.proto create mode 100644 pkg/compactor/jobqueue/queue_test.go diff --git a/pkg/compactor/deletion/delete_request_batch.go b/pkg/compactor/deletion/delete_request_batch.go index b691dab0a3..6360f2193d 100644 --- a/pkg/compactor/deletion/delete_request_batch.go +++ b/pkg/compactor/deletion/delete_request_batch.go @@ -111,7 +111,7 @@ func (b *deleteRequestBatch) expired(userID []byte, chk retention.Chunk, lbls la "delete_request_id", deleteRequest.RequestID, "sequence_num", deleteRequest.SequenceNum, "user", deleteRequest.UserID, - "chunkID", string(chk.ChunkID), + "chunkID", chk.ChunkID, ) b.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc() return true, nil diff --git a/pkg/compactor/deletion/deletion_manifest_builder.go b/pkg/compactor/deletion/deletion_manifest_builder.go index 7e05242d58..6d75f195c8 100644 --- a/pkg/compactor/deletion/deletion_manifest_builder.go +++ b/pkg/compactor/deletion/deletion_manifest_builder.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "path" + "slices" "strings" "time" @@ -21,8 +22,8 @@ const ( // ChunksGroup holds a group of chunks selected by the same set of requests type ChunksGroup struct { - Requests []DeleteRequest `json:"requests"` - Chunks []retention.Chunk `json:"chunks"` + Requests []DeleteRequest `json:"requests"` + Chunks []string `json:"chunks"` } // segment holds limited chunks(upto maxChunksPerSegment) that needs to be processed. @@ -158,7 +159,7 @@ func (d *deletionManifestBuilder) AddSeries(ctx context.Context, tableName strin } group := d.currentSegment[chunksGroupIdentifier] - group.Chunks = append(group.Chunks, chk) + group.Chunks = append(group.Chunks, chk.ChunkID) d.currentSegment[chunksGroupIdentifier] = group } @@ -210,6 +211,16 @@ func (d *deletionManifestBuilder) flushCurrentBatch(ctx context.Context) error { if len(b.ChunksGroups) == 0 { return nil } + + slices.SortFunc(b.ChunksGroups, func(a, b ChunksGroup) int { + if len(a.Requests) < len(b.Requests) { + return -1 + } else if len(a.Requests) > len(b.Requests) { + return 1 + } + + return 0 + }) batchJSON, err := json.Marshal(b) if err != nil { return err diff --git a/pkg/compactor/deletion/deletion_manifest_builder_test.go b/pkg/compactor/deletion/deletion_manifest_builder_test.go index afb6ed0819..2f26b372b1 100644 --- a/pkg/compactor/deletion/deletion_manifest_builder_test.go +++ b/pkg/compactor/deletion/deletion_manifest_builder_test.go @@ -26,10 +26,10 @@ const ( lblFooBarAndFizzBuzz = `{foo="bar", fizz="buzz"}` ) -func buildChunks(start model.Time, count int) []retention.Chunk { +func buildRetentionChunks(start model.Time, count int) []retention.Chunk { chunks := make([]retention.Chunk, 0, count) chunks = append(chunks, retention.Chunk{ - ChunkID: []byte(fmt.Sprintf("%d", start)), + ChunkID: fmt.Sprintf("%d", start), From: start, Through: start + 1, }) @@ -37,7 +37,7 @@ func buildChunks(start model.Time, count int) []retention.Chunk { for i := 1; i < count; i++ { from := chunks[i-1].From + 1 chunks = append(chunks, retention.Chunk{ - ChunkID: []byte(fmt.Sprintf("%d", from)), + ChunkID: fmt.Sprintf("%d", from), From: from, Through: from + 1, }) @@ -46,6 +46,15 @@ func buildChunks(start model.Time, count int) []retention.Chunk { return chunks } +func getChunkIDsFromRetentionChunks(chunks []retention.Chunk) []string { + chunkIDs := make([]string, 0, len(chunks)) + for _, chunk := range chunks { + chunkIDs = append(chunkIDs, chunk.ChunkID) + } + + return chunkIDs +} + type mockSeries struct { seriesID []byte userID string @@ -112,7 +121,7 @@ func TestDeletionManifestBuilder(t *testing.T) { series: &mockSeries{ userID: user1, labels: mustParseLabel(lblFooBar), - chunks: buildChunks(10, 100), + chunks: buildRetentionChunks(10, 100), }, }, }, @@ -142,7 +151,7 @@ func TestDeletionManifestBuilder(t *testing.T) { EndTime: 100, }, }, - Chunks: buildChunks(10, 91), + Chunks: getChunkIDsFromRetentionChunks(buildRetentionChunks(10, 91)), }, }, ChunksCount: 91, @@ -169,7 +178,7 @@ func TestDeletionManifestBuilder(t *testing.T) { series: &mockSeries{ userID: user1, labels: mustParseLabel(lblFooBar), - chunks: buildChunks(0, maxChunksPerSegment+1), + chunks: buildRetentionChunks(0, maxChunksPerSegment+1), }, }, }, @@ -199,7 +208,7 @@ func TestDeletionManifestBuilder(t *testing.T) { EndTime: maxChunksPerSegment + 1, }, }, - Chunks: buildChunks(0, maxChunksPerSegment), + Chunks: getChunkIDsFromRetentionChunks(buildRetentionChunks(0, maxChunksPerSegment)), }, }, ChunksCount: maxChunksPerSegment, @@ -217,7 +226,7 @@ func TestDeletionManifestBuilder(t *testing.T) { EndTime: maxChunksPerSegment + 1, }, }, - Chunks: buildChunks(maxChunksPerSegment, 1), + Chunks: getChunkIDsFromRetentionChunks(buildRetentionChunks(maxChunksPerSegment, 1)), }, }, ChunksCount: 1, @@ -244,7 +253,7 @@ func TestDeletionManifestBuilder(t *testing.T) { series: &mockSeries{ userID: user1, labels: mustParseLabel(lblFooBar), - chunks: buildChunks(0, 50), + chunks: buildRetentionChunks(0, 50), }, }, { @@ -252,7 +261,7 @@ func TestDeletionManifestBuilder(t *testing.T) { series: &mockSeries{ userID: user1, labels: mustParseLabel(lblFooBar), - chunks: buildChunks(50, 50), + chunks: buildRetentionChunks(50, 50), }, }, }, @@ -282,7 +291,7 @@ func TestDeletionManifestBuilder(t *testing.T) { EndTime: 100, }, }, - Chunks: buildChunks(0, 50), + Chunks: getChunkIDsFromRetentionChunks(buildRetentionChunks(0, 50)), }, }, ChunksCount: 50, @@ -300,7 +309,7 @@ func TestDeletionManifestBuilder(t *testing.T) { EndTime: 100, }, }, - Chunks: buildChunks(50, 50), + Chunks: getChunkIDsFromRetentionChunks(buildRetentionChunks(50, 50)), }, }, ChunksCount: 50, @@ -334,7 +343,7 @@ func TestDeletionManifestBuilder(t *testing.T) { series: &mockSeries{ userID: user1, labels: mustParseLabel(lblFooBar), - chunks: buildChunks(0, maxChunksPerSegment+1), + chunks: buildRetentionChunks(0, maxChunksPerSegment+1), }, }, { @@ -342,7 +351,7 @@ func TestDeletionManifestBuilder(t *testing.T) { series: &mockSeries{ userID: user2, labels: mustParseLabel(lblFizzBuzz), - chunks: buildChunks(10, maxChunksPerSegment+1), + chunks: buildRetentionChunks(10, maxChunksPerSegment+1), }, }, }, @@ -378,7 +387,7 @@ func TestDeletionManifestBuilder(t *testing.T) { EndTime: maxChunksPerSegment + 1, }, }, - Chunks: buildChunks(0, maxChunksPerSegment), + Chunks: getChunkIDsFromRetentionChunks(buildRetentionChunks(0, maxChunksPerSegment)), }, }, ChunksCount: maxChunksPerSegment, @@ -396,7 +405,7 @@ func TestDeletionManifestBuilder(t *testing.T) { EndTime: maxChunksPerSegment + 1, }, }, - Chunks: buildChunks(maxChunksPerSegment, 1), + Chunks: getChunkIDsFromRetentionChunks(buildRetentionChunks(maxChunksPerSegment, 1)), }, }, ChunksCount: 1, @@ -414,7 +423,7 @@ func TestDeletionManifestBuilder(t *testing.T) { EndTime: 10 + maxChunksPerSegment + 1, }, }, - Chunks: buildChunks(10, maxChunksPerSegment), + Chunks: getChunkIDsFromRetentionChunks(buildRetentionChunks(10, maxChunksPerSegment)), }, }, ChunksCount: maxChunksPerSegment, @@ -432,7 +441,7 @@ func TestDeletionManifestBuilder(t *testing.T) { EndTime: 10 + maxChunksPerSegment + 1, }, }, - Chunks: buildChunks(10+maxChunksPerSegment, 1), + Chunks: getChunkIDsFromRetentionChunks(buildRetentionChunks(10+maxChunksPerSegment, 1)), }, }, ChunksCount: 1, @@ -466,7 +475,7 @@ func TestDeletionManifestBuilder(t *testing.T) { series: &mockSeries{ userID: user1, labels: mustParseLabel(lblFooBarAndFizzBuzz), - chunks: buildChunks(25, 50), + chunks: buildRetentionChunks(25, 50), }, }, }, @@ -502,7 +511,7 @@ func TestDeletionManifestBuilder(t *testing.T) { EndTime: 100, }, }, - Chunks: buildChunks(25, 25), + Chunks: getChunkIDsFromRetentionChunks(buildRetentionChunks(25, 25)), }, { Requests: []DeleteRequest{ @@ -519,7 +528,7 @@ func TestDeletionManifestBuilder(t *testing.T) { EndTime: 100, }, }, - Chunks: buildChunks(50, 25), + Chunks: getChunkIDsFromRetentionChunks(buildRetentionChunks(50, 25)), }, }, @@ -628,7 +637,7 @@ func TestDeletionManifestBuilder_Errors(t *testing.T) { err = builder.AddSeries(ctx, table1, &mockSeries{ userID: user2, labels: mustParseLabel(lblFooBar), - chunks: buildChunks(0, 25), + chunks: buildRetentionChunks(0, 25), }) require.EqualError(t, err, fmt.Sprintf("no requests loaded for user: %s", user2)) diff --git a/pkg/compactor/deletion/job_builder.go b/pkg/compactor/deletion/job_builder.go new file mode 100644 index 0000000000..8b45779166 --- /dev/null +++ b/pkg/compactor/deletion/job_builder.go @@ -0,0 +1,306 @@ +package deletion + +import ( + "context" + "encoding/json" + "fmt" + "path" + "strconv" + "sync" + "time" + + "github.com/go-kit/log/level" + + "github.com/grafana/loki/v3/pkg/compactor/jobqueue" + "github.com/grafana/loki/v3/pkg/storage/chunk/client" + util_log "github.com/grafana/loki/v3/pkg/util/log" +) + +const maxChunksPerJob = 1000 + +type deletionJob struct { + ChunkIDs []string `json:"chunk_ids"` + DeletionQueries []string `json:"deletion_queries"` +} + +type manifestJobs struct { + jobsInProgress map[string]struct{} + cancel context.CancelFunc + manifestPath string +} + +type manifestError struct { + JobID string `json:"job_id"` + Error string `json:"error"` +} + +type JobBuilder struct { + deleteStoreClient client.ObjectClient + + // Current manifest being processed + currentManifest manifestJobs + currentManifestMtx sync.RWMutex +} + +func NewJobBuilder(deleteStoreClient client.ObjectClient) *JobBuilder { + return &JobBuilder{ + deleteStoreClient: deleteStoreClient, + } +} + +// BuildJobs implements jobqueue.Builder interface +func (b *JobBuilder) BuildJobs(ctx context.Context, jobsChan chan<- *jobqueue.Job) error { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for { + if err := b.buildJobs(ctx, jobsChan); err != nil { + return err + } + + // Wait for next tick or context cancellation + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + // Continue to next iteration + } + } +} + +func (b *JobBuilder) buildJobs(ctx context.Context, jobsChan chan<- *jobqueue.Job) error { + // List all manifest directories + manifests, err := b.listManifests(ctx) + if err != nil { + return err + } + + // Process each manifest + for _, manifestPath := range manifests { + if err := b.processManifest(ctx, manifestPath, jobsChan); err != nil { + return err + } + } + + return nil +} + +func (b *JobBuilder) processManifest(ctx context.Context, manifestPath string, jobsChan chan<- *jobqueue.Job) error { + level.Info(util_log.Logger).Log("msg", "starting manifest processing", "manifest", manifestPath) + + // Read manifest + manifest, err := b.readManifest(ctx, manifestPath) + if err != nil { + return err + } + + // Initialize tracking for this manifest + ctx, cancel := context.WithCancel(ctx) + b.currentManifestMtx.Lock() + b.currentManifest = manifestJobs{ + jobsInProgress: make(map[string]struct{}), + manifestPath: manifestPath, + cancel: cancel, + } + b.currentManifestMtx.Unlock() + + // Process segments sequentially + for segmentNum := 1; ctx.Err() == nil && segmentNum <= manifest.SegmentsCount; segmentNum++ { + level.Info(util_log.Logger).Log("msg", "starting segment processing", + "manifest", manifestPath, + "segment", segmentNum) + + segmentPath := path.Join(manifestPath, fmt.Sprintf("%d.json", segmentNum)) + + manifestExists, err := b.deleteStoreClient.ObjectExists(ctx, segmentPath) + if err != nil { + return err + } + if !manifestExists { + level.Info(util_log.Logger).Log("msg", "manifest does not exist(likely processed already), skipping", "manifest", manifestPath) + continue + } + + segment, err := b.getSegment(ctx, segmentPath) + if err != nil { + return err + } + + // Reset job counters for this segment + b.currentManifestMtx.Lock() + b.currentManifest.jobsInProgress = make(map[string]struct{}) + b.currentManifestMtx.Unlock() + + // Process each chunks group (same deletion query) + for i, group := range segment.ChunksGroups { + // Check if we should stop processing this manifest + if ctx.Err() != nil { + return ctx.Err() + } + + if err := b.createJobsForChunksGroup(ctx, fmt.Sprintf("%d", i), group, jobsChan); err != nil { + return err + } + } + + // Wait for all jobs in this segment to complete + if err := b.waitForSegmentCompletion(ctx); err != nil { + return err + } + + // Delete the processed segment + if err := b.deleteStoreClient.DeleteObject(ctx, segmentPath); err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to delete processed segment", + "segment", segmentPath, + "error", err) + } + + level.Info(util_log.Logger).Log("msg", "finished segment processing", + "manifest", manifestPath, + "segment", segmentNum) + } + + level.Info(util_log.Logger).Log("msg", "finished manifest processing", "manifest", manifestPath) + return nil +} + +func (b *JobBuilder) waitForSegmentCompletion(ctx context.Context) error { + for { + b.currentManifestMtx.RLock() + if len(b.currentManifest.jobsInProgress) == 0 { + b.currentManifestMtx.RUnlock() + return nil + } + b.currentManifestMtx.RUnlock() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second): + // Check again + } + } +} + +func (b *JobBuilder) listManifests(ctx context.Context) ([]string, error) { + // List all directories in the deletion store + _, commonPrefixes, err := b.deleteStoreClient.List(ctx, "", "/") + if err != nil { + return nil, err + } + + // Filter for manifest directories (they are named with Unix timestamps) + var manifests []string + for _, commonPrefix := range commonPrefixes { + // Check if directory name is a valid timestamp + if _, err := strconv.ParseInt(path.Base(string(commonPrefix)), 10, 64); err != nil { + continue + } + + // Check if manifest.json exists in this directory + manifestPath := path.Join(string(commonPrefix), manifestFileName) + exists, err := b.deleteStoreClient.ObjectExists(ctx, manifestPath) + if err != nil { + return nil, err + } + if !exists { + // Skip directories without manifest.json + continue + } + + manifests = append(manifests, string(commonPrefix)) + } + + return manifests, nil +} + +func (b *JobBuilder) readManifest(ctx context.Context, manifestPath string) (*manifest, error) { + // Read manifest file + reader, _, err := b.deleteStoreClient.GetObject(ctx, path.Join(manifestPath, manifestFileName)) + if err != nil { + return nil, err + } + defer reader.Close() + + var m manifest + if err := json.NewDecoder(reader).Decode(&m); err != nil { + return nil, err + } + + return &m, nil +} + +func (b *JobBuilder) createJobsForChunksGroup(ctx context.Context, groupID string, group ChunksGroup, jobsChan chan<- *jobqueue.Job) error { + deletionQueries := make([]string, 0, len(group.Requests)) + for _, req := range group.Requests { + deletionQueries = append(deletionQueries, req.Query) + } + + // Split chunks into groups of maxChunksPerJob + for i := 0; i < len(group.Chunks); i += maxChunksPerJob { + end := i + maxChunksPerJob + if end > len(group.Chunks) { + end = len(group.Chunks) + } + + payload, err := json.Marshal(&deletionJob{ + ChunkIDs: group.Chunks[i:end], + DeletionQueries: deletionQueries, + }) + if err != nil { + return err + } + + job := &jobqueue.Job{ + Id: fmt.Sprintf("%s_%d", groupID, i/maxChunksPerJob), + Type: jobqueue.JOB_TYPE_DELETION, + Payload: payload, + } + + b.currentManifestMtx.Lock() + b.currentManifest.jobsInProgress[job.Id] = struct{}{} + b.currentManifestMtx.Unlock() + + select { + case <-ctx.Done(): + return ctx.Err() + case jobsChan <- job: + } + } + + return nil +} + +// OnJobResponse implements jobqueue.Builder interface +func (b *JobBuilder) OnJobResponse(response *jobqueue.ReportJobResultRequest) { + b.currentManifestMtx.Lock() + defer b.currentManifestMtx.Unlock() + + if _, ok := b.currentManifest.jobsInProgress[response.JobId]; !ok { + return + } + + // Check for job failure + if response.Error != "" { + util_log.Logger.Log("msg", "job failed", "job_id", response.JobId, "error", response.Error) + b.currentManifest.cancel() + return + } + + delete(b.currentManifest.jobsInProgress, response.JobId) +} + +func (b *JobBuilder) getSegment(ctx context.Context, segmentPath string) (*segment, error) { + reader, _, err := b.deleteStoreClient.GetObject(ctx, segmentPath) + if err != nil { + return nil, err + } + defer reader.Close() + + var segment segment + if err := json.NewDecoder(reader).Decode(&segment); err != nil { + return nil, err + } + + return &segment, nil +} diff --git a/pkg/compactor/deletion/job_builder_test.go b/pkg/compactor/deletion/job_builder_test.go new file mode 100644 index 0000000000..e668704cc6 --- /dev/null +++ b/pkg/compactor/deletion/job_builder_test.go @@ -0,0 +1,253 @@ +package deletion + +import ( + "bytes" + "context" + "encoding/json" + "math" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/compactor/jobqueue" + "github.com/grafana/loki/v3/pkg/storage/chunk/client" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/local" +) + +func TestJobBuilder_buildJobs(t *testing.T) { + for _, tc := range []struct { + name string + setupManifest func(client client.ObjectClient) + expectedJobs []jobqueue.Job + }{ + { + name: "no manifests in storage", + setupManifest: func(_ client.ObjectClient) {}, + }, + { + name: "one manifest in storage with less than maxChunksPerJob", + setupManifest: func(client client.ObjectClient) { + deleteRequestBatch := newDeleteRequestBatch(nil) + deleteRequestBatch.addDeleteRequest(&DeleteRequest{ + UserID: user1, + Query: lblFooBar, + StartTime: 0, + EndTime: math.MaxInt64, + }) + manifestBuilder, err := newDeletionManifestBuilder(client, *deleteRequestBatch) + require.NoError(t, err) + + require.NoError(t, manifestBuilder.AddSeries(context.Background(), table1, &mockSeries{ + userID: user1, + labels: mustParseLabel(lblFooBar), + chunks: buildRetentionChunks(0, maxChunksPerJob-1), + })) + + require.NoError(t, manifestBuilder.Finish(context.Background())) + }, + expectedJobs: []jobqueue.Job{ + { + Id: "0_0", + Type: jobqueue.JOB_TYPE_DELETION, + Payload: mustMarshalPayload(&deletionJob{ + ChunkIDs: getChunkIDsFromRetentionChunks(buildRetentionChunks(0, maxChunksPerJob-1)), + DeletionQueries: []string{lblFooBar}, + }), + }, + }, + }, + { + name: "one manifest in storage with more than maxChunksPerJob", + setupManifest: func(client client.ObjectClient) { + deleteRequestBatch := newDeleteRequestBatch(nil) + deleteRequestBatch.addDeleteRequest(&DeleteRequest{ + UserID: user1, + Query: lblFooBar, + StartTime: 0, + EndTime: math.MaxInt64, + }) + manifestBuilder, err := newDeletionManifestBuilder(client, *deleteRequestBatch) + require.NoError(t, err) + + require.NoError(t, manifestBuilder.AddSeries(context.Background(), table1, &mockSeries{ + userID: user1, + labels: mustParseLabel(lblFooBar), + chunks: buildRetentionChunks(0, maxChunksPerJob+1), + })) + + require.NoError(t, manifestBuilder.Finish(context.Background())) + }, + expectedJobs: []jobqueue.Job{ + { + Id: "0_0", + Type: jobqueue.JOB_TYPE_DELETION, + Payload: mustMarshalPayload(&deletionJob{ + ChunkIDs: getChunkIDsFromRetentionChunks(buildRetentionChunks(0, maxChunksPerJob)), + DeletionQueries: []string{lblFooBar}, + }), + }, + { + Id: "0_1", + Type: jobqueue.JOB_TYPE_DELETION, + Payload: mustMarshalPayload(&deletionJob{ + ChunkIDs: getChunkIDsFromRetentionChunks(buildRetentionChunks(maxChunksPerJob, 1)), + DeletionQueries: []string{lblFooBar}, + }), + }, + }, + }, + { + name: "one manifest in storage with multiple groups", + setupManifest: func(client client.ObjectClient) { + deleteRequestBatch := newDeleteRequestBatch(nil) + deleteRequestBatch.addDeleteRequest(&DeleteRequest{ + UserID: user1, + RequestID: req1, + Query: lblFooBar, + StartTime: 0, + EndTime: 100, + }) + deleteRequestBatch.addDeleteRequest(&DeleteRequest{ + UserID: user1, + RequestID: req2, + Query: lblFizzBuzz, + StartTime: 51, + EndTime: 100, + }) + manifestBuilder, err := newDeletionManifestBuilder(client, *deleteRequestBatch) + require.NoError(t, err) + + require.NoError(t, manifestBuilder.AddSeries(context.Background(), table1, &mockSeries{ + userID: user1, + labels: mustParseLabel(lblFooBarAndFizzBuzz), + chunks: buildRetentionChunks(25, 50), + })) + + require.NoError(t, manifestBuilder.Finish(context.Background())) + }, + expectedJobs: []jobqueue.Job{ + { + Id: "0_0", + Type: jobqueue.JOB_TYPE_DELETION, + Payload: mustMarshalPayload(&deletionJob{ + ChunkIDs: getChunkIDsFromRetentionChunks(buildRetentionChunks(25, 25)), + DeletionQueries: []string{lblFooBar}, + }), + }, + { + Id: "1_0", + Type: jobqueue.JOB_TYPE_DELETION, + Payload: mustMarshalPayload(&deletionJob{ + ChunkIDs: getChunkIDsFromRetentionChunks(buildRetentionChunks(50, 25)), + DeletionQueries: []string{lblFooBar, lblFizzBuzz}, + }), + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + objectClient, err := local.NewFSObjectClient(local.FSConfig{ + Directory: t.TempDir(), + }) + require.NoError(t, err) + tc.setupManifest(objectClient) + + builder := NewJobBuilder(objectClient) + jobsChan := make(chan *jobqueue.Job) + + var jobsBuilt []jobqueue.Job + go func() { + for job := range jobsChan { + jobsBuilt = append(jobsBuilt, *job) + builder.OnJobResponse(&jobqueue.ReportJobResultRequest{ + JobId: job.Id, + JobType: job.Type, + }) + } + }() + + err = builder.buildJobs(context.Background(), jobsChan) + require.NoError(t, err) + + require.Equal(t, len(tc.expectedJobs), len(jobsBuilt)) + require.Equal(t, tc.expectedJobs, jobsBuilt) + }) + } +} + +func TestJobBuilder_ProcessManifest(t *testing.T) { + for _, tc := range []struct { + name string + jobProcessingError string + }{ + { + name: "all jobs succeeded", + }, { + name: "job failure should fail the manifest processing", + jobProcessingError: "job processing failed", + }, + } { + t.Run(tc.name, func(t *testing.T) { + objectClient, err := local.NewFSObjectClient(local.FSConfig{ + Directory: t.TempDir(), + }) + require.NoError(t, err) + + builder := NewJobBuilder(objectClient) + + // Create a test manifest + manifest := &manifest{ + SegmentsCount: 1, + } + manifestData, err := json.Marshal(manifest) + require.NoError(t, err) + err = objectClient.PutObject(context.Background(), "test-manifest/manifest.json", bytes.NewReader(manifestData)) + require.NoError(t, err) + + // Create a test segment + segment := &segment{ + UserID: "user1", + TableName: "table1", + ChunksGroups: []ChunksGroup{ + { + Chunks: []string{"chunk1", "chunk2"}, + Requests: []DeleteRequest{ + {Query: "{job=\"test\"}"}, + }, + }, + }, + } + segmentData, err := json.Marshal(segment) + require.NoError(t, err) + err = objectClient.PutObject(context.Background(), "test-manifest/1.json", bytes.NewReader(segmentData)) + require.NoError(t, err) + + jobsChan := make(chan *jobqueue.Job) + go func() { + for job := range jobsChan { + builder.OnJobResponse(&jobqueue.ReportJobResultRequest{ + JobId: job.Id, + JobType: job.Type, + Error: tc.jobProcessingError, + }) + } + }() + + err = builder.processManifest(context.Background(), "test-manifest", jobsChan) + if tc.jobProcessingError != "" { + require.ErrorIs(t, err, context.Canceled) + } else { + require.NoError(t, err) + } + }) + } +} + +func mustMarshalPayload(job *deletionJob) []byte { + payload, err := json.Marshal(job) + if err != nil { + panic(err) + } + + return payload +} diff --git a/pkg/compactor/jobqueue/queue.go b/pkg/compactor/jobqueue/queue.go new file mode 100644 index 0000000000..c07e2904bf --- /dev/null +++ b/pkg/compactor/jobqueue/queue.go @@ -0,0 +1,261 @@ +package jobqueue + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/go-kit/log/level" + "go.uber.org/atomic" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + util_log "github.com/grafana/loki/v3/pkg/util/log" +) + +var ( + // ErrBuilderAlreadyRegistered is returned when trying to register a builder for a job type that already has one + ErrBuilderAlreadyRegistered = errors.New("builder already registered for this job type") +) + +// Builder defines the interface for building jobs that will be added to the queue +type Builder interface { + // BuildJobs builds new jobs and sends them to the provided channel + // It should be a blocking call and returns when ctx is cancelled. + BuildJobs(ctx context.Context, jobsChan chan<- *Job) error + + // OnJobResponse reports back the response of the job execution. + OnJobResponse(response *ReportJobResultRequest) +} + +// Queue implements the job queue service +type Queue struct { + queue chan *Job + closed atomic.Bool + builders map[JobType]Builder + wg sync.WaitGroup + stop chan struct{} + checkTimedOutJobsInterval time.Duration + + // Track jobs that are being processed + processingJobs map[string]*processingJob + processingJobsMtx sync.RWMutex + jobTimeout time.Duration + maxRetries int +} + +type processingJob struct { + job *Job + dequeued time.Time + retryCount int +} + +// New creates a new job queue +func New() *Queue { + return newQueue(time.Minute) +} + +// newQueue creates a new job queue with a configurable timed out jobs check ticker interval (for testing) +func newQueue(checkTimedOutJobsInterval time.Duration) *Queue { + q := &Queue{ + queue: make(chan *Job), + builders: make(map[JobType]Builder), + stop: make(chan struct{}), + checkTimedOutJobsInterval: checkTimedOutJobsInterval, + processingJobs: make(map[string]*processingJob), + // ToDo(Sandeep): make jobTimeout and maxRetries configurable(possibly job specific) + jobTimeout: 15 * time.Minute, + maxRetries: 3, + } + + // Start the job timeout checker + q.wg.Add(1) + go q.checkJobTimeouts() + + return q +} + +// RegisterBuilder registers a builder for a specific job type +func (q *Queue) RegisterBuilder(jobType JobType, builder Builder) error { + if _, exists := q.builders[jobType]; exists { + return ErrBuilderAlreadyRegistered + } + + q.builders[jobType] = builder + return nil +} + +// Start starts all registered builders +func (q *Queue) Start(ctx context.Context) error { + for jobType, builder := range q.builders { + q.wg.Add(1) + go q.startBuilder(ctx, jobType, builder) + } + return nil +} + +// Stop stops all builders +func (q *Queue) Stop() error { + close(q.stop) + q.wg.Wait() + return nil +} + +func (q *Queue) startBuilder(ctx context.Context, jobType JobType, builder Builder) { + defer q.wg.Done() + + // Start the builder in a separate goroutine + builderErrChan := make(chan error, 1) + go func() { + builderErrChan <- builder.BuildJobs(ctx, q.queue) + }() + + for { + select { + case <-ctx.Done(): + return + case <-q.stop: + return + case err := <-builderErrChan: + if err != nil && !errors.Is(err, context.Canceled) { + level.Error(util_log.Logger).Log("msg", "builder error", "job_type", jobType, "error", err) + } + return + } + } +} + +func (q *Queue) checkJobTimeouts() { + defer q.wg.Done() + + ticker := time.NewTicker(q.checkTimedOutJobsInterval) + defer ticker.Stop() + + for { + select { + case <-q.stop: + return + case <-ticker.C: + q.processingJobsMtx.Lock() + now := time.Now() + for jobID, pj := range q.processingJobs { + if now.Sub(pj.dequeued) > q.jobTimeout { + // Requeue the job + select { + case <-q.stop: + return + case q.queue <- pj.job: + level.Warn(util_log.Logger).Log( + "msg", "job timed out, requeuing", + "job_id", jobID, + "job_type", pj.job.Type, + "timeout", q.jobTimeout, + ) + } + delete(q.processingJobs, jobID) + } + } + q.processingJobsMtx.Unlock() + } + } +} + +// Dequeue implements the gRPC Dequeue method +func (q *Queue) Dequeue(ctx context.Context, _ *DequeueRequest) (*DequeueResponse, error) { + if q.closed.Load() { + return &DequeueResponse{}, nil + } + + select { + case <-ctx.Done(): + return nil, status.Error(codes.Canceled, ctx.Err().Error()) + case job, ok := <-q.queue: + if !ok { + return &DequeueResponse{}, nil + } + + // Track the job as being processed + q.processingJobsMtx.Lock() + defer q.processingJobsMtx.Unlock() + q.processingJobs[job.Id] = &processingJob{ + job: job, + dequeued: time.Now(), + retryCount: 0, + } + + return &DequeueResponse{ + Job: job, + }, nil + } +} + +// ReportJobResult implements the gRPC ReportJobResult method +func (q *Queue) ReportJobResult(ctx context.Context, req *ReportJobResultRequest) (*ReportJobResultResponse, error) { + if req == nil { + return nil, status.Error(codes.InvalidArgument, "request cannot be nil") + } + + q.processingJobsMtx.Lock() + defer q.processingJobsMtx.Unlock() + pj, exists := q.processingJobs[req.JobId] + if !exists { + return nil, status.Error(codes.NotFound, "job not found") + } + + if req.Error != "" { + level.Error(util_log.Logger).Log( + "msg", "job execution failed", + "job_id", req.JobId, + "job_type", req.JobType, + "error", req.Error, + "retry_count", pj.retryCount, + ) + + // Check if we should retry the job + if pj.retryCount < q.maxRetries { + pj.retryCount++ + level.Info(util_log.Logger).Log( + "msg", "retrying failed job", + "job_id", req.JobId, + "job_type", req.JobType, + "retry_count", pj.retryCount, + "max_retries", q.maxRetries, + ) + + // Requeue the job + select { + case <-ctx.Done(): + case q.queue <- pj.job: + return &ReportJobResultResponse{}, nil + } + } else { + level.Error(util_log.Logger).Log( + "msg", "job failed after max retries", + "job_id", req.JobId, + "job_type", req.JobType, + "max_retries", q.maxRetries, + ) + } + } else { + level.Debug(util_log.Logger).Log( + "msg", "job execution succeeded", + "job_id", req.JobId, + "job_type", req.JobType, + ) + } + q.builders[req.JobType].OnJobResponse(req) + + // Remove the job from processing jobs + delete(q.processingJobs, req.JobId) + + return &ReportJobResultResponse{}, nil +} + +// Close closes the queue and releases all resources +func (q *Queue) Close() { + if !q.closed.Load() { + close(q.queue) + q.closed.Store(true) + } +} diff --git a/pkg/compactor/jobqueue/queue.pb.go b/pkg/compactor/jobqueue/queue.pb.go new file mode 100644 index 0000000000..dab3e3a957 --- /dev/null +++ b/pkg/compactor/jobqueue/queue.pb.go @@ -0,0 +1,1610 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pkg/compactor/jobqueue/queue.proto + +package jobqueue + +import ( + bytes "bytes" + context "context" + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + types "github.com/gogo/protobuf/types" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strconv "strconv" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// JobType represents the type of job +type JobType int32 + +const ( + JOB_TYPE_DELETION JobType = 0 +) + +var JobType_name = map[int32]string{ + 0: "JOB_TYPE_DELETION", +} + +var JobType_value = map[string]int32{ + "JOB_TYPE_DELETION": 0, +} + +func (JobType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_220f68f506b923fa, []int{0} +} + +// Job represents a single job in the queue +type Job struct { + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Type JobType `protobuf:"varint,2,opt,name=type,proto3,enum=jobqueue.JobType" json:"type,omitempty"` + Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (m *Job) Reset() { *m = Job{} } +func (*Job) ProtoMessage() {} +func (*Job) Descriptor() ([]byte, []int) { + return fileDescriptor_220f68f506b923fa, []int{0} +} +func (m *Job) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Job) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Job.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Job) XXX_Merge(src proto.Message) { + xxx_messageInfo_Job.Merge(m, src) +} +func (m *Job) XXX_Size() int { + return m.Size() +} +func (m *Job) XXX_DiscardUnknown() { + xxx_messageInfo_Job.DiscardUnknown(m) +} + +var xxx_messageInfo_Job proto.InternalMessageInfo + +func (m *Job) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *Job) GetType() JobType { + if m != nil { + return m.Type + } + return JOB_TYPE_DELETION +} + +func (m *Job) GetPayload() []byte { + if m != nil { + return m.Payload + } + return nil +} + +// DequeueRequest is used to request a job from the queue +type DequeueRequest struct { +} + +func (m *DequeueRequest) Reset() { *m = DequeueRequest{} } +func (*DequeueRequest) ProtoMessage() {} +func (*DequeueRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_220f68f506b923fa, []int{1} +} +func (m *DequeueRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DequeueRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DequeueRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DequeueRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_DequeueRequest.Merge(m, src) +} +func (m *DequeueRequest) XXX_Size() int { + return m.Size() +} +func (m *DequeueRequest) XXX_DiscardUnknown() { + xxx_messageInfo_DequeueRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_DequeueRequest proto.InternalMessageInfo + +// DequeueResponse contains the dequeued job +type DequeueResponse struct { + Job *Job `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` +} + +func (m *DequeueResponse) Reset() { *m = DequeueResponse{} } +func (*DequeueResponse) ProtoMessage() {} +func (*DequeueResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_220f68f506b923fa, []int{2} +} +func (m *DequeueResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DequeueResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DequeueResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DequeueResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_DequeueResponse.Merge(m, src) +} +func (m *DequeueResponse) XXX_Size() int { + return m.Size() +} +func (m *DequeueResponse) XXX_DiscardUnknown() { + xxx_messageInfo_DequeueResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_DequeueResponse proto.InternalMessageInfo + +func (m *DequeueResponse) GetJob() *Job { + if m != nil { + return m.Job + } + return nil +} + +// ReportJobResultRequest is used to report the result of executing a job +type ReportJobResultRequest struct { + JobId string `protobuf:"bytes,1,opt,name=job_id,json=jobId,proto3" json:"job_id,omitempty"` + JobType JobType `protobuf:"varint,2,opt,name=job_type,json=jobType,proto3,enum=jobqueue.JobType" json:"job_type,omitempty"` + Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"` + Result *types.Any `protobuf:"bytes,4,opt,name=result,proto3" json:"result,omitempty"` +} + +func (m *ReportJobResultRequest) Reset() { *m = ReportJobResultRequest{} } +func (*ReportJobResultRequest) ProtoMessage() {} +func (*ReportJobResultRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_220f68f506b923fa, []int{3} +} +func (m *ReportJobResultRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ReportJobResultRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReportJobResultRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ReportJobResultRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReportJobResultRequest.Merge(m, src) +} +func (m *ReportJobResultRequest) XXX_Size() int { + return m.Size() +} +func (m *ReportJobResultRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ReportJobResultRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ReportJobResultRequest proto.InternalMessageInfo + +func (m *ReportJobResultRequest) GetJobId() string { + if m != nil { + return m.JobId + } + return "" +} + +func (m *ReportJobResultRequest) GetJobType() JobType { + if m != nil { + return m.JobType + } + return JOB_TYPE_DELETION +} + +func (m *ReportJobResultRequest) GetError() string { + if m != nil { + return m.Error + } + return "" +} + +func (m *ReportJobResultRequest) GetResult() *types.Any { + if m != nil { + return m.Result + } + return nil +} + +// ReportJobResultResponse is the response to reporting a job result +type ReportJobResultResponse struct { +} + +func (m *ReportJobResultResponse) Reset() { *m = ReportJobResultResponse{} } +func (*ReportJobResultResponse) ProtoMessage() {} +func (*ReportJobResultResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_220f68f506b923fa, []int{4} +} +func (m *ReportJobResultResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ReportJobResultResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReportJobResultResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ReportJobResultResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReportJobResultResponse.Merge(m, src) +} +func (m *ReportJobResultResponse) XXX_Size() int { + return m.Size() +} +func (m *ReportJobResultResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ReportJobResultResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ReportJobResultResponse proto.InternalMessageInfo + +func init() { + proto.RegisterEnum("jobqueue.JobType", JobType_name, JobType_value) + proto.RegisterType((*Job)(nil), "jobqueue.Job") + proto.RegisterType((*DequeueRequest)(nil), "jobqueue.DequeueRequest") + proto.RegisterType((*DequeueResponse)(nil), "jobqueue.DequeueResponse") + proto.RegisterType((*ReportJobResultRequest)(nil), "jobqueue.ReportJobResultRequest") + proto.RegisterType((*ReportJobResultResponse)(nil), "jobqueue.ReportJobResultResponse") +} + +func init() { + proto.RegisterFile("pkg/compactor/jobqueue/queue.proto", fileDescriptor_220f68f506b923fa) +} + +var fileDescriptor_220f68f506b923fa = []byte{ + // 466 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0x41, 0x6e, 0xd3, 0x40, + 0x14, 0x86, 0x3d, 0x49, 0x9b, 0xb4, 0x0f, 0x48, 0xdb, 0x51, 0x0b, 0x4e, 0x16, 0x53, 0x63, 0x09, + 0x29, 0x42, 0x95, 0x2d, 0x85, 0x0b, 0x40, 0xd5, 0x2c, 0x1a, 0x21, 0x0a, 0x56, 0x84, 0x80, 0x4d, + 0xe4, 0x89, 0xa7, 0xc6, 0x6e, 0x92, 0x67, 0xec, 0xf1, 0xc2, 0x3b, 0x8e, 0xc0, 0x15, 0xd8, 0x20, + 0x8e, 0xc2, 0x32, 0xcb, 0x2e, 0x89, 0xb3, 0x61, 0xd9, 0x23, 0x20, 0x8f, 0xe3, 0x56, 0x6d, 0xa3, + 0x6e, 0x6c, 0xcf, 0xfb, 0x7f, 0xbf, 0xff, 0xf3, 0x2f, 0x83, 0x19, 0x5d, 0xf8, 0xf6, 0x18, 0xa7, + 0x91, 0x3b, 0x96, 0x18, 0xdb, 0x21, 0xf2, 0x6f, 0xa9, 0x48, 0x85, 0xad, 0xae, 0x56, 0x14, 0xa3, + 0x44, 0xba, 0x55, 0x4d, 0x3b, 0x6d, 0x1f, 0xd1, 0x9f, 0x08, 0x5b, 0xcd, 0x79, 0x7a, 0x6e, 0xbb, + 0xb3, 0xac, 0x34, 0x75, 0x0e, 0xef, 0x4a, 0x32, 0x98, 0x8a, 0x44, 0xba, 0xd3, 0xa8, 0x34, 0x98, + 0x1f, 0xa1, 0x3e, 0x40, 0x4e, 0x5b, 0x50, 0x0b, 0x3c, 0x9d, 0x18, 0xa4, 0xbb, 0xed, 0xd4, 0x02, + 0x8f, 0xbe, 0x80, 0x0d, 0x99, 0x45, 0x42, 0xaf, 0x19, 0xa4, 0xdb, 0xea, 0xed, 0x59, 0x55, 0x96, + 0x35, 0x40, 0x3e, 0xcc, 0x22, 0xe1, 0x28, 0x99, 0xea, 0xd0, 0x8c, 0xdc, 0x6c, 0x82, 0xae, 0xa7, + 0xd7, 0x0d, 0xd2, 0x7d, 0xec, 0x54, 0x47, 0x73, 0x17, 0x5a, 0x27, 0x42, 0xbd, 0xe2, 0x14, 0xb7, + 0x44, 0x9a, 0x3d, 0xd8, 0xb9, 0x9e, 0x24, 0x11, 0xce, 0x12, 0x41, 0x0f, 0xa1, 0x1e, 0x22, 0x57, + 0xb1, 0x8f, 0x7a, 0x4f, 0x6e, 0x85, 0x38, 0x85, 0x62, 0xfe, 0x24, 0xf0, 0xd4, 0x11, 0x11, 0xc6, + 0xb2, 0x18, 0x89, 0x24, 0x9d, 0xc8, 0xd5, 0x3a, 0x7a, 0x00, 0x8d, 0x10, 0xf9, 0xe8, 0x9a, 0x7a, + 0x33, 0x44, 0x7e, 0xea, 0xd1, 0x23, 0x28, 0x7a, 0x19, 0x3d, 0x0c, 0xdf, 0x0c, 0xcb, 0x07, 0xba, + 0x0f, 0x9b, 0x22, 0x8e, 0x31, 0x56, 0xf4, 0xdb, 0x4e, 0x79, 0xa0, 0x47, 0xd0, 0x88, 0x55, 0x96, + 0xbe, 0xa1, 0xc8, 0xf6, 0xad, 0xb2, 0x45, 0xab, 0x6a, 0xd1, 0x7a, 0x33, 0xcb, 0x9c, 0x95, 0xc7, + 0x6c, 0xc3, 0xb3, 0x7b, 0x88, 0xe5, 0xf7, 0xbd, 0x34, 0xa0, 0xb9, 0x8a, 0xa4, 0x07, 0xb0, 0x37, + 0x38, 0x3b, 0x1e, 0x0d, 0x3f, 0xbf, 0xef, 0x8f, 0x4e, 0xfa, 0x6f, 0xfb, 0xc3, 0xd3, 0xb3, 0x77, + 0xbb, 0x5a, 0xef, 0x17, 0x81, 0xad, 0x01, 0xf2, 0x0f, 0x05, 0x1e, 0x7d, 0x0d, 0xcd, 0x55, 0x43, + 0x54, 0xbf, 0x81, 0xbe, 0x5d, 0x63, 0xa7, 0xbd, 0x46, 0x29, 0xe3, 0x4c, 0x8d, 0x7e, 0x82, 0x9d, + 0x3b, 0x2c, 0xd4, 0xb8, 0xf1, 0xaf, 0x6f, 0xb2, 0xf3, 0xfc, 0x01, 0x47, 0xb5, 0xf9, 0xd8, 0x9b, + 0x2f, 0x98, 0x76, 0xb9, 0x60, 0xda, 0xd5, 0x82, 0x91, 0xef, 0x39, 0x23, 0xbf, 0x73, 0x46, 0xfe, + 0xe4, 0x8c, 0xcc, 0x73, 0x46, 0xfe, 0xe6, 0x8c, 0xfc, 0xcb, 0x99, 0x76, 0x95, 0x33, 0xf2, 0x63, + 0xc9, 0xb4, 0xf9, 0x92, 0x69, 0x97, 0x4b, 0xa6, 0x7d, 0xb1, 0xfc, 0x40, 0x7e, 0x4d, 0xb9, 0x35, + 0xc6, 0xa9, 0xed, 0xc7, 0xee, 0xb9, 0x3b, 0x73, 0xed, 0x09, 0x5e, 0x04, 0xf6, 0xfa, 0xff, 0x9b, + 0x37, 0x54, 0xc3, 0xaf, 0xfe, 0x07, 0x00, 0x00, 0xff, 0xff, 0xed, 0x1e, 0x45, 0xb4, 0x00, 0x03, + 0x00, 0x00, +} + +func (x JobType) String() string { + s, ok := JobType_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) +} +func (this *Job) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Job) + if !ok { + that2, ok := that.(Job) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Id != that1.Id { + return false + } + if this.Type != that1.Type { + return false + } + if !bytes.Equal(this.Payload, that1.Payload) { + return false + } + return true +} +func (this *DequeueRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*DequeueRequest) + if !ok { + that2, ok := that.(DequeueRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *DequeueResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*DequeueResponse) + if !ok { + that2, ok := that.(DequeueResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.Job.Equal(that1.Job) { + return false + } + return true +} +func (this *ReportJobResultRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*ReportJobResultRequest) + if !ok { + that2, ok := that.(ReportJobResultRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.JobId != that1.JobId { + return false + } + if this.JobType != that1.JobType { + return false + } + if this.Error != that1.Error { + return false + } + if !this.Result.Equal(that1.Result) { + return false + } + return true +} +func (this *ReportJobResultResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*ReportJobResultResponse) + if !ok { + that2, ok := that.(ReportJobResultResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *Job) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&jobqueue.Job{") + s = append(s, "Id: "+fmt.Sprintf("%#v", this.Id)+",\n") + s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n") + s = append(s, "Payload: "+fmt.Sprintf("%#v", this.Payload)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *DequeueRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&jobqueue.DequeueRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *DequeueResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&jobqueue.DequeueResponse{") + if this.Job != nil { + s = append(s, "Job: "+fmt.Sprintf("%#v", this.Job)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *ReportJobResultRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&jobqueue.ReportJobResultRequest{") + s = append(s, "JobId: "+fmt.Sprintf("%#v", this.JobId)+",\n") + s = append(s, "JobType: "+fmt.Sprintf("%#v", this.JobType)+",\n") + s = append(s, "Error: "+fmt.Sprintf("%#v", this.Error)+",\n") + if this.Result != nil { + s = append(s, "Result: "+fmt.Sprintf("%#v", this.Result)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *ReportJobResultResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&jobqueue.ReportJobResultResponse{") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringQueue(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// JobQueueClient is the client API for JobQueue service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type JobQueueClient interface { + // Dequeue retrieves the next job from the queue + Dequeue(ctx context.Context, in *DequeueRequest, opts ...grpc.CallOption) (*DequeueResponse, error) + // ReportJobResult reports the result of executing a job + ReportJobResult(ctx context.Context, in *ReportJobResultRequest, opts ...grpc.CallOption) (*ReportJobResultResponse, error) +} + +type jobQueueClient struct { + cc *grpc.ClientConn +} + +func NewJobQueueClient(cc *grpc.ClientConn) JobQueueClient { + return &jobQueueClient{cc} +} + +func (c *jobQueueClient) Dequeue(ctx context.Context, in *DequeueRequest, opts ...grpc.CallOption) (*DequeueResponse, error) { + out := new(DequeueResponse) + err := c.cc.Invoke(ctx, "/jobqueue.JobQueue/Dequeue", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *jobQueueClient) ReportJobResult(ctx context.Context, in *ReportJobResultRequest, opts ...grpc.CallOption) (*ReportJobResultResponse, error) { + out := new(ReportJobResultResponse) + err := c.cc.Invoke(ctx, "/jobqueue.JobQueue/ReportJobResult", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// JobQueueServer is the server API for JobQueue service. +type JobQueueServer interface { + // Dequeue retrieves the next job from the queue + Dequeue(context.Context, *DequeueRequest) (*DequeueResponse, error) + // ReportJobResult reports the result of executing a job + ReportJobResult(context.Context, *ReportJobResultRequest) (*ReportJobResultResponse, error) +} + +// UnimplementedJobQueueServer can be embedded to have forward compatible implementations. +type UnimplementedJobQueueServer struct { +} + +func (*UnimplementedJobQueueServer) Dequeue(ctx context.Context, req *DequeueRequest) (*DequeueResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Dequeue not implemented") +} +func (*UnimplementedJobQueueServer) ReportJobResult(ctx context.Context, req *ReportJobResultRequest) (*ReportJobResultResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReportJobResult not implemented") +} + +func RegisterJobQueueServer(s *grpc.Server, srv JobQueueServer) { + s.RegisterService(&_JobQueue_serviceDesc, srv) +} + +func _JobQueue_Dequeue_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DequeueRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(JobQueueServer).Dequeue(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/jobqueue.JobQueue/Dequeue", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(JobQueueServer).Dequeue(ctx, req.(*DequeueRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _JobQueue_ReportJobResult_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReportJobResultRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(JobQueueServer).ReportJobResult(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/jobqueue.JobQueue/ReportJobResult", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(JobQueueServer).ReportJobResult(ctx, req.(*ReportJobResultRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _JobQueue_serviceDesc = grpc.ServiceDesc{ + ServiceName: "jobqueue.JobQueue", + HandlerType: (*JobQueueServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Dequeue", + Handler: _JobQueue_Dequeue_Handler, + }, + { + MethodName: "ReportJobResult", + Handler: _JobQueue_ReportJobResult_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pkg/compactor/jobqueue/queue.proto", +} + +func (m *Job) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Job) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Job) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Payload) > 0 { + i -= len(m.Payload) + copy(dAtA[i:], m.Payload) + i = encodeVarintQueue(dAtA, i, uint64(len(m.Payload))) + i-- + dAtA[i] = 0x1a + } + if m.Type != 0 { + i = encodeVarintQueue(dAtA, i, uint64(m.Type)) + i-- + dAtA[i] = 0x10 + } + if len(m.Id) > 0 { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = encodeVarintQueue(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *DequeueRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DequeueRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DequeueRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *DequeueResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DequeueResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DequeueResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Job != nil { + { + size, err := m.Job.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQueue(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *ReportJobResultRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReportJobResultRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ReportJobResultRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Result != nil { + { + size, err := m.Result.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQueue(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + if len(m.Error) > 0 { + i -= len(m.Error) + copy(dAtA[i:], m.Error) + i = encodeVarintQueue(dAtA, i, uint64(len(m.Error))) + i-- + dAtA[i] = 0x1a + } + if m.JobType != 0 { + i = encodeVarintQueue(dAtA, i, uint64(m.JobType)) + i-- + dAtA[i] = 0x10 + } + if len(m.JobId) > 0 { + i -= len(m.JobId) + copy(dAtA[i:], m.JobId) + i = encodeVarintQueue(dAtA, i, uint64(len(m.JobId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *ReportJobResultResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReportJobResultResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ReportJobResultResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func encodeVarintQueue(dAtA []byte, offset int, v uint64) int { + offset -= sovQueue(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *Job) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Id) + if l > 0 { + n += 1 + l + sovQueue(uint64(l)) + } + if m.Type != 0 { + n += 1 + sovQueue(uint64(m.Type)) + } + l = len(m.Payload) + if l > 0 { + n += 1 + l + sovQueue(uint64(l)) + } + return n +} + +func (m *DequeueRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *DequeueResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Job != nil { + l = m.Job.Size() + n += 1 + l + sovQueue(uint64(l)) + } + return n +} + +func (m *ReportJobResultRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.JobId) + if l > 0 { + n += 1 + l + sovQueue(uint64(l)) + } + if m.JobType != 0 { + n += 1 + sovQueue(uint64(m.JobType)) + } + l = len(m.Error) + if l > 0 { + n += 1 + l + sovQueue(uint64(l)) + } + if m.Result != nil { + l = m.Result.Size() + n += 1 + l + sovQueue(uint64(l)) + } + return n +} + +func (m *ReportJobResultResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func sovQueue(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozQueue(x uint64) (n int) { + return sovQueue(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *Job) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Job{`, + `Id:` + fmt.Sprintf("%v", this.Id) + `,`, + `Type:` + fmt.Sprintf("%v", this.Type) + `,`, + `Payload:` + fmt.Sprintf("%v", this.Payload) + `,`, + `}`, + }, "") + return s +} +func (this *DequeueRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&DequeueRequest{`, + `}`, + }, "") + return s +} +func (this *DequeueResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&DequeueResponse{`, + `Job:` + strings.Replace(this.Job.String(), "Job", "Job", 1) + `,`, + `}`, + }, "") + return s +} +func (this *ReportJobResultRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ReportJobResultRequest{`, + `JobId:` + fmt.Sprintf("%v", this.JobId) + `,`, + `JobType:` + fmt.Sprintf("%v", this.JobType) + `,`, + `Error:` + fmt.Sprintf("%v", this.Error) + `,`, + `Result:` + strings.Replace(fmt.Sprintf("%v", this.Result), "Any", "types.Any", 1) + `,`, + `}`, + }, "") + return s +} +func (this *ReportJobResultResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ReportJobResultResponse{`, + `}`, + }, "") + return s +} +func valueToStringQueue(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *Job) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Job: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Job: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQueue + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQueue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Type |= JobType(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Payload", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthQueue + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthQueue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Payload = append(m.Payload[:0], dAtA[iNdEx:postIndex]...) + if m.Payload == nil { + m.Payload = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQueue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DequeueRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DequeueRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DequeueRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipQueue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DequeueResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DequeueResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DequeueResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Job", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueue + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Job == nil { + m.Job = &Job{} + } + if err := m.Job.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQueue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ReportJobResultRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReportJobResultRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReportJobResultRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field JobId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQueue + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQueue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.JobId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field JobType", wireType) + } + m.JobType = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.JobType |= JobType(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQueue + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQueue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Error = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Result", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueue + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueue + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Result == nil { + m.Result = &types.Any{} + } + if err := m.Result.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQueue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ReportJobResultResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueue + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReportJobResultResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReportJobResultResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipQueue(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueue + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipQueue(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowQueue + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowQueue + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowQueue + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthQueue + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthQueue + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowQueue + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipQueue(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthQueue + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthQueue = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowQueue = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/compactor/jobqueue/queue.proto b/pkg/compactor/jobqueue/queue.proto new file mode 100644 index 0000000000..6d4425be26 --- /dev/null +++ b/pkg/compactor/jobqueue/queue.proto @@ -0,0 +1,48 @@ +syntax = "proto3"; + +package jobqueue; + +import "google/protobuf/any.proto"; +import "google/protobuf/timestamp.proto"; + +option go_package = "github.com/grafana/loki/pkg/compactor/jobqueue"; + +// Job represents a single job in the queue +message Job { + string id = 1; + JobType type = 2; + bytes payload = 3; // encoded job specific payload +} + +// JobType represents the type of job +enum JobType { + JOB_TYPE_DELETION = 0; + // Add more job types as needed +} + +// DequeueRequest is used to request a job from the queue +message DequeueRequest {} + +// DequeueResponse contains the dequeued job +message DequeueResponse { + Job job = 1; +} + +// ReportJobResultRequest is used to report the result of executing a job +message ReportJobResultRequest { + string job_id = 1; + JobType job_type = 2; + string error = 3; // Empty string indicates success + google.protobuf.Any result = 4; +} + +// ReportJobResultResponse is the response to reporting a job result +message ReportJobResultResponse {} + +// JobQueue provides RPC methods for job queue operations +service JobQueue { + // Dequeue retrieves the next job from the queue + rpc Dequeue(DequeueRequest) returns (DequeueResponse) {} + // ReportJobResult reports the result of executing a job + rpc ReportJobResult(ReportJobResultRequest) returns (ReportJobResultResponse) {} +} diff --git a/pkg/compactor/jobqueue/queue_test.go b/pkg/compactor/jobqueue/queue_test.go new file mode 100644 index 0000000000..eabf809012 --- /dev/null +++ b/pkg/compactor/jobqueue/queue_test.go @@ -0,0 +1,238 @@ +package jobqueue + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// mockBuilder implements the Builder interface for testing +type mockBuilder struct { + jobsToBuild []*Job + buildErr error +} + +func (m *mockBuilder) OnJobResponse(_ *ReportJobResultRequest) {} + +func (m *mockBuilder) BuildJobs(ctx context.Context, jobsChan chan<- *Job) error { + if m.buildErr != nil { + return m.buildErr + } + + for _, job := range m.jobsToBuild { + select { + case <-ctx.Done(): + return ctx.Err() + case jobsChan <- job: + } + } + + // Keep running until context is cancelled + <-ctx.Done() + return ctx.Err() +} + +func TestQueue_RegisterBuilder(t *testing.T) { + q := New() + builder := &mockBuilder{} + + // Register builder successfully + err := q.RegisterBuilder(JOB_TYPE_DELETION, builder) + require.NoError(t, err) + + // Try to register same builder type again + err = q.RegisterBuilder(JOB_TYPE_DELETION, builder) + require.ErrorIs(t, err, ErrBuilderAlreadyRegistered) +} + +func TestQueue_Dequeue(t *testing.T) { + q := New() + + // Create a test job + job := &Job{ + Id: "test-job", + Type: JOB_TYPE_DELETION, + } + + go func() { + // Enqueue the job + q.queue <- job + }() + + // Dequeue the job + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + resp, err := q.Dequeue(ctx, &DequeueRequest{}) + require.NoError(t, err) + require.Equal(t, job, resp.Job) + + // Verify job is tracked as being processed + q.processingJobsMtx.RLock() + pj, exists := q.processingJobs[job.Id] + q.processingJobsMtx.RUnlock() + require.True(t, exists) + require.Equal(t, job, pj.job) + require.Equal(t, 0, pj.retryCount) +} + +func TestQueue_ReportJobResult(t *testing.T) { + ctx := context.Background() + q := New() + require.NoError(t, q.RegisterBuilder(JOB_TYPE_DELETION, &mockBuilder{})) + + // Create a test job + job := &Job{ + Id: "test-job", + Type: JOB_TYPE_DELETION, + } + + // Add job to processing jobs + q.processingJobsMtx.Lock() + q.processingJobs[job.Id] = &processingJob{ + job: job, + dequeued: time.Now(), + retryCount: 0, + } + q.processingJobsMtx.Unlock() + + // Test successful response + resp, err := q.ReportJobResult(ctx, &ReportJobResultRequest{ + JobId: job.Id, + JobType: job.Type, + }) + require.NoError(t, err) + require.NotNil(t, resp) + + // Verify job is removed from processing jobs + q.processingJobsMtx.RLock() + _, exists := q.processingJobs[job.Id] + q.processingJobsMtx.RUnlock() + require.False(t, exists) + + // Test error response with retry + job.Id = "retry-job" + q.processingJobsMtx.Lock() + q.processingJobs[job.Id] = &processingJob{ + job: job, + dequeued: time.Now(), + retryCount: 0, + } + q.processingJobsMtx.Unlock() + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + + resp, err = q.ReportJobResult(ctx, &ReportJobResultRequest{ + JobId: job.Id, + JobType: job.Type, + Error: "test error", + }) + require.NoError(t, err) + require.NotNil(t, resp) + }() + + // Verify job is requeued with timeout + select { + case requeuedJob := <-q.queue: + require.Equal(t, job, requeuedJob) + case <-time.After(time.Minute): + t.Fatal("job was not requeued") + } + + wg.Wait() + + // Verify retry count is incremented + q.processingJobsMtx.RLock() + pj, exists := q.processingJobs[job.Id] + q.processingJobsMtx.RUnlock() + require.True(t, exists) + require.Equal(t, 1, pj.retryCount) +} + +func TestQueue_JobTimeout(t *testing.T) { + q := newQueue(50 * time.Millisecond) + q.jobTimeout = 100 * time.Millisecond // Short timeout for testing + + // Create a test job + job := &Job{ + Id: "test-job", + Type: JOB_TYPE_DELETION, + } + + // Add job to processing jobs with old dequeued time + q.processingJobsMtx.Lock() + q.processingJobs[job.Id] = &processingJob{ + job: job, + dequeued: time.Now().Add(-200 * time.Millisecond), + retryCount: 0, + } + q.processingJobsMtx.Unlock() + + // Wait for timeout checker to run + time.Sleep(100 * time.Millisecond) + + // Verify job is requeued + select { + case requeuedJob := <-q.queue: + require.Equal(t, job, requeuedJob) + case <-time.After(time.Second): + t.Fatal("job was not requeued after timeout") + } + + // Verify job is removed from processing jobs + q.processingJobsMtx.RLock() + _, exists := q.processingJobs[job.Id] + q.processingJobsMtx.RUnlock() + require.False(t, exists) +} + +func TestQueue_StartStop(t *testing.T) { + q := New() + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + // Create a builder that returns an error + builder := &mockBuilder{ + buildErr: errors.New("test error"), + } + + // Register and start the builder + err := q.RegisterBuilder(JOB_TYPE_DELETION, builder) + require.NoError(t, err) + + err = q.Start(ctx) + require.NoError(t, err) + + // Wait for context cancellation + <-ctx.Done() + + // Stop the queue + err = q.Stop() + require.NoError(t, err) +} + +func TestQueue_Close(t *testing.T) { + q := New() + + // Close the queue + q.Close() + + // Verify queue is closed + require.True(t, q.closed.Load()) + + // Verify channel is closed + select { + case _, ok := <-q.queue: + require.False(t, ok) + default: + t.Fatal("queue channel should be closed") + } +} diff --git a/pkg/compactor/retention/retention.go b/pkg/compactor/retention/retention.go index e418109628..5527c42696 100644 --- a/pkg/compactor/retention/retention.go +++ b/pkg/compactor/retention/retention.go @@ -32,7 +32,7 @@ const ( ) type Chunk struct { - ChunkID []byte + ChunkID string From model.Time Through model.Time } @@ -94,7 +94,7 @@ type SeriesIterator interface { } type IndexCleaner interface { - RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID []byte) error + RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID string) error // CleanupSeries is for cleaning up the series that do have any chunks left in the index. // It would only be called for the series that have all their chunks deleted without adding new ones. CleanupSeries(userID []byte, lbls labels.Labels) error @@ -268,7 +268,7 @@ func markForDelete( // For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then // the retention would fail because it would fail to find it in the storage. if filterFunc == nil || c.From >= tableInterval.Start { - if err := marker.Put(c.ChunkID); err != nil { + if err := marker.Put(unsafeGetBytes(c.ChunkID)); err != nil { return err } } @@ -442,9 +442,8 @@ func newChunkRewriter(chunkClient client.Client, tableName string, chunkIndexer // the status of which is set to wroteChunks. func (c *chunkRewriter) rewriteChunk(ctx context.Context, userID []byte, ce Chunk, tableInterval model.Interval, filterFunc filter.Func) (wroteChunks bool, linesDeleted bool, err error) { userIDStr := unsafeGetString(userID) - chunkID := unsafeGetString(ce.ChunkID) - chk, err := chunk.ParseExternalKey(userIDStr, chunkID) + chk, err := chunk.ParseExternalKey(userIDStr, ce.ChunkID) if err != nil { return false, false, err } diff --git a/pkg/compactor/retention/retention_test.go b/pkg/compactor/retention/retention_test.go index 8e1b50374d..fce7bb6dd8 100644 --- a/pkg/compactor/retention/retention_test.go +++ b/pkg/compactor/retention/retention_test.go @@ -584,7 +584,7 @@ func TestChunkRewriter(t *testing.T) { cr := newChunkRewriter(store.chunkClient, indexTable.name, indexTable) wroteChunks, linesDeleted, err := cr.rewriteChunk(context.Background(), []byte(tt.chunk.UserID), Chunk{ - ChunkID: []byte(getChunkID(tt.chunk.ChunkRef)), + ChunkID: getChunkID(tt.chunk.ChunkRef), From: tt.chunk.From, Through: tt.chunk.Through, }, ExtractIntervalFromTableName(indexTable.name), tt.filterFunc) @@ -684,7 +684,7 @@ func (m *mockExpirationChecker) Expired(_ []byte, chk Chunk, _ labels.Labels, _ time.Sleep(m.delay) m.numExpiryChecks++ - ce := m.chunksExpiry[string(chk.ChunkID)] + ce := m.chunksExpiry[chk.ChunkID] return ce.isExpired, ce.filterFunc } @@ -1184,7 +1184,7 @@ func TestDuplicateSeriesDetection(t *testing.T) { labels: chk.Metric, chunks: []Chunk{ { - ChunkID: []byte(getChunkID(chk.ChunkRef)), + ChunkID: getChunkID(chk.ChunkRef), From: chk.From, Through: chk.Through, }, diff --git a/pkg/compactor/retention/util.go b/pkg/compactor/retention/util.go index 535aa8370d..651715151f 100644 --- a/pkg/compactor/retention/util.go +++ b/pkg/compactor/retention/util.go @@ -16,6 +16,10 @@ func unsafeGetString(buf []byte) string { return *((*string)(unsafe.Pointer(&buf))) // #nosec G103 -- we know the string is not mutated } +func unsafeGetBytes(s string) []byte { + return unsafe.Slice(unsafe.StringData(s), len(s)) // #nosec G103 -- we know the string is not mutated +} + func copyFile(src, dst string) (int64, error) { sourceFileStat, err := os.Stat(src) if err != nil { diff --git a/pkg/compactor/retention/util_test.go b/pkg/compactor/retention/util_test.go index aa98ab5097..0edb4a968f 100644 --- a/pkg/compactor/retention/util_test.go +++ b/pkg/compactor/retention/util_test.go @@ -137,7 +137,7 @@ func (t *table) ForEachSeries(ctx context.Context, callback SeriesCallback) erro chunks := make([]Chunk, 0, len(t.chunks[userID][seriesID])) for _, chk := range t.chunks[userID][seriesID] { chunks = append(chunks, Chunk{ - ChunkID: []byte(getChunkID(chk.ChunkRef)), + ChunkID: getChunkID(chk.ChunkRef), From: chk.From, Through: chk.Through, }) @@ -168,10 +168,10 @@ func (t *table) CleanupSeries(_ []byte, _ labels.Labels) error { return nil } -func (t *table) RemoveChunk(_, _ model.Time, userID []byte, lbls labels.Labels, chunkID []byte) error { +func (t *table) RemoveChunk(_, _ model.Time, userID []byte, lbls labels.Labels, chunkID string) error { seriesID := string(labelsSeriesID(labels.NewBuilder(lbls).Set(labels.MetricName, "logs").Labels())) for i, chk := range t.chunks[string(userID)][seriesID] { - if getChunkID(chk.ChunkRef) == string(chunkID) { + if getChunkID(chk.ChunkRef) == chunkID { t.chunks[string(userID)][seriesID] = append(t.chunks[string(userID)][seriesID][:i], t.chunks[string(userID)][seriesID][i+1:]...) } } diff --git a/pkg/compactor/testutil.go b/pkg/compactor/testutil.go index c60c98fd94..39efb30405 100644 --- a/pkg/compactor/testutil.go +++ b/pkg/compactor/testutil.go @@ -172,7 +172,7 @@ func (c compactedIndex) CleanupSeries(_ []byte, _ labels.Labels) error { return nil } -func (c compactedIndex) RemoveChunk(_, _ model.Time, _ []byte, _ labels.Labels, _ []byte) error { +func (c compactedIndex) RemoveChunk(_, _ model.Time, _ []byte, _ labels.Labels, _ string) error { return nil } diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index.go index 2ba43fe602..8012c106ba 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index.go @@ -166,7 +166,7 @@ func (c *CompactedIndex) CleanupSeries(userID []byte, lbls labels.Labels) error return c.seriesCleaner.CleanupSeries(userID, lbls) } -func (c *CompactedIndex) RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID []byte) error { +func (c *CompactedIndex) RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID string) error { if err := c.setupIndexProcessors(); err != nil { return err } diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index_test.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index_test.go index 36fedee719..6d176e6513 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index_test.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index_test.go @@ -1,10 +1,10 @@ package compactor import ( - "bytes" "context" "os" "sort" + "strings" "testing" "time" @@ -111,11 +111,11 @@ func TestCompactedIndex_IndexProcessor(t *testing.T) { require.NoError(t, err) sort.Slice(expectedChunkEntries, func(i, j int) bool { - return bytes.Compare(expectedChunkEntries[i].ChunkID, expectedChunkEntries[j].ChunkID) < 0 + return strings.Compare(expectedChunkEntries[i].ChunkID, expectedChunkEntries[j].ChunkID) < 0 }) sort.Slice(chunkEntriesFound, func(i, j int) bool { - return bytes.Compare(chunkEntriesFound[i].ChunkID, chunkEntriesFound[j].ChunkID) < 0 + return strings.Compare(chunkEntriesFound[i].ChunkID, chunkEntriesFound[j].ChunkID) < 0 }) require.Equal(t, expectedChunkEntries, chunkEntriesFound) diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go index 250ffd7df8..8ff8e7b7d1 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go @@ -54,7 +54,7 @@ func ForEachSeries(ctx context.Context, bucket *bbolt.Bucket, config config.Peri } current.AppendChunks(retention.Chunk{ - ChunkID: ref.ChunkID, + ChunkID: string(ref.ChunkID), From: ref.From, Through: ref.Through, }) @@ -134,7 +134,7 @@ func (s *seriesCleaner) CleanupSeries(userID []byte, lbls labels.Labels) error { return nil } -func (s *seriesCleaner) RemoveChunk(from, through model.Time, userID []byte, lbls labels.Labels, chunkID []byte) error { +func (s *seriesCleaner) RemoveChunk(from, through model.Time, userID []byte, lbls labels.Labels, chunkID string) error { // We need to add metric name label as well if it is missing since the series ids are calculated including that. if lbls.Get(labels.MetricName) == "" { lbls = append(lbls, labels.Label{ @@ -143,7 +143,7 @@ func (s *seriesCleaner) RemoveChunk(from, through model.Time, userID []byte, lbl }) } - indexEntries, err := s.schema.GetChunkWriteEntries(from, through, string(userID), logMetricName, lbls, string(chunkID)) + indexEntries, err := s.schema.GetChunkWriteEntries(from, through, string(userID), logMetricName, lbls, chunkID) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator_test.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator_test.go index d97616af0f..7b050d060d 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator_test.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator_test.go @@ -226,7 +226,7 @@ func labelsString(ls labels.Labels) string { func retentionChunkFromChunk(s config.SchemaConfig, c chunk.Chunk) retention.Chunk { return retention.Chunk{ - ChunkID: []byte(s.ExternalKey(c.ChunkRef)), + ChunkID: s.ExternalKey(c.ChunkRef), From: c.From, Through: c.Through, } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go index e2b933c121..e14c56d48e 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go @@ -313,7 +313,7 @@ func (c *compactedIndex) ForEachSeries(ctx context.Context, callback retention.S logprotoChunkRef.Checksum = chk.Checksum series.AppendChunks(retention.Chunk{ - ChunkID: getUnsafeBytes(schemaCfg.ExternalKey(logprotoChunkRef)), + ChunkID: schemaCfg.ExternalKey(logprotoChunkRef), From: logprotoChunkRef.From, Through: logprotoChunkRef.Through, }) @@ -367,8 +367,8 @@ func (c *compactedIndex) CleanupSeries(_ []byte, lbls labels.Labels) error { return nil } -func (c *compactedIndex) RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID []byte) error { - chk, err := chunk.ParseExternalKey(string(userID), string(chunkID)) +func (c *compactedIndex) RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID string) error { + chk, err := chunk.ParseExternalKey(string(userID), chunkID) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go index f5f8ff27e9..762ee0e0b1 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go @@ -630,7 +630,7 @@ func chunkMetasToRetentionChunk(schemaCfg config.SchemaConfig, userID string, lb chunkEntries := make([]retention.Chunk, 0, len(chunkMetas)) for _, chunkMeta := range chunkMetas { chunkEntries = append(chunkEntries, retention.Chunk{ - ChunkID: []byte(schemaCfg.ExternalKey(chunkMetaToChunkRef(userID, chunkMeta, lbls))), + ChunkID: schemaCfg.ExternalKey(chunkMetaToChunkRef(userID, chunkMeta, lbls)), From: chunkMeta.From(), Through: chunkMeta.Through(), }) diff --git a/pkg/tool/audit/audit.go b/pkg/tool/audit/audit.go index b155f169a1..aad18ab099 100644 --- a/pkg/tool/audit/audit.go +++ b/pkg/tool/audit/audit.go @@ -105,10 +105,10 @@ func ValidateCompactedIndex(ctx context.Context, objClient client.ObjectClient, bar.Add(1) // nolint:errcheck g.Go(func() error { for _, c := range s.Chunks() { - exists, err := CheckChunkExistance(string(c.ChunkID), objClient) + exists, err := CheckChunkExistance(c.ChunkID, objClient) if err != nil || !exists { missingChunks.Add(1) - logger.Log("msg", "chunk is missing", "err", err, "chunk_id", string(c.ChunkID)) + logger.Log("msg", "chunk is missing", "err", err, "chunk_id", c.ChunkID) return nil } foundChunks.Add(1) diff --git a/pkg/tool/audit/audit_test.go b/pkg/tool/audit/audit_test.go index df2bafc82b..6d4eb75396 100644 --- a/pkg/tool/audit/audit_test.go +++ b/pkg/tool/audit/audit_test.go @@ -58,11 +58,11 @@ func TestAuditIndex(t *testing.T) { objClient := testObjClient{} compactedIdx := testCompactedIdx{ chunks: []retention.Chunk{ - {ChunkID: []byte("found-1")}, - {ChunkID: []byte("found-2")}, - {ChunkID: []byte("found-3")}, - {ChunkID: []byte("found-4")}, - {ChunkID: []byte("missing-1")}, + {ChunkID: "found-1"}, + {ChunkID: "found-2"}, + {ChunkID: "found-3"}, + {ChunkID: "found-4"}, + {ChunkID: "missing-1"}, }, } logger := log.NewNopLogger() diff --git a/tools/tsdb/tsdb-map/main.go b/tools/tsdb/tsdb-map/main.go index 0d06908ad1..caab1ae66f 100644 --- a/tools/tsdb/tsdb-map/main.go +++ b/tools/tsdb/tsdb-map/main.go @@ -82,7 +82,7 @@ func main() { chunkMetas := make([]index.ChunkMeta, 0, len(s.Chunks())) for _, chunk := range s.Chunks() { chunkMetas = append(chunkMetas, index.ChunkMeta{ - Checksum: extractChecksumFromChunkID(chunk.ChunkID), + Checksum: extractChecksumFromChunkID([]byte(chunk.ChunkID)), MinTime: int64(chunk.From), MaxTime: int64(chunk.Through), KB: ((3 << 20) / 4) / 1024, // guess: 0.75mb, 1/2 of the max size, rounded to KB