diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index 0c473692d5..cc498d2ff7 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -295,7 +295,7 @@ func (c *Compactor) init(
 	c.tablesManager = newTablesManager(c.cfg, c.storeContainers, c.indexCompactors, c.schemaConfig, c.expirationChecker, c.metrics)
 
 	if c.cfg.RetentionEnabled {
-		if err := c.deleteRequestsManager.Init(c.tablesManager); err != nil {
+		if err := c.deleteRequestsManager.Init(c.tablesManager, r); err != nil {
 			return err
 		}
 	}
diff --git a/pkg/compactor/deletion/delete_request.go b/pkg/compactor/deletion/delete_request.go
index 647fed5593..2c070ccf3d 100644
--- a/pkg/compactor/deletion/delete_request.go
+++ b/pkg/compactor/deletion/delete_request.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/go-kit/log/level"
 	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 
@@ -33,8 +34,8 @@ type DeleteRequest struct {
 	logSelectorExpr syntax.LogSelectorExpr `json:"-"`
 	timeInterval    *timeInterval          `json:"-"`
 
-	Metrics      *deleteRequestsManagerMetrics `json:"-"`
-	DeletedLines int32                         `json:"-"`
+	TotalLinesDeletedMetric *prometheus.CounterVec `json:"-"`
+	DeletedLines            int32                  `json:"-"`
 }
 
 func (d *DeleteRequest) SetQuery(logQL string) error {
@@ -89,7 +90,7 @@ func (d *DeleteRequest) FilterFunction(lbls labels.Labels) (filter.Func, error)
 		result, _, skip := f(0, s, structuredMetadata)
 		if len(result) != 0 || skip {
-			d.Metrics.deletedLinesTotal.WithLabelValues(d.UserID).Inc()
+			d.TotalLinesDeletedMetric.WithLabelValues(d.UserID).Inc()
 			d.DeletedLines++
 			return true
 		}
diff --git a/pkg/compactor/deletion/delete_request_batch.go b/pkg/compactor/deletion/delete_request_batch.go
index 1a0e607b48..156a01f291 100644
--- a/pkg/compactor/deletion/delete_request_batch.go
+++ b/pkg/compactor/deletion/delete_request_batch.go
@@ -48,7 +48,7 @@ func (b *deleteRequestBatch) userIDs() []string {
 
 // addDeleteRequest adds a request to the batch
 func (b *deleteRequestBatch) addDeleteRequest(dr *DeleteRequest) {
-	dr.Metrics = b.metrics
+	dr.TotalLinesDeletedMetric = b.metrics.deletedLinesTotal
 	ur, ok := b.deleteRequestsToProcess[dr.UserID]
 	if !ok {
 		ur = &userDeleteRequests{
@@ -173,3 +173,12 @@ func (b *deleteRequestBatch) getAllRequests() []*DeleteRequest {
 
 	return requests
 }
+
+// getDeletionIntervalForUser returns the interval covered by all of the user's delete requests in this batch.
+func (b *deleteRequestBatch) getDeletionIntervalForUser(userID string) model.Interval {
+	userRequests, ok := b.deleteRequestsToProcess[userID]
+	if !ok {
+		return model.Interval{}
+	}
+
+	return userRequests.requestsInterval
+}
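Note: getDeletionIntervalForUser feeds the chunk-pruning check that AddSeries in deletion_manifest_builder.go performs further down in this patch. A minimal sketch of the overlap test it relies on, assuming intervalsOverlap is the package's existing helper over closed model.Interval values (not part of this change):

	// Two closed intervals overlap unless one ends before the other begins.
	func intervalsOverlap(a, b model.Interval) bool {
		return a.Start <= b.End && b.Start <= a.End
	}

Chunks whose [From, Through] range falls entirely outside the union interval of a user's delete requests can then be skipped without evaluating any matchers.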
diff --git a/pkg/compactor/deletion/delete_request_test.go b/pkg/compactor/deletion/delete_request_test.go
index a19818a180..cbecf7a6a6 100644
--- a/pkg/compactor/deletion/delete_request_test.go
+++ b/pkg/compactor/deletion/delete_request_test.go
@@ -270,7 +270,7 @@ func TestDeleteRequest_GetChunkFilter(t *testing.T) {
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			require.NoError(t, tc.deleteRequest.SetQuery(tc.deleteRequest.Query))
-			tc.deleteRequest.Metrics = newDeleteRequestsManagerMetrics(nil)
+			tc.deleteRequest.TotalLinesDeletedMetric = newDeleteRequestsManagerMetrics(nil).deletedLinesTotal
 			isExpired, filterFunc := tc.deleteRequest.GetChunkFilter([]byte(user1), mustParseLabel(lbl), chunkEntry)
 			require.Equal(t, tc.expectedResp.isDeleted, isExpired)
 			if tc.expectedResp.expectedFilter == nil {
@@ -310,11 +310,11 @@ func mustParseLabel(input string) labels.Labels {
 func TestDeleteRequest_FilterFunction(t *testing.T) {
 	t.Run("one line matching with line filter", func(t *testing.T) {
 		dr := DeleteRequest{
-			Query:        `{foo="bar"} |= "some"`,
-			DeletedLines: 0,
-			Metrics:      newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
-			StartTime:    0,
-			EndTime:      math.MaxInt64,
+			Query:                   `{foo="bar"} |= "some"`,
+			DeletedLines:            0,
+			TotalLinesDeletedMetric: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()).deletedLinesTotal,
+			StartTime:               0,
+			EndTime:                 math.MaxInt64,
 		}
 
 		lblStr := lblFooBar
@@ -328,16 +328,16 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
 		require.False(t, f(time.Now(), "", labels.EmptyLabels()))
 		require.False(t, f(time.Now(), "other line", labels.EmptyLabels()))
 		require.Equal(t, int32(1), dr.DeletedLines)
-		require.Equal(t, float64(1), testutil.ToFloat64(dr.Metrics.deletedLinesTotal))
+		require.Equal(t, float64(1), testutil.ToFloat64(dr.TotalLinesDeletedMetric))
 	})
 
 	t.Run("one line matching with structured metadata filter", func(t *testing.T) {
 		dr := DeleteRequest{
-			Query:        `{foo="bar"} | ping="pong"`,
-			DeletedLines: 0,
-			Metrics:      newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
-			StartTime:    0,
-			EndTime:      math.MaxInt64,
+			Query:                   `{foo="bar"} | ping="pong"`,
+			DeletedLines:            0,
+			TotalLinesDeletedMetric: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()).deletedLinesTotal,
+			StartTime:               0,
+			EndTime:                 math.MaxInt64,
 		}
 
 		lblStr := lblFooBar
@@ -351,16 +351,16 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
 		require.False(t, f(time.Now(), "", labels.EmptyLabels()))
 		require.False(t, f(time.Now(), "some line", labels.EmptyLabels()))
 		require.Equal(t, int32(1), dr.DeletedLines)
-		require.Equal(t, float64(1), testutil.ToFloat64(dr.Metrics.deletedLinesTotal))
+		require.Equal(t, float64(1), testutil.ToFloat64(dr.TotalLinesDeletedMetric))
 	})
 
 	t.Run("one line matching with line and structured metadata filter", func(t *testing.T) {
 		dr := DeleteRequest{
-			Query:        `{foo="bar"} | ping="pong" |= "some"`,
-			DeletedLines: 0,
-			Metrics:      newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
-			StartTime:    0,
-			EndTime:      math.MaxInt64,
+			Query:                   `{foo="bar"} | ping="pong" |= "some"`,
+			DeletedLines:            0,
+			TotalLinesDeletedMetric: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()).deletedLinesTotal,
+			StartTime:               0,
+			EndTime:                 math.MaxInt64,
 		}
 
 		lblStr := lblFooBar
@@ -375,15 +375,15 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
 		require.False(t, f(time.Now(), "some line", labels.EmptyLabels()))
 		require.False(t, f(time.Now(), "other line", labels.FromStrings(lblPing, lblPong)))
 		require.Equal(t, int32(1), dr.DeletedLines)
-		require.Equal(t, float64(1), testutil.ToFloat64(dr.Metrics.deletedLinesTotal))
+		require.Equal(t, float64(1), testutil.ToFloat64(dr.TotalLinesDeletedMetric))
 	})
 
 	t.Run("labels not matching", func(t *testing.T) {
 		dr := DeleteRequest{
-			Query:        `{foo="bar"} |= "some"`,
-			DeletedLines: 0,
-			Metrics:      newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
-			UserID:       "tenant1",
+			Query:                   `{foo="bar"} |= "some"`,
+			DeletedLines:            0,
+			TotalLinesDeletedMetric: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()).deletedLinesTotal,
+			UserID:                  "tenant1",
 		}
 
 		lblStr := `{foo2="buzz"}`
@@ -398,17 +398,17 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
 		require.False(t, f(time.Time{}, "some line", labels.EmptyLabels()))
 		require.Equal(t, int32(0), dr.DeletedLines)
 		// testutil.ToFloat64 panics when there are 0 metrics
-		require.Panics(t, func() { testutil.ToFloat64(dr.Metrics.deletedLinesTotal) })
+		require.Panics(t, func() { testutil.ToFloat64(dr.TotalLinesDeletedMetric) })
 	})
 
 	t.Run("no line filter", func(t *testing.T) {
 		now := model.Now()
 		dr := DeleteRequest{
-			Query:        `{namespace="default"}`,
-			DeletedLines: 0,
-			Metrics:      newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
-			StartTime:    now.Add(-time.Hour),
-			EndTime:      now,
+			Query:                   `{namespace="default"}`,
+			DeletedLines:            0,
+			TotalLinesDeletedMetric: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()).deletedLinesTotal,
+			StartTime:               now.Add(-time.Hour),
+			EndTime:                 now,
 		}
 
 		lblStr := `{namespace="default"}`
@@ -425,7 +425,7 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
 		require.Equal(t, int32(0), dr.DeletedLines)
 
 		// testutil.ToFloat64 panics when there are 0 metrics
-		require.Panics(t, func() { testutil.ToFloat64(dr.Metrics.deletedLinesTotal) })
+		require.Panics(t, func() { testutil.ToFloat64(dr.TotalLinesDeletedMetric) })
 	})
 }
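Note: testutil.ToFloat64 panics when a CounterVec has no child series, which is what the require.Panics assertions above lean on. A panic-free alternative, sketched with a hypothetical helper that is not part of this change:

	// requireNoDeletedLines asserts that no per-user child counter was ever created.
	func requireNoDeletedLines(t *testing.T, cv *prometheus.CounterVec) {
		// CollectAndCount returns the number of collected series; zero series
		// means FilterFunction never incremented the vector.
		require.Equal(t, 0, testutil.CollectAndCount(cv))
	}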
diff --git a/pkg/compactor/deletion/delete_requests_manager.go b/pkg/compactor/deletion/delete_requests_manager.go
index 7f802eef36..2b193a3fc1 100644
--- a/pkg/compactor/deletion/delete_requests_manager.go
+++ b/pkg/compactor/deletion/delete_requests_manager.go
@@ -101,7 +101,7 @@ func NewDeleteRequestsManager(
 	return dm, nil
 }
 
-func (d *DeleteRequestsManager) Init(tablesManager TablesManager) error {
+func (d *DeleteRequestsManager) Init(tablesManager TablesManager, registerer prometheus.Registerer) error {
 	d.tablesManager = tablesManager
 
 	if d.HSModeEnabled {
@@ -109,7 +109,7 @@
 			for _, req := range requests {
 				d.markRequestAsProcessed(req)
 			}
-		})
+		}, registerer)
 	}
 
 	var err error
@@ -231,6 +231,10 @@ func (d *DeleteRequestsManager) buildDeletionManifest(ctx context.Context) error
 			return err
 		}
 
+		if iterator == nil {
+			continue
+		}
+
 		if err := iterator.ForEachSeries(ctx, func(series retention.Series) (err error) {
 			return deletionManifestBuilder.AddSeries(ctx, tableName, series)
 		}); err != nil {
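Note: Init now threads a prometheus.Registerer down to the job builder. promauto tolerates a nil Registerer by creating metrics without registering them, which is why the tests below can safely call mgr.Init(nil, nil). A minimal illustration:

	// With a nil Registerer the counter works but is never exported.
	c := promauto.With(nil).NewCounter(prometheus.CounterOpts{Name: "example_total"})
	c.Inc()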
diff --git a/pkg/compactor/deletion/delete_requests_manager_test.go b/pkg/compactor/deletion/delete_requests_manager_test.go
index 604386ff18..fd0054630c 100644
--- a/pkg/compactor/deletion/delete_requests_manager_test.go
+++ b/pkg/compactor/deletion/delete_requests_manager_test.go
@@ -1018,7 +1018,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
 				deletionMode: tc.deletionMode.String(),
 			}}, false, nil, nil)
 			require.NoError(t, err)
-			require.NoError(t, mgr.Init(nil))
+			require.NoError(t, mgr.Init(nil, nil))
 			mgr.MarkPhaseStarted()
 			require.NotNil(t, mgr.currentBatch)
 
@@ -1105,7 +1105,7 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) {
 	for _, tc := range tt {
 		mgr, err := NewDeleteRequestsManager(t.TempDir(), &mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, false, nil, nil)
 		require.NoError(t, err)
-		require.NoError(t, mgr.Init(nil))
+		require.NoError(t, mgr.Init(nil, nil))
 		mgr.MarkPhaseStarted()
 		require.NotNil(t, mgr.currentBatch)
 
@@ -1292,7 +1292,7 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
 
 	mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, false, nil, nil)
 	require.NoError(t, err)
-	require.NoError(t, mgr.Init(nil))
+	require.NoError(t, mgr.Init(nil, nil))
 
 	wg := sync.WaitGroup{}
 	mgrCtx, mgrCtxCancel := context.WithCancel(context.Background())
@@ -1320,7 +1320,7 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
 	mgr, err = NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, false, nil, nil)
 	require.NoError(t, err)
-	require.NoError(t, mgr.Init(nil))
+	require.NoError(t, mgr.Init(nil, nil))
 	require.Equal(t, storedSeriesProgress, mgr.processedSeries)
 	mgr.MarkPhaseStarted()
 	require.NotNil(t, mgr.currentBatch)
@@ -1345,7 +1345,7 @@ func TestDeleteRequestsManager_SeriesProgressWithTimeout(t *testing.T) {
 
 	mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, false, nil, nil)
 	require.NoError(t, err)
-	require.NoError(t, mgr.Init(nil))
+	require.NoError(t, mgr.Init(nil, nil))
 	mgr.MarkPhaseStarted()
 	require.NotNil(t, mgr.currentBatch)
diff --git a/pkg/compactor/deletion/deletion_manifest_builder.go b/pkg/compactor/deletion/deletion_manifest_builder.go
index 816899ea01..099f60042e 100644
--- a/pkg/compactor/deletion/deletion_manifest_builder.go
+++ b/pkg/compactor/deletion/deletion_manifest_builder.go
@@ -12,6 +12,8 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/loki/v3/pkg/compactor/retention"
 	"github.com/grafana/loki/v3/pkg/storage/chunk/client"
@@ -67,6 +69,7 @@ type deletionManifestBuilder struct {
 	currentTableName   string
 	allUserRequests    []*DeleteRequest
+	deletionInterval   model.Interval
 	creationTime       time.Time
 	segmentsCount      int
 	overallChunksCount int
@@ -98,10 +101,35 @@
 	return builder, nil
 }
 
+// canSkipSeries returns true when none of the user's delete requests select the series,
+// so the series can be left out of the manifest entirely.
+func (d *deletionManifestBuilder) canSkipSeries(userID []byte, lbls labels.Labels) (bool, error) {
+	userIDStr := unsafeGetString(userID)
+
+	userRequests := d.deleteRequestBatch.getAllRequestsForUser(userIDStr)
+	if len(userRequests) == 0 {
+		return true, fmt.Errorf("no requests loaded for user: %s", userIDStr)
+	}
+
+	for _, deleteRequest := range userRequests {
+		// if the delete request touches the series, do not skip it
+		if labels.Selector(deleteRequest.matchers).Matches(lbls) {
+			return false, nil
+		}
+	}
+
+	return true, nil
+}
+
 // AddSeries adds a series and its chunks to the current segment.
 // It flushes the current segment if the user ID or table name changes.
 // It also ensures that the current segment does not exceed the maximum number of chunks.
 func (d *deletionManifestBuilder) AddSeries(ctx context.Context, tableName string, series retention.Series) error {
+	canSkip, err := d.canSkipSeries(series.UserID(), series.Labels())
+	if err != nil {
+		return err
+	}
+	if canSkip {
+		return nil
+	}
+
 	userIDStr := unsafeGetString(series.UserID())
 	currentLabels := series.Labels().String()
@@ -115,13 +143,17 @@
 		d.currentUserID = string(series.UserID())
 		d.currentTableName = tableName
 		d.allUserRequests = d.deleteRequestBatch.getAllRequestsForUser(userIDStr)
-		if len(d.allUserRequests) == 0 {
-			return fmt.Errorf("no requests loaded for user: %s", userIDStr)
-		}
+		d.deletionInterval = d.deleteRequestBatch.getDeletionIntervalForUser(userIDStr)
 	}
 
 	var chunksGroupIdentifier uint64
 	for _, chk := range series.Chunks() {
+		// skip chunks which do not overlap the user's overall deletion interval
+		if !intervalsOverlap(d.deletionInterval, model.Interval{
+			Start: chk.From,
+			End:   chk.Through,
+		}) {
+			continue
+		}
 		if d.currentSegmentChunksCount >= maxChunksPerSegment {
 			if err := d.flushCurrentBatch(ctx); err != nil {
 				return err
@@ -185,10 +217,6 @@ func (d *deletionManifestBuilder) Finish(ctx context.Context) error {
 		return err
 	}
 
-	if d.overallChunksCount == 0 {
-		return ErrNoChunksSelectedForDeletion
-	}
-
 	level.Debug(d.logger).Log("msg", "uploading manifest file after finishing building deletion manifest",
 		"total_segments", d.segmentsCount,
 		"total_chunks", d.overallChunksCount,
@@ -216,6 +244,9 @@ func (d *deletionManifestBuilder) Finish(ctx context.Context) error {
 }
 
 func (d *deletionManifestBuilder) flushCurrentBatch(ctx context.Context) error {
+	if d.currentSegmentChunksCount == 0 {
+		return nil
+	}
 	level.Debug(d.logger).Log("msg", "flushing segment",
 		"segment_num", d.segmentsCount-1,
 		"chunks_count", d.currentSegmentChunksCount,
@@ -314,7 +345,7 @@
 
 	// manifest without manifest.json is considered invalid
 	manifestPath := path.Join(string(commonPrefix), manifestFileName)
-	exists, err := deletionManifestStoreClient.ObjectExists(ctx, manifestPath)
+	exists, err := objectExists(ctx, deletionManifestStoreClient, manifestPath)
 	if err != nil {
 		return err
 	}
@@ -346,3 +377,17 @@
 
 	return firstErr
 }
+
+// objectExists checks if an object exists in storage with the given key.
+// We can't use the ObjectClient.ObjectExists method due to a bug in the GCS object client implementation of Thanos.
+// ToDo(Sandeep): I will fix the bug upstream and remove this once we have the fix merged.
+func objectExists(ctx context.Context, objectClient client.ObjectClient, objectPath string) (bool, error) {
+	_, err := objectClient.GetAttributes(ctx, objectPath)
+	if err == nil {
+		return true, nil
+	} else if objectClient.IsObjectNotFoundErr(err) {
+		return false, nil
+	}
+
+	return false, err
+}
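Note: canSkipSeries leans on labels.Selector from prometheus/model/labels, a slice of matchers whose Matches method requires every matcher to match. A standalone sketch of that behavior (illustrative values only):

	sel := labels.Selector{
		labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
	}
	touched := sel.Matches(labels.FromStrings("foo", "bar", "env", "prod")) // true
	skipped := !sel.Matches(labels.FromStrings("foo", "baz"))               // true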
diff --git a/pkg/compactor/deletion/deletion_manifest_builder_test.go b/pkg/compactor/deletion/deletion_manifest_builder_test.go
index 40607c22f6..5cf91b4381 100644
--- a/pkg/compactor/deletion/deletion_manifest_builder_test.go
+++ b/pkg/compactor/deletion/deletion_manifest_builder_test.go
@@ -589,7 +589,7 @@ func TestDeletionManifestBuilder(t *testing.T) {
 			require.NoError(t, err)
 
 			// Create delete request batch
-			batch := newDeleteRequestBatch(nil)
+			batch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
 			for _, req := range tc.deleteRequests {
 				batch.addDeleteRequest(&req)
 			}
@@ -662,7 +662,7 @@ func TestDeletionManifestBuilder_Errors(t *testing.T) {
 	require.NoError(t, err)
 
 	// Create delete request batch
-	batch := newDeleteRequestBatch(nil)
+	batch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
 	batch.addDeleteRequest(&DeleteRequest{
 		UserID:    user1,
 		RequestID: req1,
@@ -683,7 +683,7 @@ func TestDeletionManifestBuilder_Errors(t *testing.T) {
 	require.EqualError(t, err, fmt.Sprintf("no requests loaded for user: %s", user2))
 
 	err = builder.Finish(ctx)
-	require.EqualError(t, err, ErrNoChunksSelectedForDeletion.Error())
+	require.NoError(t, err)
 }
 
 func TestCleanupInvalidManifest(t *testing.T) {
@@ -695,7 +695,7 @@ func TestCleanupInvalidManifest(t *testing.T) {
 	require.NoError(t, err)
 
 	// Create delete request batch
-	batch := newDeleteRequestBatch(nil)
+	batch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
 	batch.addDeleteRequest(&DeleteRequest{
 		UserID:    user1,
 		RequestID: req1,
diff --git a/pkg/compactor/deletion/job_builder.go b/pkg/compactor/deletion/job_builder.go
index d2c2c452d3..aa7f7a91ef 100644
--- a/pkg/compactor/deletion/job_builder.go
+++ b/pkg/compactor/deletion/job_builder.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/go-kit/log/level"
 	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/grafana/loki/v3/pkg/compactor/client/grpc"
 	"github.com/grafana/loki/v3/pkg/storage/chunk/client"
@@ -21,6 +22,14 @@ import (
 const (
 	maxChunksPerJob              = 1000
 	storageUpdatesFilenameSuffix = `-storage-updates.json`
+
+	processManifestStageBuildJobs           = "build_jobs"
+	processManifestStageApplyStorageUpdates = "apply_storage_updates"
+	processManifestStageCleanupManifest     = "cleanup_manifest"
+
+	storageUpdateTypeDeleteChunk  = "delete_chunk"
+	storageUpdateTypeDeIndexChunk = "de_index_chunk"
+	storageUpdateTypeIndexChunk   = "index_chunk"
 )
 
 type StorageUpdatesIterator interface {
@@ -61,9 +70,15 @@ type JobBuilder struct {
 	currentManifestMtx sync.RWMutex
 
 	currSegmentStorageUpdates *storageUpdatesCollection
+	metrics                   *jobBuilderMetrics
 }
 
-func NewJobBuilder(deletionManifestStoreClient client.ObjectClient, applyStorageUpdatesFunc ApplyStorageUpdatesFunc, markRequestsAsProcessedFunc markRequestsAsProcessedFunc) *JobBuilder {
+func NewJobBuilder(
+	deletionManifestStoreClient client.ObjectClient,
+	applyStorageUpdatesFunc ApplyStorageUpdatesFunc,
+	markRequestsAsProcessedFunc markRequestsAsProcessedFunc,
+	r prometheus.Registerer,
+) *JobBuilder {
 	return &JobBuilder{
 		deletionManifestStoreClient: deletionManifestStoreClient,
 		applyStorageUpdatesFunc:     applyStorageUpdatesFunc,
@@ -71,6 +86,7 @@ func NewJobBuilder(deletionManifestStoreClient client.ObjectClient, applyStorage
 		currSegmentStorageUpdates: &storageUpdatesCollection{
 			StorageUpdates: map[string]*storageUpdates{},
 		},
+		metrics: newJobBuilderMetrics(r),
 	}
 }
 
@@ -81,7 +97,6 @@
 
 	for {
 		if err := b.buildJobs(ctx, jobsChan); err != nil {
-			// ToDo(Sandeep): Add a metric for tracking failures in building jobs
 			level.Error(util_log.Logger).Log("msg", "error building jobs", "err", err)
 		}
 
@@ -102,24 +117,35 @@ func (b *JobBuilder) buildJobs(ctx context.Context, jobsChan chan<- *grpc.Job) e
 		return err
 	}
 
+	if len(manifests) == 0 {
+		return nil
+	}
+
+	b.metrics.numManifestsLeftToProcess.Set(float64(len(manifests)))
+
 	// Process each manifest
 	for _, manifestPath := range manifests {
 		manifest, err := b.readManifest(ctx, manifestPath)
 		if err != nil {
+			b.metrics.processManifestFailuresTotal.WithLabelValues(processManifestStageBuildJobs).Inc()
 			return err
 		}
 
 		if err := b.processManifest(ctx, manifest, manifestPath, jobsChan); err != nil {
+			b.metrics.processManifestFailuresTotal.WithLabelValues(processManifestStageBuildJobs).Inc()
 			return err
 		}
 
 		if err := b.applyStorageUpdates(ctx, manifest, manifestPath); err != nil {
+			b.metrics.processManifestFailuresTotal.WithLabelValues(processManifestStageApplyStorageUpdates).Inc()
 			return err
 		}
 
 		if err := b.cleanupManifest(ctx, manifest, manifestPath); err != nil {
+			b.metrics.processManifestFailuresTotal.WithLabelValues(processManifestStageCleanupManifest).Inc()
 			return err
 		}
+
+		b.metrics.numManifestsLeftToProcess.Dec()
 	}
 
 	return nil
@@ -137,6 +163,7 @@ func (b *JobBuilder) processManifest(ctx context.Context, manifest *manifest, ma
 		cancel: cancel,
 	}
 	b.currentManifestMtx.Unlock()
+	b.metrics.numSegmentsLeftToProcess.Set(float64(manifest.SegmentsCount))
 
 	// Process segments sequentially
 	for segmentNum := 0; ctx.Err() == nil && segmentNum < manifest.SegmentsCount; segmentNum++ {
@@ -151,6 +178,7 @@
 			return err
 		}
 		if !manifestExists {
+			b.metrics.numSegmentsLeftToProcess.Dec()
 			level.Info(util_log.Logger).Log("msg", "manifest does not exist(likely processed already), skipping", "manifest", manifestPath)
 			continue
 		}
@@ -199,6 +227,7 @@
 
 		level.Info(util_log.Logger).Log("msg", "finished segment processing",
 			"manifest", manifestPath,
 			"segment", segmentNum)
+		b.metrics.numSegmentsLeftToProcess.Dec()
 	}
 
 	level.Info(util_log.Logger).Log("msg", "finished manifest processing", "manifest", manifestPath)
@@ -251,7 +280,7 @@ func (b *JobBuilder) listManifests(ctx context.Context) ([]string, error) {
 
 		// Check if manifest.json exists in this directory
 		manifestPath := path.Join(string(commonPrefix), manifestFileName)
-		exists, err := b.deletionManifestStoreClient.ObjectExists(ctx, manifestPath)
+		exists, err := objectExists(ctx, b.deletionManifestStoreClient, manifestPath)
 		if err != nil {
 			return nil, err
 		}
@@ -362,7 +391,7 @@ func (b *JobBuilder) OnJobResponse(response *grpc.JobResult) error {
 
 // applyStorageUpdates applies all the storage updates accumulated while processing of the given manifest
 func (b *JobBuilder) applyStorageUpdates(ctx context.Context, manifest *manifest, manifestPath string) error {
-	storageUpdatesIterator := newStorageUpdatesIterator(ctx, manifestPath, manifest, b.deletionManifestStoreClient)
+	storageUpdatesIterator := newStorageUpdatesIterator(ctx, manifestPath, manifest, b.deletionManifestStoreClient, b.metrics.storageUpdatesAppliedTotal)
 	return b.applyStorageUpdatesFunc(ctx, storageUpdatesIterator)
 }
@@ -410,9 +439,9 @@ func (b *JobBuilder) getSegment(ctx context.Context, segmentPath string) (*segme
 }
 
 type storageUpdates struct {
-	ChunksToDelete  []string // List of chunks to be deleted from object storage and removed from the index of the current table
-	ChunksToDeIndex []string // List of chunks only to be removed from the index of the current table
-	ChunksToIndex   []chunk  // List of chunks to be indexed in the current table
+	ChunksToDelete  []string `json:"chunks_to_delete,omitempty"`   // List of chunks to be deleted from object storage and removed from the index of the current table
+	ChunksToDeIndex []string `json:"chunks_to_de_index,omitempty"` // List of chunks only to be removed from the index of the current table
+	ChunksToIndex   []chunk  `json:"chunks_to_index,omitempty"`    // List of chunks to be indexed in the current table
 }
 
 // storageUpdatesCollection collects updates to be made to the storage for a single segment
@@ -433,6 +462,10 @@ func (i *storageUpdatesCollection) reset(tableName, userID string) {
 }
 
 func (i *storageUpdatesCollection) addUpdates(labels string, result storageUpdates) {
+	if len(result.ChunksToIndex)+len(result.ChunksToDeIndex)+len(result.ChunksToDelete) == 0 {
+		return
+	}
+
 	i.mtx.Lock()
 	defer i.mtx.Unlock()
 
@@ -460,19 +493,21 @@ type storageUpdatesIterator struct {
 	manifestPath                string
 	manifest                    *manifest
 	deletionManifestStoreClient client.ObjectClient
+	storageUpdatesTotal         *prometheus.CounterVec
 
 	currSegmentNum        int
 	currUpdatesCollection *storageUpdatesCollection
 	err                   error
 }
 
-func newStorageUpdatesIterator(ctx context.Context, manifestPath string, manifest *manifest, deletionManifestStoreClient client.ObjectClient) *storageUpdatesIterator {
+func newStorageUpdatesIterator(ctx context.Context, manifestPath string, manifest *manifest, deletionManifestStoreClient client.ObjectClient, storageUpdatesTotal *prometheus.CounterVec) *storageUpdatesIterator {
 	return &storageUpdatesIterator{
 		ctx:                         ctx,
 		manifestPath:                manifestPath,
 		manifest:                    manifest,
 		deletionManifestStoreClient: deletionManifestStoreClient,
 		currSegmentNum:              -1,
+		storageUpdatesTotal:         storageUpdatesTotal,
 	}
 }
 
@@ -542,6 +577,9 @@ func (i *storageUpdatesIterator) ForEachSeries(callback func(labels string, chun
 		if err := callback(labels, updates.ChunksToDelete, updates.ChunksToDeIndex, chunksToIndex); err != nil {
 			return err
 		}
+		i.storageUpdatesTotal.WithLabelValues(storageUpdateTypeDeleteChunk).Add(float64(len(updates.ChunksToDelete)))
+		i.storageUpdatesTotal.WithLabelValues(storageUpdateTypeDeIndexChunk).Add(float64(len(updates.ChunksToDeIndex)))
+		i.storageUpdatesTotal.WithLabelValues(storageUpdateTypeIndexChunk).Add(float64(len(updates.ChunksToIndex)))
 	}
 
 	return nil
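Note: the new json tags with omitempty keep segment files compact: a series whose storageUpdates carries chunk IDs in only one of the three lists no longer emits two empty arrays, and addUpdates now drops fully empty updates before they are marshaled. A quick illustration of the encoding:

	u := storageUpdates{ChunksToDelete: []string{"chunk1"}}
	b, _ := json.Marshal(u)
	// string(b) == `{"chunks_to_delete":["chunk1"]}`; the nil ChunksToDeIndex and
	// ChunksToIndex slices are omitted entirely.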
diff --git a/pkg/compactor/deletion/job_builder_test.go b/pkg/compactor/deletion/job_builder_test.go
index 30ec88d020..227f71d0b5 100644
--- a/pkg/compactor/deletion/job_builder_test.go
+++ b/pkg/compactor/deletion/job_builder_test.go
@@ -67,7 +67,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
 		{
 			name: "one manifest in storage with less than maxChunksPerJob",
 			setupManifest: func(client client.ObjectClient) []DeleteRequest {
-				deleteRequestBatch := newDeleteRequestBatch(nil)
+				deleteRequestBatch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
 				requestsToAdd := []DeleteRequest{
 					{
 						RequestID: req1,
@@ -125,7 +125,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
 		{
 			name: "one manifest in storage with more than maxChunksPerJob",
 			setupManifest: func(client client.ObjectClient) []DeleteRequest {
-				deleteRequestBatch := newDeleteRequestBatch(nil)
+				deleteRequestBatch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
 				requestsToAdd := []DeleteRequest{
 					{
 						RequestID: req1,
@@ -199,7 +199,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
 		{
 			name: "one manifest in storage with multiple groups",
 			setupManifest: func(client client.ObjectClient) []DeleteRequest {
-				deleteRequestBatch := newDeleteRequestBatch(nil)
+				deleteRequestBatch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
 				requestsToAdd := []DeleteRequest{
 					{
 						UserID: user1,
@@ -287,7 +287,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
 		{
 			name: "one manifest in storage with multiple segments due to multiple tables",
 			setupManifest: func(client client.ObjectClient) []DeleteRequest {
-				deleteRequestBatch := newDeleteRequestBatch(nil)
+				deleteRequestBatch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
 				requestsToAdd := []DeleteRequest{
 					{
 						RequestID: req1,
@@ -392,7 +392,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
 				return iterator.Err()
 			}, func(requests []DeleteRequest) {
 				requestsMarkedAsProcessed = requests
-			})
+			}, nil)
 
 			jobsChan := make(chan *grpc.Job)
 			var jobsBuilt []grpc.Job
@@ -470,7 +470,7 @@ func TestJobBuilder_ProcessManifest(t *testing.T) {
 
 	builder := NewJobBuilder(objectClient, func(_ context.Context, _ StorageUpdatesIterator) error {
 		return nil
-	}, func(_ []DeleteRequest) {})
+	}, func(_ []DeleteRequest) {}, nil)
 
 	// Create a test manifest
 	manifest := &manifest{
diff --git a/pkg/compactor/deletion/job_runner.go b/pkg/compactor/deletion/job_runner.go
index fdf5d89ff4..93d3f22c9c 100644
--- a/pkg/compactor/deletion/job_runner.go
+++ b/pkg/compactor/deletion/job_runner.go
@@ -102,8 +102,7 @@ func (jr *JobRunner) Run(ctx context.Context, job *grpc.Job) ([]byte, error) {
 		if !req.logSelectorExpr.HasFilter() {
 			return nil, errors.New("deletion query does not contain filter")
 		}
-		// ToDo(Sandeep): set it to proper metrics
-		req.Metrics = newDeleteRequestsManagerMetrics(nil)
+		req.TotalLinesDeletedMetric = jr.metrics.deletedLinesTotal
 	}
 
 	tableInterval := retention.ExtractIntervalFromTableName(deletionJob.TableName)
@@ -218,6 +217,7 @@ func (jr *JobRunner) Run(ctx context.Context, job *grpc.Job) ([]byte, error) {
 		return nil, err
 	}
 
+	jr.metrics.chunksProcessedTotal.Add(float64(len(deletionJob.ChunkIDs)))
 	jobResultJSON, err := json.Marshal(updates)
 	if err != nil {
 		return nil, err
diff --git a/pkg/compactor/deletion/job_runner_test.go b/pkg/compactor/deletion/job_runner_test.go
index 29ebfa68f0..013a14960e 100644
--- a/pkg/compactor/deletion/job_runner_test.go
+++ b/pkg/compactor/deletion/job_runner_test.go
@@ -272,7 +272,7 @@ func TestJobRunner_Run(t *testing.T) {
 
-			// Ensure Metrics is set for each DeleteRequest
+			// Ensure TotalLinesDeletedMetric is set for each DeleteRequest
 			for i := range tc.deleteRequests {
-				tc.deleteRequests[i].Metrics = newDeleteRequestsManagerMetrics(nil)
+				tc.deleteRequests[i].TotalLinesDeletedMetric = newDeleteRequestsManagerMetrics(nil).deletedLinesTotal
 			}
 
 			// Create job runner
@@ -421,7 +421,7 @@ func TestJobRunner_Run_ConcurrentChunkProcessing(t *testing.T) {
 
-	// Ensure Metrics is set for each DeleteRequest
+	// Ensure TotalLinesDeletedMetric is set for each DeleteRequest
 	for i := range deleteRequests {
-		deleteRequests[i].Metrics = newDeleteRequestsManagerMetrics(nil)
+		deleteRequests[i].TotalLinesDeletedMetric = newDeleteRequestsManagerMetrics(nil).deletedLinesTotal
 	}
 
 	// Create job runner with chunk processing concurrency of 2
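Note: a DeleteRequest now holds only the *prometheus.CounterVec it increments instead of the whole metrics struct, so the compactor and the job runner can each wire in their own vector. A sketch of the shape being shared, reusing the names that metrics.go below introduces:

	deletedLines := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "loki",
		Name:      "compactor_deletion_job_runner_deleted_lines_total",
		Help:      "Number of deleted lines per user",
	}, []string{"user"})
	deletedLines.WithLabelValues("tenant1").Inc() // what FilterFunction does per deleted line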
diff --git a/pkg/compactor/deletion/metrics.go b/pkg/compactor/deletion/metrics.go
index ea6ce8716a..8cc3a94298 100644
--- a/pkg/compactor/deletion/metrics.go
+++ b/pkg/compactor/deletion/metrics.go
@@ -119,6 +119,7 @@ func newDeleteRequestsManagerMetrics(r prometheus.Registerer) *deleteRequestsMan
 
 type deletionJobRunnerMetrics struct {
 	chunksProcessedTotal prometheus.Counter
+	deletedLinesTotal    *prometheus.CounterVec
 }
 
 func newDeletionJobRunnerMetrics(r prometheus.Registerer) *deletionJobRunnerMetrics {
@@ -129,6 +130,45 @@ func newDeletionJobRunnerMetrics(r prometheus.Registerer) *deletionJobRunnerMetr
 		Name:      "compactor_deletion_job_runner_chunks_processed_total",
 		Help:      "Number of chunks processed",
 	})
+	m.deletedLinesTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+		Namespace: constants.Loki,
+		Name:      "compactor_deletion_job_runner_deleted_lines_total",
+		Help:      "Number of deleted lines per user",
+	}, []string{"user"})
+
+	return &m
+}
+
+type jobBuilderMetrics struct {
+	numSegmentsLeftToProcess     prometheus.Gauge
+	numManifestsLeftToProcess    prometheus.Gauge
+	processManifestFailuresTotal *prometheus.CounterVec
+	storageUpdatesAppliedTotal   *prometheus.CounterVec
+}
+
+func newJobBuilderMetrics(r prometheus.Registerer) *jobBuilderMetrics {
+	m := jobBuilderMetrics{}
+
+	m.numSegmentsLeftToProcess = promauto.With(r).NewGauge(prometheus.GaugeOpts{
+		Namespace: constants.Loki,
+		Name:      "compactor_job_builder_num_segments_left_to_process",
+		Help:      "Number of segments left to process to finish processing the current manifest",
+	})
+	m.numManifestsLeftToProcess = promauto.With(r).NewGauge(prometheus.GaugeOpts{
+		Namespace: constants.Loki,
+		Name:      "compactor_job_builder_num_manifests_left_to_process",
+		Help:      "Number of manifests left to process",
+	})
+	m.processManifestFailuresTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+		Namespace: constants.Loki,
+		Name:      "compactor_process_manifest_failures_total",
+		Help:      "Number of times compactor failed to process manifest at various stages",
+	}, []string{"stage"})
+	m.storageUpdatesAppliedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+		Namespace: constants.Loki,
+		Name:      "compactor_deletion_storage_updates_applied_total",
+		Help:      "Number of storage updates made by type after processing of delete manifests",
+	}, []string{"type"})
 
 	return &m
 }
diff --git a/pkg/compactor/jobqueue/metrics.go b/pkg/compactor/jobqueue/metrics.go
index 9f1eb3637a..0360699752 100644
--- a/pkg/compactor/jobqueue/metrics.go
+++ b/pkg/compactor/jobqueue/metrics.go
@@ -58,7 +58,7 @@ func newWorkerMetrics(r prometheus.Registerer, allWorkersConnectedToCompactor fu
 
 	m.jobsProcessed = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 		Namespace: constants.Loki,
-		Name:      "compactor_worker_job_processed_attempts",
+		Name:      "compactor_worker_jobs_processed_total",
 		Help:      "Number of jobs processed by worker with their processing status",
 	}, []string{"status"})
 	m.workerConnectedToCompactor = promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
diff --git a/pkg/compactor/jobqueue/queue.go b/pkg/compactor/jobqueue/queue.go
index bc2a9798a0..60b83647a9 100644
--- a/pkg/compactor/jobqueue/queue.go
+++ b/pkg/compactor/jobqueue/queue.go
@@ -141,6 +141,8 @@ func (q *Queue) retryFailedJobs() {
 		case <-q.stop:
 			return
 		case <-ticker.C:
+			var jobsToRetry []string
+
 			q.processingJobsMtx.Lock()
 			now := time.Now()
 			for jobID, pj := range q.processingJobs {
@@ -153,31 +155,39 @@ func (q *Queue) retryFailedJobs() {
 				}
 				timeout := q.builders[pj.job.Type].jobTimeout
 				if pj.lastAttemptFailed || now.Sub(pj.dequeued) > timeout {
-					// Requeue the job
-					select {
-					case <-q.stop:
-						return
-					case q.queue <- pj.job:
-						reason := "timeout"
-						if pj.lastAttemptFailed {
-							reason = "failed"
-						}
-						q.metrics.jobRetries.WithLabelValues(reason).Inc()
-						level.Warn(util_log.Logger).Log(
-							"msg", "requeued job",
-							"job_id", jobID,
-							"job_type", pj.job.Type,
-							"timeout", timeout,
-							"reason", reason,
-						)
-						// reset the dequeued time so that the timeout is calculated from the time when the job is sent for processing.
-						q.processingJobs[jobID].dequeued = time.Now()
-						q.processingJobs[jobID].lastAttemptFailed = false
-						q.processingJobs[jobID].attemptsLeft--
-					}
+					jobsToRetry = append(jobsToRetry, jobID)
 				}
 			}
 			q.processingJobsMtx.Unlock()
+
+			for _, jobID := range jobsToRetry {
+				reason := "timeout"
+				q.processingJobsMtx.Lock()
+				pj, ok := q.processingJobs[jobID]
+				if !ok {
+					// the job could have completed between collecting it above and re-acquiring the lock
+					q.processingJobsMtx.Unlock()
+					continue
+				}
+				if pj.lastAttemptFailed {
+					reason = "failed"
+				}
+
+				// reset the dequeued time so that the timeout is calculated from the time when the job is sent for processing.
+				pj.dequeued = time.Now()
+				pj.lastAttemptFailed = false
+				pj.attemptsLeft--
+				q.processingJobsMtx.Unlock()
+
+				// Requeue the job; done outside the mutex since the channel send can block.
+				select {
+				case <-q.stop:
+					return
+				case q.queue <- pj.job:
+					q.metrics.jobRetries.WithLabelValues(reason).Inc()
+					level.Warn(util_log.Logger).Log(
+						"msg", "requeued job",
+						"job_id", jobID,
+						"job_type", pj.job.Type,
+						"reason", reason,
+					)
+				}
+			}
 		}
 	}
 }
@@ -212,6 +222,7 @@ func (q *Queue) Loop(s grpc.JobQueue_LoopServer) error {
 
 		now := time.Now()
 		if err := s.Send(job); err != nil {
+			level.Error(util_log.Logger).Log("msg", "failed to send job", "job_id", job.Id, "err", err)
 			return err
 		}
 		q.metrics.jobsSent.Inc()
@@ -221,6 +232,7 @@
 		resp, err := s.Recv()
 		if err != nil {
 			q.metrics.jobsProcessingDuration.Observe(time.Since(now).Seconds())
+			level.Error(util_log.Logger).Log("msg", "error receiving job response", "job_id", job.Id, "err", err)
 			return err
 		}
 		q.metrics.jobsProcessingDuration.Observe(time.Since(now).Seconds())
diff --git a/pkg/compactor/jobqueue/queue_test.go b/pkg/compactor/jobqueue/queue_test.go
index 268c4a707d..452549b5e7 100644
--- a/pkg/compactor/jobqueue/queue_test.go
+++ b/pkg/compactor/jobqueue/queue_test.go
@@ -282,9 +282,9 @@ func TestQueue_JobTimeout(t *testing.T) {
-	// Verify job is removed from processing jobs
+	// Verify the job is still tracked in processing jobs with one attempt used up
 	q.processingJobsMtx.RLock()
 	pj, exists := q.processingJobs[job.Id]
-	q.processingJobsMtx.RUnlock()
 	require.True(t, exists)
 	require.Equal(t, 1, pj.attemptsLeft)
+	q.processingJobsMtx.RUnlock()
 }
 
 func TestQueue_Close(t *testing.T) {
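Note: the retryFailedJobs rewrite above follows a common Go pattern: never block on a channel send while holding a mutex that the consumer of the channel may also need. Collect the work under the lock, release it, then send. A condensed, self-contained sketch of the shape (hypothetical names; assumes imports of sync and time):

	// requeueDue re-sends due job IDs without holding mu across the blocking send.
	func requeueDue(mu *sync.Mutex, deadlines map[string]time.Time, ch chan<- string, stop <-chan struct{}) {
		mu.Lock()
		var ready []string
		for id, deadline := range deadlines {
			if time.Now().After(deadline) {
				ready = append(ready, id)
			}
		}
		mu.Unlock() // release before any blocking send

		for _, id := range ready {
			select {
			case <-stop:
				return
			case ch <- id: // safe: no lock held while this send blocks
			}
		}
	}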