fix: fix timeout and series progress marker for same requests with different shards (#17125)

pull/17200/head^2
Sandeep Sukhani 1 month ago committed by GitHub
parent cc50361a87
commit 288ec8c64d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 22
      pkg/compactor/deletion/delete_requests_manager.go
  2. 44
      pkg/compactor/deletion/delete_requests_manager_test.go

@ -343,7 +343,7 @@ func (d *DeleteRequestsManager) CanSkipSeries(userID []byte, lbls labels.Labels,
}
// The delete request touches the series. Do not skip if the series is not processed yet.
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, seriesID, tableName)]; !ok {
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, deleteRequest.StartTime, deleteRequest.EndTime, seriesID, tableName)]; !ok {
return false
}
}
@ -366,7 +366,7 @@ func (d *DeleteRequestsManager) Expired(userID []byte, chk retention.Chunk, lbls
var filterFuncs []filter.Func
for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests {
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, seriesID, tableName)]; ok {
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, deleteRequest.StartTime, deleteRequest.EndTime, seriesID, tableName)]; ok {
continue
}
isDeleted, ff := deleteRequest.IsDeleted(userID, lbls, chk)
@ -478,9 +478,13 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
}
d.processedSeries = map[string]struct{}{}
if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) {
level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err)
// When we hit a timeout, MarkPhaseTimedOut is called to clear the list of delete requests to avoid marking delete requests as processed.
// Since this method is still called when we hit a timeout, we do not want to drop the progress so that deletion skips the already processed streams.
if len(d.deleteRequestsToProcess) > 0 {
d.processedSeries = map[string]struct{}{}
if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) {
level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err)
}
}
}
@ -513,9 +517,9 @@ func (d *DeleteRequestsManager) MarkSeriesAsProcessed(userID, seriesID []byte, l
if !labels.Selector(req.matchers).Matches(lbls) {
continue
}
processedSeriesKey := buildProcessedSeriesKey(req.RequestID, seriesID, tableName)
processedSeriesKey := buildProcessedSeriesKey(req.RequestID, req.StartTime, req.EndTime, seriesID, tableName)
if _, ok := d.processedSeries[processedSeriesKey]; ok {
return fmt.Errorf("series for [table: %s, series: %s, user: %s, req: %s]", tableName, seriesID, userID, req.RequestID)
return fmt.Errorf("series already marked as processed: [table: %s, user: %s, req_id: %s, start: %d, end: %d, series: %s]", tableName, userID, req.RequestID, req.StartTime, req.EndTime, seriesID)
}
d.processedSeries[processedSeriesKey] = struct{}{}
}
@ -523,8 +527,8 @@ func (d *DeleteRequestsManager) MarkSeriesAsProcessed(userID, seriesID []byte, l
return nil
}
func buildProcessedSeriesKey(requestID string, seriesID []byte, tableName string) string {
return fmt.Sprintf("%s/%s/%s", requestID, tableName, seriesID)
// buildProcessedSeriesKey returns the string used as the key in the
// processedSeries set for a (delete request, table, series) combination.
//
// The key has the layout "<requestID>/<startTime>/<endTime>/<tableName>/<seriesID>",
// with both times rendered as decimal integers. Including the time range keeps
// entries distinct when several requests share the same request ID but cover
// different intervals.
func buildProcessedSeriesKey(requestID string, startTime, endTime model.Time, seriesID []byte, tableName string) string {
	// Note the table name precedes the series ID in the key, unlike the parameter order.
	const keyLayout = "%s/%d/%d/%s/%s"
	return fmt.Sprintf(keyLayout, requestID, startTime, endTime, tableName, seriesID)
}
func getMaxRetentionInterval(userID string, limits Limits) time.Duration {

@ -1019,10 +1019,6 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
user2 := []byte("user2")
lblFooBar := mustParseLabel(`{foo="bar"}`)
lblFizzBuzz := mustParseLabel(`{fizz="buzz"}`)
deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived},
{RequestID: "2", Query: lblFooBar.String(), UserID: string(user2), StartTime: 0, EndTime: 100, Status: StatusReceived},
}}
type markSeriesProcessed struct {
userID, seriesID []byte
lbls labels.Labels
@ -1189,6 +1185,11 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
workingDir := t.TempDir()
deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived},
{RequestID: "2", Query: lblFooBar.String(), UserID: string(user2), StartTime: 0, EndTime: 100, Status: StatusReceived},
}}
mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
require.NoError(t, err)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
@ -1207,6 +1208,7 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
mgr, err = NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
require.NoError(t, err)
require.Equal(t, storedSeriesProgress, mgr.processedSeries)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
// when the mark phase ends, series progress should get cleared
mgr.MarkPhaseFinished()
@ -1216,6 +1218,40 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
}
}
func TestDeleteRequestsManager_SeriesProgressWithTimeout(t *testing.T) {
workingDir := t.TempDir()
user1 := []byte("user1")
lblFooBar := mustParseLabel(`{foo="bar"}`)
deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived},
{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 100, EndTime: 200, Status: StatusReceived},
}}
mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
require.NoError(t, err)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
require.NoError(t, mgr.MarkSeriesAsProcessed(user1, []byte(lblFooBar.String()), lblFooBar, "t1"))
// timeout the retention processing
mgr.MarkPhaseTimedOut()
// timeout should not clear the series progress
mgr.MarkPhaseFinished()
require.Len(t, mgr.processedSeries, 2)
require.NoError(t, mgr.storeSeriesProgress())
require.FileExists(t, filepath.Join(workingDir, seriesProgressFilename))
// load the requests again for processing
require.NoError(t, mgr.loadDeleteRequestsToProcess())
// not hitting the timeout should clear the series progress
mgr.MarkPhaseFinished()
require.Len(t, mgr.processedSeries, 0)
require.NoFileExists(t, filepath.Join(workingDir, seriesProgressFilename))
}
type storeAddReqDetails struct {
userID, query string
startTime, endTime model.Time

Loading…
Cancel
Save