feat(compactor HS): Fixes and changes for getting horizontally scalable compactor to work well (#18526)

pull/17937/head^2
Sandeep Sukhani 6 months ago committed by GitHub
parent 2ba59ee431
commit 000f90d74b
  1. pkg/compactor/compactor.go (2 changes)
  2. pkg/compactor/deletion/delete_request.go (7 changes)
  3. pkg/compactor/deletion/delete_request_batch.go (11 changes)
  4. pkg/compactor/deletion/delete_request_test.go (60 changes)
  5. pkg/compactor/deletion/delete_requests_manager.go (8 changes)
  6. pkg/compactor/deletion/delete_requests_manager_test.go (10 changes)
  7. pkg/compactor/deletion/deletion_manifest_builder.go (61 changes)
  8. pkg/compactor/deletion/deletion_manifest_builder_test.go (8 changes)
  9. pkg/compactor/deletion/job_builder.go (54 changes)
  10. pkg/compactor/deletion/job_builder_test.go (12 changes)
  11. pkg/compactor/deletion/job_runner.go (4 changes)
  12. pkg/compactor/deletion/job_runner_test.go (4 changes)
  13. pkg/compactor/deletion/metrics.go (40 changes)
  14. pkg/compactor/jobqueue/metrics.go (2 changes)
  15. pkg/compactor/jobqueue/queue.go (56 changes)
  16. pkg/compactor/jobqueue/queue_test.go (2 changes)

@ -295,7 +295,7 @@ func (c *Compactor) init(
c.tablesManager = newTablesManager(c.cfg, c.storeContainers, c.indexCompactors, c.schemaConfig, c.expirationChecker, c.metrics)
if c.cfg.RetentionEnabled {
if err := c.deleteRequestsManager.Init(c.tablesManager); err != nil {
if err := c.deleteRequestsManager.Init(c.tablesManager, r); err != nil {
return err
}
}

@ -6,6 +6,7 @@ import (
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@ -33,8 +34,8 @@ type DeleteRequest struct {
logSelectorExpr syntax.LogSelectorExpr `json:"-"`
timeInterval *timeInterval `json:"-"`
Metrics *deleteRequestsManagerMetrics `json:"-"`
DeletedLines int32 `json:"-"`
TotalLinesDeletedMetric *prometheus.CounterVec `json:"-"`
DeletedLines int32 `json:"-"`
}
func (d *DeleteRequest) SetQuery(logQL string) error {
@ -89,7 +90,7 @@ func (d *DeleteRequest) FilterFunction(lbls labels.Labels) (filter.Func, error)
result, _, skip := f(0, s, structuredMetadata)
if len(result) != 0 || skip {
d.Metrics.deletedLinesTotal.WithLabelValues(d.UserID).Inc()
d.TotalLinesDeletedMetric.WithLabelValues(d.UserID).Inc()
d.DeletedLines++
return true
}
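
Note on the change above: per-line deletion accounting now goes through a counter vector stored directly on the DeleteRequest (TotalLinesDeletedMetric) rather than through the whole deleteRequestsManagerMetrics struct, so the deletion job runner can wire in its own counter without constructing manager metrics. A minimal sketch of the pattern, with invented names (deleteTracker and recordDeletedLine are not part of the diff):

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// deleteTracker holds only the counter it needs, so any owner (requests manager,
// job runner) can inject whichever CounterVec it registered.
type deleteTracker struct {
	UserID            string
	TotalLinesDeleted *prometheus.CounterVec
}

// recordDeletedLine increments the per-user counter once per filtered-out line.
func (d *deleteTracker) recordDeletedLine() {
	d.TotalLinesDeleted.WithLabelValues(d.UserID).Inc()
}

func main() {
	reg := prometheus.NewRegistry()
	deletedLines := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "deleted_lines_total",
		Help: "Number of deleted lines per user",
	}, []string{"user"})

	t := &deleteTracker{UserID: "tenant1", TotalLinesDeleted: deletedLines}
	t.recordDeletedLine() // deleted_lines_total{user="tenant1"} is now 1
}
```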

@ -48,7 +48,7 @@ func (b *deleteRequestBatch) userIDs() []string {
// addDeleteRequest adds a request to the batch
func (b *deleteRequestBatch) addDeleteRequest(dr *DeleteRequest) {
dr.Metrics = b.metrics
dr.TotalLinesDeletedMetric = b.metrics.deletedLinesTotal
ur, ok := b.deleteRequestsToProcess[dr.UserID]
if !ok {
ur = &userDeleteRequests{
@ -173,3 +173,12 @@ func (b *deleteRequestBatch) getAllRequests() []*DeleteRequest {
return requests
}
func (b *deleteRequestBatch) getDeletionIntervalForUser(userID string) model.Interval {
userRequests, ok := b.deleteRequestsToProcess[userID]
if !ok {
return model.Interval{}
}
return userRequests.requestsInterval
}
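
getDeletionIntervalForUser (added above) exposes the combined requestsInterval of a user's pending delete requests; the manifest builder later in this commit uses it to drop chunks whose time range never overlaps that interval. A minimal sketch of that overlap test, assuming the usual closed-interval semantics for the intervalsOverlap helper (its real definition lives elsewhere in the retention code):

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

// intervalsOverlap reports whether two closed time intervals share at least one point.
// This mirrors the assumed behaviour of the helper used by the manifest builder.
func intervalsOverlap(a, b model.Interval) bool {
	return a.Start <= b.End && b.Start <= a.End
}

func main() {
	// Union of the user's delete requests, as returned by getDeletionIntervalForUser.
	deletionInterval := model.Interval{Start: 1000, End: 2000}

	// A chunk entirely after the interval never needs to appear in the manifest.
	chunk := model.Interval{Start: 2500, End: 3000}

	if !intervalsOverlap(deletionInterval, chunk) {
		fmt.Println("chunk skipped: no overlap with the user's deletion interval")
	}
}
```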

@ -270,7 +270,7 @@ func TestDeleteRequest_GetChunkFilter(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
require.NoError(t, tc.deleteRequest.SetQuery(tc.deleteRequest.Query))
tc.deleteRequest.Metrics = newDeleteRequestsManagerMetrics(nil)
tc.deleteRequest.TotalLinesDeletedMetric = newDeleteRequestsManagerMetrics(nil).deletedLinesTotal
isExpired, filterFunc := tc.deleteRequest.GetChunkFilter([]byte(user1), mustParseLabel(lbl), chunkEntry)
require.Equal(t, tc.expectedResp.isDeleted, isExpired)
if tc.expectedResp.expectedFilter == nil {
@ -310,11 +310,11 @@ func mustParseLabel(input string) labels.Labels {
func TestDeleteRequest_FilterFunction(t *testing.T) {
t.Run("one line matching with line filter", func(t *testing.T) {
dr := DeleteRequest{
Query: `{foo="bar"} |= "some"`,
DeletedLines: 0,
Metrics: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
StartTime: 0,
EndTime: math.MaxInt64,
Query: `{foo="bar"} |= "some"`,
DeletedLines: 0,
TotalLinesDeletedMetric: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()).deletedLinesTotal,
StartTime: 0,
EndTime: math.MaxInt64,
}
lblStr := lblFooBar
@ -328,16 +328,16 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
require.False(t, f(time.Now(), "", labels.EmptyLabels()))
require.False(t, f(time.Now(), "other line", labels.EmptyLabels()))
require.Equal(t, int32(1), dr.DeletedLines)
require.Equal(t, float64(1), testutil.ToFloat64(dr.Metrics.deletedLinesTotal))
require.Equal(t, float64(1), testutil.ToFloat64(dr.TotalLinesDeletedMetric))
})
t.Run("one line matching with structured metadata filter", func(t *testing.T) {
dr := DeleteRequest{
Query: `{foo="bar"} | ping="pong"`,
DeletedLines: 0,
Metrics: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
StartTime: 0,
EndTime: math.MaxInt64,
Query: `{foo="bar"} | ping="pong"`,
DeletedLines: 0,
TotalLinesDeletedMetric: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()).deletedLinesTotal,
StartTime: 0,
EndTime: math.MaxInt64,
}
lblStr := lblFooBar
@ -351,16 +351,16 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
require.False(t, f(time.Now(), "", labels.EmptyLabels()))
require.False(t, f(time.Now(), "some line", labels.EmptyLabels()))
require.Equal(t, int32(1), dr.DeletedLines)
require.Equal(t, float64(1), testutil.ToFloat64(dr.Metrics.deletedLinesTotal))
require.Equal(t, float64(1), testutil.ToFloat64(dr.TotalLinesDeletedMetric))
})
t.Run("one line matching with line and structured metadata filter", func(t *testing.T) {
dr := DeleteRequest{
Query: `{foo="bar"} | ping="pong" |= "some"`,
DeletedLines: 0,
Metrics: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
StartTime: 0,
EndTime: math.MaxInt64,
Query: `{foo="bar"} | ping="pong" |= "some"`,
DeletedLines: 0,
TotalLinesDeletedMetric: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()).deletedLinesTotal,
StartTime: 0,
EndTime: math.MaxInt64,
}
lblStr := lblFooBar
@ -375,15 +375,15 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
require.False(t, f(time.Now(), "some line", labels.EmptyLabels()))
require.False(t, f(time.Now(), "other line", labels.FromStrings(lblPing, lblPong)))
require.Equal(t, int32(1), dr.DeletedLines)
require.Equal(t, float64(1), testutil.ToFloat64(dr.Metrics.deletedLinesTotal))
require.Equal(t, float64(1), testutil.ToFloat64(dr.TotalLinesDeletedMetric))
})
t.Run("labels not matching", func(t *testing.T) {
dr := DeleteRequest{
Query: `{foo="bar"} |= "some"`,
DeletedLines: 0,
Metrics: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
UserID: "tenant1",
Query: `{foo="bar"} |= "some"`,
DeletedLines: 0,
TotalLinesDeletedMetric: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()).deletedLinesTotal,
UserID: "tenant1",
}
lblStr := `{foo2="buzz"}`
@ -398,17 +398,17 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
require.False(t, f(time.Time{}, "some line", labels.EmptyLabels()))
require.Equal(t, int32(0), dr.DeletedLines)
// testutil.ToFloat64 panics when there are 0 metrics
require.Panics(t, func() { testutil.ToFloat64(dr.Metrics.deletedLinesTotal) })
require.Panics(t, func() { testutil.ToFloat64(dr.TotalLinesDeletedMetric) })
})
t.Run("no line filter", func(t *testing.T) {
now := model.Now()
dr := DeleteRequest{
Query: `{namespace="default"}`,
DeletedLines: 0,
Metrics: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
StartTime: now.Add(-time.Hour),
EndTime: now,
Query: `{namespace="default"}`,
DeletedLines: 0,
TotalLinesDeletedMetric: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()).deletedLinesTotal,
StartTime: now.Add(-time.Hour),
EndTime: now,
}
lblStr := `{namespace="default"}`
@ -425,7 +425,7 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
require.Equal(t, int32(0), dr.DeletedLines)
// testutil.ToFloat64 panics when there are 0 metrics
require.Panics(t, func() { testutil.ToFloat64(dr.Metrics.deletedLinesTotal) })
require.Panics(t, func() { testutil.ToFloat64(dr.TotalLinesDeletedMetric) })
})
}

@ -101,7 +101,7 @@ func NewDeleteRequestsManager(
return dm, nil
}
func (d *DeleteRequestsManager) Init(tablesManager TablesManager) error {
func (d *DeleteRequestsManager) Init(tablesManager TablesManager, registerer prometheus.Registerer) error {
d.tablesManager = tablesManager
if d.HSModeEnabled {
@ -109,7 +109,7 @@ func (d *DeleteRequestsManager) Init(tablesManager TablesManager) error {
for _, req := range requests {
d.markRequestAsProcessed(req)
}
})
}, registerer)
}
var err error
@ -231,6 +231,10 @@ func (d *DeleteRequestsManager) buildDeletionManifest(ctx context.Context) error
return err
}
if iterator == nil {
continue
}
if err := iterator.ForEachSeries(ctx, func(series retention.Series) (err error) {
return deletionManifestBuilder.AddSeries(ctx, tableName, series)
}); err != nil {

@ -1018,7 +1018,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
deletionMode: tc.deletionMode.String(),
}}, false, nil, nil)
require.NoError(t, err)
require.NoError(t, mgr.Init(nil))
require.NoError(t, mgr.Init(nil, nil))
mgr.MarkPhaseStarted()
require.NotNil(t, mgr.currentBatch)
@ -1105,7 +1105,7 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) {
for _, tc := range tt {
mgr, err := NewDeleteRequestsManager(t.TempDir(), &mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, false, nil, nil)
require.NoError(t, err)
require.NoError(t, mgr.Init(nil))
require.NoError(t, mgr.Init(nil, nil))
mgr.MarkPhaseStarted()
require.NotNil(t, mgr.currentBatch)
@ -1292,7 +1292,7 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, false, nil, nil)
require.NoError(t, err)
require.NoError(t, mgr.Init(nil))
require.NoError(t, mgr.Init(nil, nil))
wg := sync.WaitGroup{}
mgrCtx, mgrCtxCancel := context.WithCancel(context.Background())
@ -1320,7 +1320,7 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
mgr, err = NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, false, nil, nil)
require.NoError(t, err)
require.NoError(t, mgr.Init(nil))
require.NoError(t, mgr.Init(nil, nil))
require.Equal(t, storedSeriesProgress, mgr.processedSeries)
mgr.MarkPhaseStarted()
require.NotNil(t, mgr.currentBatch)
@ -1345,7 +1345,7 @@ func TestDeleteRequestsManager_SeriesProgressWithTimeout(t *testing.T) {
mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, false, nil, nil)
require.NoError(t, err)
require.NoError(t, mgr.Init(nil))
require.NoError(t, mgr.Init(nil, nil))
mgr.MarkPhaseStarted()
require.NotNil(t, mgr.currentBatch)

@ -12,6 +12,8 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
@ -67,6 +69,7 @@ type deletionManifestBuilder struct {
currentTableName string
allUserRequests []*DeleteRequest
deletionInterval model.Interval
creationTime time.Time
segmentsCount int
overallChunksCount int
@ -98,10 +101,35 @@ func newDeletionManifestBuilder(deletionManifestStoreClient client.ObjectClient,
return builder, nil
}
func (d *deletionManifestBuilder) canSkipSeries(userID []byte, lbls labels.Labels) (bool, error) {
userIDStr := unsafeGetString(userID)
userRequests := d.deleteRequestBatch.getAllRequestsForUser(userIDStr)
if len(userRequests) == 0 {
return true, fmt.Errorf("no requests loaded for user: %s", userIDStr)
}
for _, deleteRequest := range d.deleteRequestBatch.getAllRequestsForUser(userIDStr) {
// if the delete request touches the series, do not skip it
if labels.Selector(deleteRequest.matchers).Matches(lbls) {
return false, nil
}
}
return true, nil
}
// AddSeries adds a series and its chunks to the current segment.
// It flushes the current segment if the user ID or table name changes.
// It also ensures that the current segment does not exceed the maximum number of chunks.
func (d *deletionManifestBuilder) AddSeries(ctx context.Context, tableName string, series retention.Series) error {
canSkip, err := d.canSkipSeries(series.UserID(), series.Labels())
if err != nil {
return err
}
if canSkip {
return nil
}
userIDStr := unsafeGetString(series.UserID())
currentLabels := series.Labels().String()
@ -115,13 +143,17 @@ func (d *deletionManifestBuilder) AddSeries(ctx context.Context, tableName strin
d.currentUserID = string(series.UserID())
d.currentTableName = tableName
d.allUserRequests = d.deleteRequestBatch.getAllRequestsForUser(userIDStr)
if len(d.allUserRequests) == 0 {
return fmt.Errorf("no requests loaded for user: %s", userIDStr)
}
d.deletionInterval = d.deleteRequestBatch.getDeletionIntervalForUser(userIDStr)
}
var chunksGroupIdentifier uint64
for _, chk := range series.Chunks() {
if !intervalsOverlap(d.deletionInterval, model.Interval{
Start: chk.From,
End: chk.Through,
}) {
continue
}
if d.currentSegmentChunksCount >= maxChunksPerSegment {
if err := d.flushCurrentBatch(ctx); err != nil {
return err
@ -185,10 +217,6 @@ func (d *deletionManifestBuilder) Finish(ctx context.Context) error {
return err
}
if d.overallChunksCount == 0 {
return ErrNoChunksSelectedForDeletion
}
level.Debug(d.logger).Log("msg", "uploading manifest file after finishing building deletion manifest",
"total_segments", d.segmentsCount,
"total_chunks", d.overallChunksCount,
@ -216,6 +244,9 @@ func (d *deletionManifestBuilder) Finish(ctx context.Context) error {
}
func (d *deletionManifestBuilder) flushCurrentBatch(ctx context.Context) error {
if d.currentSegmentChunksCount == 0 {
return nil
}
level.Debug(d.logger).Log("msg", "flushing segment",
"segment_num", d.segmentsCount-1,
"chunks_count", d.currentSegmentChunksCount,
@ -314,7 +345,7 @@ func cleanupInvalidManifests(ctx context.Context, deletionManifestStoreClient cl
// manifest without manifest.json is considered invalid
manifestPath := path.Join(string(commonPrefix), manifestFileName)
exists, err := deletionManifestStoreClient.ObjectExists(ctx, manifestPath)
exists, err := objectExists(ctx, deletionManifestStoreClient, manifestPath)
if err != nil {
return err
}
@ -346,3 +377,17 @@ func cleanupInvalidManifests(ctx context.Context, deletionManifestStoreClient cl
return firstErr
}
// objectExists checks if an object exists in storage with the given key.
// We can't use the ObjectClient.ObjectExists method due to a bug in the GCS object client implementation in Thanos.
// (Sandeep): I will fix the bug upstream and remove this once we have the fix merged.
func objectExists(ctx context.Context, objectClient client.ObjectClient, objectPath string) (bool, error) {
_, err := objectClient.GetAttributes(ctx, objectPath)
if err == nil {
return true, nil
} else if objectClient.IsObjectNotFoundErr(err) {
return false, nil
}
return false, err
}
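
Two things worth calling out in the builder changes above: canSkipSeries avoids writing a series into the manifest when none of the user's delete requests select its labels, and objectExists works around the Thanos GCS client bug by probing GetAttributes instead of ObjectExists. A minimal, self-contained sketch of the series-skipping check (seriesTouchedByAnyRequest is an invented name; the real logic lives in canSkipSeries):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// seriesTouchedByAnyRequest reports whether at least one delete request's matchers
// select the series labels; series that no request touches can be skipped entirely.
func seriesTouchedByAnyRequest(requestMatchers [][]*labels.Matcher, lbls labels.Labels) bool {
	for _, matchers := range requestMatchers {
		if labels.Selector(matchers).Matches(lbls) {
			return true
		}
	}
	return false
}

func main() {
	requests := [][]*labels.Matcher{
		{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")},
	}

	fmt.Println(seriesTouchedByAnyRequest(requests, labels.FromStrings("foo", "bar")))  // true: add to manifest
	fmt.Println(seriesTouchedByAnyRequest(requests, labels.FromStrings("foo", "buzz"))) // false: skip
}
```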

@ -589,7 +589,7 @@ func TestDeletionManifestBuilder(t *testing.T) {
require.NoError(t, err)
// Create delete request batch
batch := newDeleteRequestBatch(nil)
batch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
for _, req := range tc.deleteRequests {
batch.addDeleteRequest(&req)
}
@ -662,7 +662,7 @@ func TestDeletionManifestBuilder_Errors(t *testing.T) {
require.NoError(t, err)
// Create delete request batch
batch := newDeleteRequestBatch(nil)
batch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
batch.addDeleteRequest(&DeleteRequest{
UserID: user1,
RequestID: req1,
@ -683,7 +683,7 @@ func TestDeletionManifestBuilder_Errors(t *testing.T) {
require.EqualError(t, err, fmt.Sprintf("no requests loaded for user: %s", user2))
err = builder.Finish(ctx)
require.EqualError(t, err, ErrNoChunksSelectedForDeletion.Error())
require.NoError(t, err)
}
func TestCleanupInvalidManifest(t *testing.T) {
@ -695,7 +695,7 @@ func TestCleanupInvalidManifest(t *testing.T) {
require.NoError(t, err)
// Create delete request batch
batch := newDeleteRequestBatch(nil)
batch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
batch.addDeleteRequest(&DeleteRequest{
UserID: user1,
RequestID: req1,

@ -12,6 +12,7 @@ import (
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/v3/pkg/compactor/client/grpc"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
@ -21,6 +22,14 @@ import (
const (
maxChunksPerJob = 1000
storageUpdatesFilenameSuffix = `-storage-updates.json`
processManifestStageBuildJobs = "build_jobs"
processManifestStageApplyStorageUpdates = "apply_storage_updates"
processManifestStageCleanupManifest = "cleanup_manifest"
storageUpdateTypeDeleteChunk = "delete_chunk"
storageUpdateTypeDeIndexChunk = "de_index_chunk"
storageUpdateTypeIndexChunk = "index_chunk"
)
type StorageUpdatesIterator interface {
@ -61,9 +70,15 @@ type JobBuilder struct {
currentManifestMtx sync.RWMutex
currSegmentStorageUpdates *storageUpdatesCollection
metrics *jobBuilderMetrics
}
func NewJobBuilder(deletionManifestStoreClient client.ObjectClient, applyStorageUpdatesFunc ApplyStorageUpdatesFunc, markRequestsAsProcessedFunc markRequestsAsProcessedFunc) *JobBuilder {
func NewJobBuilder(
deletionManifestStoreClient client.ObjectClient,
applyStorageUpdatesFunc ApplyStorageUpdatesFunc,
markRequestsAsProcessedFunc markRequestsAsProcessedFunc,
r prometheus.Registerer,
) *JobBuilder {
return &JobBuilder{
deletionManifestStoreClient: deletionManifestStoreClient,
applyStorageUpdatesFunc: applyStorageUpdatesFunc,
@ -71,6 +86,7 @@ func NewJobBuilder(deletionManifestStoreClient client.ObjectClient, applyStorage
currSegmentStorageUpdates: &storageUpdatesCollection{
StorageUpdates: map[string]*storageUpdates{},
},
metrics: newJobBuilderMetrics(r),
}
}
@ -81,7 +97,6 @@ func (b *JobBuilder) BuildJobs(ctx context.Context, jobsChan chan<- *grpc.Job) {
for {
if err := b.buildJobs(ctx, jobsChan); err != nil {
// ToDo(Sandeep): Add a metric for tracking failures in building jobs
level.Error(util_log.Logger).Log("msg", "error building jobs", "err", err)
}
@ -102,24 +117,35 @@ func (b *JobBuilder) buildJobs(ctx context.Context, jobsChan chan<- *grpc.Job) e
return err
}
if len(manifests) == 0 {
return nil
}
b.metrics.numManifestsLeftToProcess.Set(float64(len(manifests)))
// Process each manifest
for _, manifestPath := range manifests {
manifest, err := b.readManifest(ctx, manifestPath)
if err != nil {
b.metrics.processManifestFailuresTotal.WithLabelValues(processManifestStageBuildJobs).Inc()
return err
}
if err := b.processManifest(ctx, manifest, manifestPath, jobsChan); err != nil {
b.metrics.processManifestFailuresTotal.WithLabelValues(processManifestStageBuildJobs).Inc()
return err
}
if err := b.applyStorageUpdates(ctx, manifest, manifestPath); err != nil {
b.metrics.processManifestFailuresTotal.WithLabelValues(processManifestStageApplyStorageUpdates).Inc()
return err
}
if err := b.cleanupManifest(ctx, manifest, manifestPath); err != nil {
b.metrics.processManifestFailuresTotal.WithLabelValues(processManifestStageCleanupManifest).Inc()
return err
}
b.metrics.numManifestsLeftToProcess.Dec()
}
return nil
@ -137,6 +163,7 @@ func (b *JobBuilder) processManifest(ctx context.Context, manifest *manifest, ma
cancel: cancel,
}
b.currentManifestMtx.Unlock()
b.metrics.numSegmentsLeftToProcess.Set(float64(manifest.SegmentsCount))
// Process segments sequentially
for segmentNum := 0; ctx.Err() == nil && segmentNum < manifest.SegmentsCount; segmentNum++ {
@ -151,6 +178,7 @@ func (b *JobBuilder) processManifest(ctx context.Context, manifest *manifest, ma
return err
}
if !manifestExists {
b.metrics.numSegmentsLeftToProcess.Dec()
level.Info(util_log.Logger).Log("msg", "manifest does not exist(likely processed already), skipping", "manifest", manifestPath)
continue
}
@ -199,6 +227,7 @@ func (b *JobBuilder) processManifest(ctx context.Context, manifest *manifest, ma
level.Info(util_log.Logger).Log("msg", "finished segment processing",
"manifest", manifestPath,
"segment", segmentNum)
b.metrics.numSegmentsLeftToProcess.Dec()
}
level.Info(util_log.Logger).Log("msg", "finished manifest processing", "manifest", manifestPath)
@ -251,7 +280,7 @@ func (b *JobBuilder) listManifests(ctx context.Context) ([]string, error) {
// Check if manifest.json exists in this directory
manifestPath := path.Join(string(commonPrefix), manifestFileName)
exists, err := b.deletionManifestStoreClient.ObjectExists(ctx, manifestPath)
exists, err := objectExists(context.Background(), b.deletionManifestStoreClient, manifestPath)
if err != nil {
return nil, err
}
@ -362,7 +391,7 @@ func (b *JobBuilder) OnJobResponse(response *grpc.JobResult) error {
// applyStorageUpdates applies all the storage updates accumulated while processing the given manifest
func (b *JobBuilder) applyStorageUpdates(ctx context.Context, manifest *manifest, manifestPath string) error {
storageUpdatesIterator := newStorageUpdatesIterator(ctx, manifestPath, manifest, b.deletionManifestStoreClient)
storageUpdatesIterator := newStorageUpdatesIterator(ctx, manifestPath, manifest, b.deletionManifestStoreClient, b.metrics.storageUpdatesAppliedTotal)
return b.applyStorageUpdatesFunc(ctx, storageUpdatesIterator)
}
@ -410,9 +439,9 @@ func (b *JobBuilder) getSegment(ctx context.Context, segmentPath string) (*segme
}
type storageUpdates struct {
ChunksToDelete []string // List of chunks to be deleted from object storage and removed from the index of the current table
ChunksToDeIndex []string // List of chunks only to be removed from the index of the current table
ChunksToIndex []chunk // List of chunks to be indexed in the current table
ChunksToDelete []string `json:"chunks_to_delete,omitempty"` // List of chunks to be deleted from object storage and removed from the index of the current table
ChunksToDeIndex []string `json:"chunks_to_de_index,omitempty"` // List of chunks only to be removed from the index of the current table
ChunksToIndex []chunk `json:"chunks_to_index,omitempty"` // List of chunks to be indexed in the current table
}
// storageUpdatesCollection collects updates to be made to the storage for a single segment
@ -433,6 +462,10 @@ func (i *storageUpdatesCollection) reset(tableName, userID string) {
}
func (i *storageUpdatesCollection) addUpdates(labels string, result storageUpdates) {
if len(result.ChunksToIndex)+len(result.ChunksToDeIndex)+len(result.ChunksToDelete) == 0 {
return
}
i.mtx.Lock()
defer i.mtx.Unlock()
@ -460,19 +493,21 @@ type storageUpdatesIterator struct {
manifestPath string
manifest *manifest
deletionManifestStoreClient client.ObjectClient
storageUpdatesTotal *prometheus.CounterVec
currSegmentNum int
currUpdatesCollection *storageUpdatesCollection
err error
}
func newStorageUpdatesIterator(ctx context.Context, manifestPath string, manifest *manifest, deletionManifestStoreClient client.ObjectClient) *storageUpdatesIterator {
func newStorageUpdatesIterator(ctx context.Context, manifestPath string, manifest *manifest, deletionManifestStoreClient client.ObjectClient, storageUpdatesTotal *prometheus.CounterVec) *storageUpdatesIterator {
return &storageUpdatesIterator{
ctx: ctx,
manifestPath: manifestPath,
manifest: manifest,
deletionManifestStoreClient: deletionManifestStoreClient,
currSegmentNum: -1,
storageUpdatesTotal: storageUpdatesTotal,
}
}
@ -542,6 +577,9 @@ func (i *storageUpdatesIterator) ForEachSeries(callback func(labels string, chun
if err := callback(labels, updates.ChunksToDelete, updates.ChunksToDeIndex, chunksToIndex); err != nil {
return err
}
i.storageUpdatesTotal.WithLabelValues(storageUpdateTypeDeleteChunk).Add(float64(len(updates.ChunksToDelete)))
i.storageUpdatesTotal.WithLabelValues(storageUpdateTypeDeIndexChunk).Add(float64(len(updates.ChunksToDeIndex)))
i.storageUpdatesTotal.WithLabelValues(storageUpdateTypeIndexChunk).Add(float64(len(updates.ChunksToIndex)))
}
return nil
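
The json tags with omitempty added to storageUpdates above, together with the new early return in addUpdates for series that carry no updates, keep the per-segment storage-update files small: entries with no chunk changes now serialize as an empty object rather than three empty arrays. A minimal sketch of the encoding effect (ChunksToIndex is simplified to a string slice here; in the real struct it is a slice of chunk):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type storageUpdates struct {
	ChunksToDelete  []string `json:"chunks_to_delete,omitempty"`
	ChunksToDeIndex []string `json:"chunks_to_de_index,omitempty"`
	ChunksToIndex   []string `json:"chunks_to_index,omitempty"`
}

func main() {
	empty, _ := json.Marshal(storageUpdates{})
	withDeletes, _ := json.Marshal(storageUpdates{ChunksToDelete: []string{"chunk-1"}})

	fmt.Println(string(empty))       // {}
	fmt.Println(string(withDeletes)) // {"chunks_to_delete":["chunk-1"]}
}
```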

@ -67,7 +67,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
{
name: "one manifest in storage with less than maxChunksPerJob",
setupManifest: func(client client.ObjectClient) []DeleteRequest {
deleteRequestBatch := newDeleteRequestBatch(nil)
deleteRequestBatch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
requestsToAdd := []DeleteRequest{
{
RequestID: req1,
@ -125,7 +125,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
{
name: "one manifest in storage with more than maxChunksPerJob",
setupManifest: func(client client.ObjectClient) []DeleteRequest {
deleteRequestBatch := newDeleteRequestBatch(nil)
deleteRequestBatch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
requestsToAdd := []DeleteRequest{
{
RequestID: req1,
@ -199,7 +199,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
{
name: "one manifest in storage with multiple groups",
setupManifest: func(client client.ObjectClient) []DeleteRequest {
deleteRequestBatch := newDeleteRequestBatch(nil)
deleteRequestBatch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
requestsToAdd := []DeleteRequest{
{
UserID: user1,
@ -287,7 +287,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
{
name: "one manifest in storage with multiple segments due to multiple tables",
setupManifest: func(client client.ObjectClient) []DeleteRequest {
deleteRequestBatch := newDeleteRequestBatch(nil)
deleteRequestBatch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
requestsToAdd := []DeleteRequest{
{
RequestID: req1,
@ -392,7 +392,7 @@ func TestJobBuilder_buildJobs(t *testing.T) {
return iterator.Err()
}, func(requests []DeleteRequest) {
requestsMarkedAsProcessed = requests
})
}, nil)
jobsChan := make(chan *grpc.Job)
var jobsBuilt []grpc.Job
@ -470,7 +470,7 @@ func TestJobBuilder_ProcessManifest(t *testing.T) {
builder := NewJobBuilder(objectClient, func(_ context.Context, _ StorageUpdatesIterator) error {
return nil
}, func(_ []DeleteRequest) {})
}, func(_ []DeleteRequest) {}, nil)
// Create a test manifest
manifest := &manifest{

@ -102,8 +102,7 @@ func (jr *JobRunner) Run(ctx context.Context, job *grpc.Job) ([]byte, error) {
if !req.logSelectorExpr.HasFilter() {
return nil, errors.New("deletion query does not contain filter")
}
// ToDo(Sandeep): set it to proper metrics
req.Metrics = newDeleteRequestsManagerMetrics(nil)
req.TotalLinesDeletedMetric = jr.metrics.deletedLinesTotal
}
tableInterval := retention.ExtractIntervalFromTableName(deletionJob.TableName)
@ -218,6 +217,7 @@ func (jr *JobRunner) Run(ctx context.Context, job *grpc.Job) ([]byte, error) {
return nil, err
}
jr.metrics.chunksProcessedTotal.Add(float64(len(deletionJob.ChunkIDs)))
jobResultJSON, err := json.Marshal(updates)
if err != nil {
return nil, err

@ -272,7 +272,7 @@ func TestJobRunner_Run(t *testing.T) {
// Ensure Metrics is set for each DeleteRequest
for i := range tc.deleteRequests {
tc.deleteRequests[i].Metrics = newDeleteRequestsManagerMetrics(nil)
tc.deleteRequests[i].TotalLinesDeletedMetric = newDeleteRequestsManagerMetrics(nil).deletedLinesTotal
}
// Create job runner
@ -421,7 +421,7 @@ func TestJobRunner_Run_ConcurrentChunkProcessing(t *testing.T) {
// Ensure Metrics is set for each DeleteRequest
for i := range deleteRequests {
deleteRequests[i].Metrics = newDeleteRequestsManagerMetrics(nil)
deleteRequests[i].TotalLinesDeletedMetric = newDeleteRequestsManagerMetrics(nil).deletedLinesTotal
}
// Create job runner with chunk processing concurrency of 2

@ -119,6 +119,7 @@ func newDeleteRequestsManagerMetrics(r prometheus.Registerer) *deleteRequestsMan
type deletionJobRunnerMetrics struct {
chunksProcessedTotal prometheus.Counter
deletedLinesTotal *prometheus.CounterVec
}
func newDeletionJobRunnerMetrics(r prometheus.Registerer) *deletionJobRunnerMetrics {
@ -129,6 +130,45 @@ func newDeletionJobRunnerMetrics(r prometheus.Registerer) *deletionJobRunnerMetr
Name: "compactor_deletion_job_runner_chunks_processed_total",
Help: "Number of chunks processed",
})
m.deletedLinesTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "compactor_deletion_job_runner_deleted_lines_total",
Help: "Number of deleted lines per user",
}, []string{"user"})
return &m
}
type jobBuilderMetrics struct {
numSegmentsLeftToProcess prometheus.Gauge
numManifestsLeftToProcess prometheus.Gauge
processManifestFailuresTotal *prometheus.CounterVec
storageUpdatesAppliedTotal *prometheus.CounterVec
}
func newJobBuilderMetrics(r prometheus.Registerer) *jobBuilderMetrics {
m := jobBuilderMetrics{}
m.numSegmentsLeftToProcess = promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: constants.Loki,
Name: "compactor_job_builder_num_segments_left_to_process",
Help: "Number of segments left to process to finish processing the current segment",
})
m.numManifestsLeftToProcess = promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: constants.Loki,
Name: "compactor_job_builder_num_manifests_left_to_process",
Help: "Number of manifests left to process",
})
m.processManifestFailuresTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "compactor_process_manifest_failures_total",
Help: "Number of times compactor failed to process manifest at various stages",
}, []string{"stage"})
m.storageUpdatesAppliedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "compactor_deletion_storage_updates_applied_total",
Help: "Number of storage updates made by type after processing of delete manifests",
}, []string{"type"})
return &m
}
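
A side note on the metrics plumbing above and the nil registerers passed around in the tests: promauto.With(nil) returns collectors that simply skip registration, so code that increments these metrics keeps working without a Prometheus registry. A minimal sketch:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// With a nil registerer, promauto builds the collector but skips registration.
	c := promauto.With(nil).NewCounter(prometheus.CounterOpts{
		Name: "example_total",
		Help: "Example counter that is never registered",
	})
	c.Inc()
	fmt.Println(testutil.ToFloat64(c)) // 1
}
```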

@ -58,7 +58,7 @@ func newWorkerMetrics(r prometheus.Registerer, allWorkersConnectedToCompactor fu
m.jobsProcessed = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "compactor_worker_job_processed_attempts",
Name: "compactor_worker_jobs_processed_total",
Help: "Number of jobs processed by worker with their processing status",
}, []string{"status"})
m.workerConnectedToCompactor = promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{

@ -141,6 +141,8 @@ func (q *Queue) retryFailedJobs() {
case <-q.stop:
return
case <-ticker.C:
var jobsToRetry []string
q.processingJobsMtx.Lock()
now := time.Now()
for jobID, pj := range q.processingJobs {
@ -153,31 +155,39 @@ func (q *Queue) retryFailedJobs() {
}
timeout := q.builders[pj.job.Type].jobTimeout
if pj.lastAttemptFailed || now.Sub(pj.dequeued) > timeout {
// Requeue the job
select {
case <-q.stop:
return
case q.queue <- pj.job:
reason := "timeout"
if pj.lastAttemptFailed {
reason = "failed"
}
q.metrics.jobRetries.WithLabelValues(reason).Inc()
level.Warn(util_log.Logger).Log(
"msg", "requeued job",
"job_id", jobID,
"job_type", pj.job.Type,
"timeout", timeout,
"reason", reason,
)
// reset the dequeued time so that the timeout is calculated from the time when the job is sent for processing.
q.processingJobs[jobID].dequeued = time.Now()
q.processingJobs[jobID].lastAttemptFailed = false
q.processingJobs[jobID].attemptsLeft--
}
jobsToRetry = append(jobsToRetry, jobID)
}
}
q.processingJobsMtx.Unlock()
for _, jobID := range jobsToRetry {
reason := "timeout"
q.processingJobsMtx.Lock()
pj := q.processingJobs[jobID]
if pj.lastAttemptFailed {
reason = "failed"
}
// reset the dequeued time so that the timeout is calculated from the time when the job is sent for processing.
q.processingJobs[jobID].dequeued = time.Now()
q.processingJobs[jobID].lastAttemptFailed = false
q.processingJobs[jobID].attemptsLeft--
q.processingJobsMtx.Unlock()
// Requeue the job
select {
case <-q.stop:
return
case q.queue <- pj.job:
q.metrics.jobRetries.WithLabelValues(reason).Inc()
level.Warn(util_log.Logger).Log(
"msg", "requeued job",
"job_id", jobID,
"job_type", pj.job.Type,
"reason", reason,
)
}
}
}
}
}
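
The retry loop above is restructured so that stale or failed jobs are collected while processingJobsMtx is held, and the blocking send onto the job channel happens only after the mutex is released. A minimal sketch of that two-phase pattern, with invented names (queue and requeueStale are not part of the diff):

```go
package main

import (
	"fmt"
	"sync"
)

type job struct{ id string }

type queue struct {
	mtx     sync.Mutex
	pending map[string]job
	out     chan job
}

// requeueStale re-sends jobs that the caller deems stale. It never blocks on the
// channel send while holding the mutex, so goroutines that also need the lock
// (e.g. the dequeue path) are not stalled behind a full channel.
func (q *queue) requeueStale(stale func(job) bool) {
	// Phase 1: decide what to retry while holding the lock.
	q.mtx.Lock()
	var toRetry []job
	for _, j := range q.pending {
		if stale(j) {
			toRetry = append(toRetry, j)
		}
	}
	q.mtx.Unlock()

	// Phase 2: block on the channel only after the lock is released.
	for _, j := range toRetry {
		q.out <- j
	}
}

func main() {
	q := &queue{pending: map[string]job{"a": {id: "a"}}, out: make(chan job, 1)}
	q.requeueStale(func(job) bool { return true })
	fmt.Println((<-q.out).id) // a
}
```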
@ -212,6 +222,7 @@ func (q *Queue) Loop(s grpc.JobQueue_LoopServer) error {
now := time.Now()
if err := s.Send(job); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to send job", "job_id", job.Id, "err", err)
return err
}
q.metrics.jobsSent.Inc()
@ -221,6 +232,7 @@ func (q *Queue) Loop(s grpc.JobQueue_LoopServer) error {
resp, err := s.Recv()
if err != nil {
q.metrics.jobsProcessingDuration.Observe(time.Since(now).Seconds())
level.Error(util_log.Logger).Log("msg", "error receiving job response", "job_id", job.Id, "err", err)
return err
}
q.metrics.jobsProcessingDuration.Observe(time.Since(now).Seconds())

@ -282,9 +282,9 @@ func TestQueue_JobTimeout(t *testing.T) {
// Verify job is removed from processing jobs
q.processingJobsMtx.RLock()
pj, exists := q.processingJobs[job.Id]
q.processingJobsMtx.RUnlock()
require.True(t, exists)
require.Equal(t, 1, pj.attemptsLeft)
q.processingJobsMtx.RUnlock()
}
func TestQueue_Close(t *testing.T) {
