feat: store details of processed streams while processing delete requests (#16825)

pull/16827/head^2
Sandeep Sukhani 1 year ago committed by GitHub
parent 8948c1147a
commit e3e1f096ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 33
      pkg/compactor/compactor.go
  2. 14
      pkg/compactor/deletion/delete_request.go
  3. 12
      pkg/compactor/deletion/delete_request_test.go
  4. 144
      pkg/compactor/deletion/delete_requests_manager.go
  5. 221
      pkg/compactor/deletion/delete_requests_manager_test.go
  6. 36
      pkg/compactor/retention/expiration.go
  7. 104
      pkg/compactor/retention/expiration_test.go
  8. 180
      pkg/compactor/retention/retention.go
  9. 131
      pkg/compactor/retention/retention_test.go
  10. 16
      pkg/compactor/retention/series.go
  11. 95
      pkg/compactor/retention/util_test.go
  12. 7
      pkg/compactor/testutil.go
  13. 13
      pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index.go
  14. 29
      pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index_test.go
  15. 24
      pkg/storage/stores/shipper/indexshipper/boltdb/compactor/index.go
  16. 67
      pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go
  17. 81
      pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator_test.go
  18. 6
      pkg/storage/stores/shipper/indexshipper/boltdb/compactor/series.go
  19. 57
      pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go
  20. 164
      pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go
  21. 2
      pkg/storage/stores/shipper/indexshipper/tsdb/index/chunk.go
  22. 18
      pkg/tool/audit/audit.go
  23. 25
      pkg/tool/audit/audit_test.go
  24. 22
      tools/tsdb/tsdb-map/main.go

@ -13,14 +13,14 @@ import (
"unsafe"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/compactor/deletion"
@ -406,13 +406,10 @@ func (c *Compactor) initDeletes(objectClient client.ObjectClient, indexUpdatePro
c.DeleteRequestsGRPCHandler = deletion.NewGRPCRequestHandler(c.deleteRequestsStore, limits)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(
c.deleteRequestsStore,
c.cfg.DeleteRequestCancelPeriod,
c.cfg.DeleteBatchSize,
limits,
r,
)
c.deleteRequestsManager, err = deletion.NewDeleteRequestsManager(deletionWorkDir, c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, c.cfg.DeleteBatchSize, limits, r)
if err != nil {
return err
}
c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)
return nil
@ -853,12 +850,12 @@ func newExpirationChecker(retentionExpiryChecker, deletionExpiryChecker retentio
return &expirationChecker{retentionExpiryChecker, deletionExpiryChecker}
}
func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, filter.Func) {
if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(ref, now); expired {
func (e *expirationChecker) Expired(userID []byte, chk retention.Chunk, lbls labels.Labels, seriesID []byte, tableName string, now model.Time) (bool, filter.Func) {
if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(userID, chk, lbls, seriesID, tableName, now); expired {
return expired, nonDeletedIntervals
}
return e.deletionExpiryChecker.Expired(ref, now)
return e.deletionExpiryChecker.Expired(userID, chk, lbls, seriesID, tableName, now)
}
func (e *expirationChecker) MarkPhaseStarted() {
@ -885,8 +882,12 @@ func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval
return e.retentionExpiryChecker.IntervalMayHaveExpiredChunks(interval, userID) || e.deletionExpiryChecker.IntervalMayHaveExpiredChunks(interval, userID)
}
func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime model.Time, now model.Time) bool {
return e.retentionExpiryChecker.DropFromIndex(ref, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(ref, tableEndTime, now)
func (e *expirationChecker) DropFromIndex(userID []byte, chk retention.Chunk, labels labels.Labels, tableEndTime model.Time, now model.Time) bool {
return e.retentionExpiryChecker.DropFromIndex(userID, chk, labels, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(userID, chk, labels, tableEndTime, now)
}
// CanSkipSeries reports whether the whole series can be skipped during the
// compaction mark phase. Skipping is only safe when BOTH the retention checker
// and the deletion checker report that they have no work to do for the series
// (hence the logical AND, in contrast to the OR used by Expired/DropFromIndex).
func (e *expirationChecker) CanSkipSeries(userID []byte, lbls labels.Labels, seriesID []byte, seriesStart model.Time, tableName string, now model.Time) bool {
	return e.retentionExpiryChecker.CanSkipSeries(userID, lbls, seriesID, seriesStart, tableName, now) && e.deletionExpiryChecker.CanSkipSeries(userID, lbls, seriesID, seriesStart, tableName, now)
}
func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {

@ -109,14 +109,14 @@ func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool {
// IsDeleted checks if the chunk with the given labels and owning user will be deleted by this DeleteRequest.
// It returns a filter.Func if the chunk is supposed to be deleted partially or the delete request contains line filters.
// If the filter.Func is nil, the whole chunk is supposed to be deleted.
func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, filter.Func) {
if d.UserID != unsafeGetString(entry.UserID) {
func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk retention.Chunk) (bool, filter.Func) {
if d.UserID != unsafeGetString(userID) {
return false, nil
}
if !intervalsOverlap(model.Interval{
Start: entry.From,
End: entry.Through,
Start: chunk.From,
End: chunk.Through,
}, model.Interval{
Start: d.StartTime,
End: d.EndTime,
@ -137,16 +137,16 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, filter.Func
}
}
if !labels.Selector(d.matchers).Matches(entry.Labels) {
if !labels.Selector(d.matchers).Matches(lbls) {
return false, nil
}
if d.StartTime <= entry.From && d.EndTime >= entry.Through && !d.logSelectorExpr.HasFilter() {
if d.StartTime <= chunk.From && d.EndTime >= chunk.Through && !d.logSelectorExpr.HasFilter() {
// Delete request covers the whole chunk and there are no line filters in the logSelectorExpr so the whole chunk will be deleted
return true, nil
}
ff, err := d.FilterFunction(entry.Labels)
ff, err := d.FilterFunction(lbls)
if err != nil {
// The query in the delete request is checked when added to the table.
// So this error should not occur.

@ -33,13 +33,9 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
lblWithStructuredMetadataFilter := `{foo="bar", fizz="buzz"} | ping="pong"`
lblWithLineAndStructuredMetadataFilter := `{foo="bar", fizz="buzz"} | ping="pong" |= "filter"`
chunkEntry := retention.ChunkEntry{
ChunkRef: retention.ChunkRef{
UserID: []byte(user1),
From: now.Add(-3 * time.Hour),
Through: now.Add(-time.Hour),
},
Labels: mustParseLabel(lbl),
chunkEntry := retention.Chunk{
From: now.Add(-3 * time.Hour),
Through: now.Add(-time.Hour),
}
type resp struct {
@ -275,7 +271,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
require.NoError(t, tc.deleteRequest.SetQuery(tc.deleteRequest.Query))
tc.deleteRequest.Metrics = newDeleteRequestsManagerMetrics(nil)
isExpired, filterFunc := tc.deleteRequest.IsDeleted(chunkEntry)
isExpired, filterFunc := tc.deleteRequest.IsDeleted([]byte(user1), mustParseLabel(lbl), chunkEntry)
require.Equal(t, tc.expectedResp.isDeleted, isExpired)
if tc.expectedResp.expectedFilter == nil {
require.Nil(t, filterFunc)

@ -2,12 +2,16 @@ package deletion
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"sync"
"time"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@ -21,6 +25,8 @@ import (
const (
statusSuccess = "success"
statusFail = "fail"
seriesProgressFilename = "series_progress.json"
)
type userDeleteRequests struct {
@ -30,6 +36,7 @@ type userDeleteRequests struct {
}
type DeleteRequestsManager struct {
workingDir string
deleteRequestsStore DeleteRequestsStore
deleteRequestCancelPeriod time.Duration
@ -41,10 +48,12 @@ type DeleteRequestsManager struct {
done chan struct{}
batchSize int
limits Limits
processedSeries map[string]struct{}
}
func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, batchSize int, limits Limits, registerer prometheus.Registerer) *DeleteRequestsManager {
func NewDeleteRequestsManager(workingDir string, store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, batchSize int, limits Limits, registerer prometheus.Registerer) (*DeleteRequestsManager, error) {
dm := &DeleteRequestsManager{
workingDir: workingDir,
deleteRequestsStore: store,
deleteRequestCancelPeriod: deleteRequestCancelPeriod,
deleteRequestsToProcess: map[string]*userDeleteRequests{},
@ -52,6 +61,13 @@ func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeri
done: make(chan struct{}),
batchSize: batchSize,
limits: limits,
processedSeries: map[string]struct{}{},
}
var err error
dm.processedSeries, err = loadSeriesProgress(workingDir)
if err != nil {
return nil, err
}
go dm.loop()
@ -60,7 +76,7 @@ func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeri
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
}
return dm
return dm, nil
}
func (d *DeleteRequestsManager) loop() {
@ -76,6 +92,10 @@ func (d *DeleteRequestsManager) loop() {
if err := d.updateMetrics(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to update metrics", "err", err)
}
if err := d.storeSeriesProgress(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to store series progress", "err", err)
}
case <-d.done:
return
}
@ -85,6 +105,27 @@ func (d *DeleteRequestsManager) loop() {
func (d *DeleteRequestsManager) Stop() {
close(d.done)
d.wg.Wait()
if err := d.storeSeriesProgress(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to store series progress", "err", err)
}
}
// storeSeriesProgress persists the in-memory series-progress map to
// seriesProgressFilename in the working directory so that a restarted
// compactor can resume deletion without reprocessing already-handled series.
// It is a no-op when no progress has been recorded yet.
func (d *DeleteRequestsManager) storeSeriesProgress() error {
	// Snapshot the map under the mutex: d.processedSeries is written by
	// MarkSeriesAsProcessed while this method runs periodically from the
	// manager's loop goroutine, so an unguarded read would be a data race.
	// (Other readers, e.g. CanSkipSeries, take the same mutex.)
	d.deleteRequestsToProcessMtx.Lock()
	if len(d.processedSeries) == 0 {
		d.deleteRequestsToProcessMtx.Unlock()
		return nil
	}
	data, err := json.Marshal(d.processedSeries)
	d.deleteRequestsToProcessMtx.Unlock()
	if err != nil {
		return errors.Wrap(err, "failed to json encode series progress")
	}

	// Do the file I/O outside the critical section so a slow disk cannot
	// stall chunk processing.
	if err := os.WriteFile(filepath.Join(d.workingDir, seriesProgressFilename), data, 0640); err != nil {
		return errors.Wrap(err, "failed to store series progress to the file")
	}

	return nil
}
func (d *DeleteRequestsManager) updateMetrics() error {
@ -180,6 +221,13 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
break
}
if deleteRequest.logSelectorExpr == nil {
err := deleteRequest.SetQuery(deleteRequest.Query)
if err != nil {
return errors.Wrapf(err, "failed to init log selector expr for request_id=%s, user_id=%s", deleteRequest.RequestID, deleteRequest.UserID)
}
}
level.Info(util_log.Logger).Log(
"msg", "Started processing delete request for user",
"delete_request_id", deleteRequest.RequestID,
@ -278,14 +326,39 @@ func (d *DeleteRequestsManager) shouldProcessRequest(dr DeleteRequest) (bool, er
return mode == deletionmode.FilterAndDelete, nil
}
func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) (bool, filter.Func) {
// CanSkipSeries reports whether the given series can be skipped entirely while
// processing delete requests. A series may be skipped when no loaded delete
// request selects it, or when every delete request that does select it has
// already processed the series (as recorded via MarkSeriesAsProcessed).
func (d *DeleteRequestsManager) CanSkipSeries(userID []byte, lbls labels.Labels, seriesID []byte, _ model.Time, tableName string, _ model.Time) bool {
	d.deleteRequestsToProcessMtx.Lock()
	defer d.deleteRequestsToProcessMtx.Unlock()

	userRequests := d.deleteRequestsToProcess[unsafeGetString(userID)]
	if userRequests == nil {
		// No pending delete requests for this tenant; nothing can touch the series.
		return true
	}

	for _, req := range userRequests.requests {
		// A request that does not select this series cannot block skipping it.
		if !labels.Selector(req.matchers).Matches(lbls) {
			continue
		}
		// The request selects the series, so it can only be skipped if this
		// request has already finished processing it.
		if _, done := d.processedSeries[buildProcessedSeriesKey(req.RequestID, seriesID, tableName)]; !done {
			return false
		}
	}

	return true
}
func (d *DeleteRequestsManager) Expired(userID []byte, chk retention.Chunk, lbls labels.Labels, seriesID []byte, tableName string, _ model.Time) (bool, filter.Func) {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
userIDStr := unsafeGetString(ref.UserID)
userIDStr := unsafeGetString(userID)
if d.deleteRequestsToProcess[userIDStr] == nil || !intervalsOverlap(d.deleteRequestsToProcess[userIDStr].requestsInterval, model.Interval{
Start: ref.From,
End: ref.Through,
Start: chk.From,
End: chk.Through,
}) {
return false, nil
}
@ -293,7 +366,10 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time)
var filterFuncs []filter.Func
for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests {
isDeleted, ff := deleteRequest.IsDeleted(ref)
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, seriesID, tableName)]; ok {
continue
}
isDeleted, ff := deleteRequest.IsDeleted(userID, lbls, chk)
if !isDeleted {
continue
}
@ -304,9 +380,9 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time)
"delete_request_id", deleteRequest.RequestID,
"sequence_num", deleteRequest.SequenceNum,
"user", deleteRequest.UserID,
"chunkID", string(ref.ChunkID),
"chunkID", string(chk.ChunkID),
)
d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(ref.UserID)).Inc()
d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc()
return true, nil
}
filterFuncs = append(filterFuncs, ff)
@ -316,7 +392,7 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time)
return false, nil
}
d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(ref.UserID)).Inc()
d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc()
return true, func(ts time.Time, s string, structuredMetadata ...labels.Label) bool {
for _, ff := range filterFuncs {
if ff(ts, s, structuredMetadata...) {
@ -401,6 +477,11 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
if err := d.deleteRequestsStore.MergeShardedRequests(context.Background()); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
}
d.processedSeries = map[string]struct{}{}
if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) {
level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err)
}
}
func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, userID string) bool {
@ -417,10 +498,35 @@ func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, u
return len(d.deleteRequestsToProcess) != 0
}
func (d *DeleteRequestsManager) DropFromIndex(_ retention.ChunkEntry, _ model.Time, _ model.Time) bool {
func (d *DeleteRequestsManager) DropFromIndex(_ []byte, _ retention.Chunk, _ labels.Labels, _ model.Time, _ model.Time) bool {
return false
}
// MarkSeriesAsProcessed records that the given series has been fully processed
// for every loaded delete request that selects it, so that subsequent calls to
// CanSkipSeries/Expired can skip it. It returns an error if the series was
// already marked as processed for one of those requests, which indicates a
// bug in the calling retention code.
func (d *DeleteRequestsManager) MarkSeriesAsProcessed(userID, seriesID []byte, lbls labels.Labels, tableName string) error {
	// Take the mutex: d.processedSeries is read under the same lock by
	// CanSkipSeries/Expired (and persisted by storeSeriesProgress), so an
	// unguarded map write here would be a data race.
	d.deleteRequestsToProcessMtx.Lock()
	defer d.deleteRequestsToProcessMtx.Unlock()

	userIDStr := unsafeGetString(userID)
	if d.deleteRequestsToProcess[userIDStr] == nil {
		return nil
	}

	for _, req := range d.deleteRequestsToProcess[userIDStr].requests {
		// if the delete request does not touch the series, do not waste space in storing the marker
		if !labels.Selector(req.matchers).Matches(lbls) {
			continue
		}
		processedSeriesKey := buildProcessedSeriesKey(req.RequestID, seriesID, tableName)
		if _, ok := d.processedSeries[processedSeriesKey]; ok {
			// Original message ("series for [...]") was missing its predicate.
			return fmt.Errorf("series already marked as processed [table: %s, series: %s, user: %s, req: %s]", tableName, seriesID, userID, req.RequestID)
		}
		d.processedSeries[processedSeriesKey] = struct{}{}
	}

	return nil
}
// buildProcessedSeriesKey builds the key identifying that a series in a given
// table has been processed for a given delete request. The key has the form
// "<requestID>/<tableName>/<seriesID>".
func buildProcessedSeriesKey(requestID string, seriesID []byte, tableName string) string {
	return requestID + "/" + tableName + "/" + string(seriesID)
}
func getMaxRetentionInterval(userID string, limits Limits) time.Duration {
maxRetention := model.Duration(limits.RetentionPeriod(userID))
if maxRetention == 0 {
@ -438,3 +544,19 @@ func getMaxRetentionInterval(userID string, limits Limits) time.Duration {
return time.Duration(maxRetention)
}
// loadSeriesProgress reads the persisted series-progress map from
// seriesProgressFilename in workingDir. A missing file is not an error: it
// simply means no progress was stored, and an empty map is returned.
func loadSeriesProgress(workingDir string) (map[string]struct{}, error) {
	data, err := os.ReadFile(filepath.Join(workingDir, seriesProgressFilename))
	if err != nil && !os.IsNotExist(err) {
		// Add context so the caller's log tells which operation failed,
		// matching the wrapped errors in storeSeriesProgress.
		return nil, fmt.Errorf("failed to read series progress file: %w", err)
	}

	processedSeries := map[string]struct{}{}
	if len(data) > 0 {
		if err := json.Unmarshal(data, &processedSeries); err != nil {
			return nil, fmt.Errorf("failed to json decode series progress: %w", err)
		}
	}

	return processedSeries, nil
}

@ -2,6 +2,7 @@ package deletion
import (
"context"
"path/filepath"
"strings"
"testing"
"time"
@ -31,13 +32,9 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
streamSelectorWithStructuredMetadataFilters := lblFoo.String() + `| ping="pong"`
streamSelectorWithLineAndStructuredMetadataFilters := lblFoo.String() + `| ping="pong" |= "fizz"`
chunkEntry := retention.ChunkEntry{
ChunkRef: retention.ChunkRef{
UserID: []byte(testUserID),
From: now.Add(-12 * time.Hour),
Through: now.Add(-time.Hour),
},
Labels: lblFoo,
chunkEntry := retention.Chunk{
From: now.Add(-12 * time.Hour),
Through: now.Add(-time.Hour),
}
for _, tc := range []struct {
@ -936,10 +933,11 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
mockDeleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}
mgr := NewDeleteRequestsManager(mockDeleteRequestsStore, time.Hour, tc.batchSize, &fakeLimits{defaultLimit: limit{
mgr, err := NewDeleteRequestsManager(t.TempDir(), mockDeleteRequestsStore, time.Hour, tc.batchSize, &fakeLimits{defaultLimit: limit{
retentionPeriod: 7 * 24 * time.Hour,
deletionMode: tc.deletionMode.String(),
}}, nil)
require.NoError(t, err)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
for _, deleteRequests := range mgr.deleteRequestsToProcess {
@ -948,7 +946,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}
}
isExpired, filterFunc := mgr.Expired(chunkEntry, model.Now())
isExpired, filterFunc := mgr.Expired([]byte(testUserID), chunkEntry, lblFoo, nil, "", model.Now())
require.Equal(t, tc.expectedResp.isExpired, isExpired)
if tc.expectedResp.expectedFilter == nil {
require.Nil(t, filterFunc)
@ -1007,7 +1005,8 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) {
}
for _, tc := range tt {
mgr := NewDeleteRequestsManager(&mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
mgr, err := NewDeleteRequestsManager(t.TempDir(), &mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
require.NoError(t, err)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
interval := model.Interval{Start: 300, End: 600}
@ -1015,6 +1014,208 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) {
}
}
// TestDeleteRequestsManager_SeriesProgress verifies the series-progress
// tracking of DeleteRequestsManager: marking series as processed, the effect
// on CanSkipSeries/Expired, persistence of the progress across a Stop/restart
// cycle, and clearing of the progress (map and file) on MarkPhaseFinished.
//
// NOTE(review): deleteRequestsStore is shared across all subtests and across
// both manager instances within a subtest — presumably safe because each
// subtest only reads the same two requests, but verify the mock is not
// mutated by MergeShardedRequests/MarkPhaseFinished.
func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
	user1 := []byte("user1")
	user2 := []byte("user2")
	lblFooBar := mustParseLabel(`{foo="bar"}`)
	lblFizzBuzz := mustParseLabel(`{fizz="buzz"}`)
	// One delete request per user, both selecting {foo="bar"}.
	deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{
		{RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived},
		{RequestID: "2", Query: lblFooBar.String(), UserID: string(user2), StartTime: 0, EndTime: 100, Status: StatusReceived},
	}}

	// markSeriesProcessed describes one MarkSeriesAsProcessed call to make
	// before checking the chunk entry.
	type markSeriesProcessed struct {
		userID, seriesID []byte
		lbls             labels.Labels
		tableName        string
	}

	// chunkEntry describes the chunk handed to CanSkipSeries/Expired.
	type chunkEntry struct {
		userID    []byte
		chk       retention.Chunk
		lbls      labels.Labels
		seriesID  []byte
		tableName string
	}

	for _, tc := range []struct {
		name                  string
		seriesToMarkProcessed []markSeriesProcessed
		chunkEntry            chunkEntry
		expSkipSeries         bool
		expExpired            bool
	}{
		{
			// Nothing processed yet: the series cannot be skipped and the
			// chunk is considered expired by the matching delete request.
			name: "no series marked as processed",
			chunkEntry: chunkEntry{
				userID: user1,
				chk: retention.Chunk{
					From:    10,
					Through: 20,
				},
				lbls:      lblFooBar,
				seriesID:  []byte(lblFooBar.String()),
				tableName: "t1",
			},
			expSkipSeries: false,
			expExpired:    true,
		},
		{
			// Exact same (user, series, table) already processed: skip the
			// series and do not expire the chunk again.
			name: "chunk's series marked as processed",
			seriesToMarkProcessed: []markSeriesProcessed{
				{
					userID:    user1,
					seriesID:  []byte(lblFooBar.String()),
					lbls:      lblFooBar,
					tableName: "t1",
				},
			},
			chunkEntry: chunkEntry{
				userID: user1,
				chk: retention.Chunk{
					From:    10,
					Through: 20,
				},
				lbls:      lblFooBar,
				seriesID:  []byte(lblFooBar.String()),
				tableName: "t1",
			},
			expSkipSeries: true,
			expExpired:    false,
		},
		{
			// Progress on an unrelated series must not affect this one.
			name: "a different series marked as processed",
			seriesToMarkProcessed: []markSeriesProcessed{
				{
					userID:    user1,
					seriesID:  []byte(lblFizzBuzz.String()),
					lbls:      lblFizzBuzz,
					tableName: "t1",
				},
			},
			chunkEntry: chunkEntry{
				userID: user1,
				chk: retention.Chunk{
					From:    10,
					Through: 20,
				},
				lbls:      lblFooBar,
				seriesID:  []byte(lblFooBar.String()),
				tableName: "t1",
			},
			expSkipSeries: false,
			expExpired:    true,
		},
		{
			// Progress is tracked per delete request (hence per user):
			// user2's progress must not shadow user1's work.
			name: "a different users series marked as processed",
			seriesToMarkProcessed: []markSeriesProcessed{
				{
					userID:    user2,
					seriesID:  []byte(lblFooBar.String()),
					lbls:      lblFooBar,
					tableName: "t1",
				},
			},
			chunkEntry: chunkEntry{
				userID: user1,
				chk: retention.Chunk{
					From:    10,
					Through: 20,
				},
				lbls:      lblFooBar,
				seriesID:  []byte(lblFooBar.String()),
				tableName: "t1",
			},
			expSkipSeries: false,
			expExpired:    true,
		},
		{
			// Progress is tracked per table: table t2 being done says nothing
			// about table t1.
			name: "series from different table marked as processed",
			seriesToMarkProcessed: []markSeriesProcessed{
				{
					userID:    user1,
					seriesID:  []byte(lblFooBar.String()),
					lbls:      lblFooBar,
					tableName: "t2",
				},
			},
			chunkEntry: chunkEntry{
				userID: user1,
				chk: retention.Chunk{
					From:    10,
					Through: 20,
				},
				lbls:      lblFooBar,
				seriesID:  []byte(lblFooBar.String()),
				tableName: "t1",
			},
			expSkipSeries: false,
			expExpired:    true,
		},
		{
			// Several markers including the matching one: the matching
			// (user1, t1) marker wins and the series is skipped.
			name: "multiple series marked as processed",
			seriesToMarkProcessed: []markSeriesProcessed{
				{
					userID:    user1,
					seriesID:  []byte(lblFooBar.String()),
					lbls:      lblFooBar,
					tableName: "t1",
				},
				{
					userID:    user1,
					seriesID:  []byte(lblFooBar.String()),
					lbls:      lblFooBar,
					tableName: "t2",
				},
				{
					userID:    user2,
					seriesID:  []byte(lblFooBar.String()),
					lbls:      lblFooBar,
					tableName: "t1",
				},
			},
			chunkEntry: chunkEntry{
				userID: user1,
				chk: retention.Chunk{
					From:    10,
					Through: 20,
				},
				lbls:      lblFooBar,
				seriesID:  []byte(lblFooBar.String()),
				tableName: "t1",
			},
			expSkipSeries: true,
			expExpired:    false,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			workingDir := t.TempDir()
			mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
			require.NoError(t, err)
			require.NoError(t, mgr.loadDeleteRequestsToProcess())

			for _, m := range tc.seriesToMarkProcessed {
				require.NoError(t, mgr.MarkSeriesAsProcessed(m.userID, m.seriesID, m.lbls, m.tableName))
			}

			require.Equal(t, tc.expSkipSeries, mgr.CanSkipSeries(tc.chunkEntry.userID, tc.chunkEntry.lbls, tc.chunkEntry.seriesID, 0, tc.chunkEntry.tableName, 0))
			isExpired, _ := mgr.Expired(tc.chunkEntry.userID, tc.chunkEntry.chk, tc.chunkEntry.lbls, tc.chunkEntry.seriesID, tc.chunkEntry.tableName, 0)
			require.Equal(t, tc.expExpired, isExpired)

			// see if stopping the manager properly retains the progress and loads back when initialized
			storedSeriesProgress := mgr.processedSeries
			mgr.Stop()
			// NOTE(review): this second manager is never Stop()ed, so its loop
			// goroutine outlives the subtest — confirm this is acceptable here.
			mgr, err = NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
			require.NoError(t, err)
			require.Equal(t, storedSeriesProgress, mgr.processedSeries)

			// when the mark phase ends, series progress should get cleared
			mgr.MarkPhaseFinished()
			require.Len(t, mgr.processedSeries, 0)
			require.NoFileExists(t, filepath.Join(workingDir, seriesProgressFilename))
		})
	}
}
type storeAddReqDetails struct {
userID, query string
startTime, endTime model.Time

@ -23,13 +23,14 @@ type IntervalFilter struct {
}
type ExpirationChecker interface {
Expired(ref ChunkEntry, now model.Time) (bool, filter.Func)
Expired(userID []byte, chk Chunk, lbls labels.Labels, seriesID []byte, tableName string, now model.Time) (bool, filter.Func)
IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool
MarkPhaseStarted()
MarkPhaseFailed()
MarkPhaseTimedOut()
MarkPhaseFinished()
DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool
DropFromIndex(userID []byte, chk Chunk, labels labels.Labels, tableEndTime model.Time, now model.Time) bool
CanSkipSeries(userID []byte, lbls labels.Labels, seriesID []byte, seriesStart model.Time, tableName string, now model.Time) bool
}
type expirationChecker struct {
@ -52,22 +53,22 @@ func NewExpirationChecker(limits Limits) ExpirationChecker {
}
// Expired tells if a ref chunk is expired based on retention rules.
func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, filter.Func) {
userID := unsafeGetString(ref.UserID)
period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels)
func (e *expirationChecker) Expired(userID []byte, chk Chunk, lbls labels.Labels, _ []byte, _ string, now model.Time) (bool, filter.Func) {
userIDStr := unsafeGetString(userID)
period := e.tenantsRetention.RetentionPeriodFor(userIDStr, lbls)
// The 0 value should disable retention
if period <= 0 {
return false, nil
}
return now.Sub(ref.Through) > period, nil
return now.Sub(chk.Through) > period, nil
}
// DropFromIndex tells if it is okay to drop the chunk entry from index table.
// We check if tableEndTime is out of retention period, calculated using the labels from the chunk.
// If the tableEndTime is out of retention then we can drop the chunk entry without removing the chunk from the store.
func (e *expirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool {
userID := unsafeGetString(ref.UserID)
period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels)
func (e *expirationChecker) DropFromIndex(userID []byte, _ Chunk, labels labels.Labels, tableEndTime model.Time, now model.Time) bool {
userIDStr := unsafeGetString(userID)
period := e.tenantsRetention.RetentionPeriodFor(userIDStr, labels)
// The 0 value should disable retention
if period <= 0 {
return false
@ -84,6 +85,16 @@ func (e *expirationChecker) MarkPhaseStarted() {
func (e *expirationChecker) MarkPhaseFailed() {}
func (e *expirationChecker) MarkPhaseTimedOut() {}
func (e *expirationChecker) MarkPhaseFinished() {}
// CanSkipSeries reports whether retention has no work to do for the series.
// A series can be skipped while even its oldest data is still inside the
// tenant/stream retention period.
func (e *expirationChecker) CanSkipSeries(userID []byte, lbls labels.Labels, _ []byte, seriesStart model.Time, _ string, now model.Time) bool {
	retentionPeriod := e.tenantsRetention.RetentionPeriodFor(unsafeGetString(userID), lbls)
	if retentionPeriod <= 0 {
		// The 0 value should disable retention, so nothing in the series can
		// ever expire and it is always safe to skip.
		return true
	}
	return now.Sub(seriesStart) < retentionPeriod
}
func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {
// when userID is empty, it means we are checking for common index table. In this case we use e.overallLatestRetentionStartTime.
@ -109,7 +120,7 @@ func NeverExpiringExpirationChecker(_ Limits) ExpirationChecker {
type neverExpiringExpirationChecker struct{}
func (e *neverExpiringExpirationChecker) Expired(_ ChunkEntry, _ model.Time) (bool, filter.Func) {
// Expired always reports false: this checker is the no-op implementation used
// when expiration is disabled, so no chunk is ever considered expired.
func (e *neverExpiringExpirationChecker) Expired(_ []byte, _ Chunk, _ labels.Labels, _ []byte, _ string, _ model.Time) (bool, filter.Func) {
	return false, nil
}
func (e *neverExpiringExpirationChecker) IntervalMayHaveExpiredChunks(_ model.Interval, _ string) bool {
@ -119,9 +130,12 @@ func (e *neverExpiringExpirationChecker) MarkPhaseStarted() {}
func (e *neverExpiringExpirationChecker) MarkPhaseFailed() {}
func (e *neverExpiringExpirationChecker) MarkPhaseTimedOut() {}
func (e *neverExpiringExpirationChecker) MarkPhaseFinished() {}
func (e *neverExpiringExpirationChecker) DropFromIndex(_ ChunkEntry, _ model.Time, _ model.Time) bool {
func (e *neverExpiringExpirationChecker) DropFromIndex(_ []byte, _ Chunk, _ labels.Labels, _ model.Time, _ model.Time) bool {
return false
}
// CanSkipSeries always reports true: with expiration disabled there is never
// any work to do for a series, so every series can be skipped.
func (e *neverExpiringExpirationChecker) CanSkipSeries(_ []byte, _ labels.Labels, _ []byte, _ model.Time, _ string, _ model.Time) bool {
	return true
}
type TenantsRetention struct {
limits Limits

@ -108,20 +108,22 @@ func Test_expirationChecker_Expired(t *testing.T) {
e := NewExpirationChecker(o)
tests := []struct {
name string
ref ChunkEntry
want bool
name string
userID string
labels string
chunk Chunk
want bool
}{
{"expired tenant", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-2*time.Hour)), true},
{"just expired tenant", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-1*time.Hour+(10*time.Millisecond))), false},
{"not expired tenant", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-30*time.Minute)), false},
{"not expired tenant by far", newChunkEntry("2", `{foo="buzz"}`, model.Now().Add(-72*time.Hour), model.Now().Add(-3*time.Hour)), false},
{"expired stream override", newChunkEntry("2", `{foo="bar"}`, model.Now().Add(-12*time.Hour), model.Now().Add(-10*time.Hour)), true},
{"non expired stream override", newChunkEntry("1", `{foo="bar"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-90*time.Minute)), false},
{"expired tenant", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-2 * time.Hour)}, true},
{"just expired tenant", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-1*time.Hour + (10 * time.Millisecond))}, false},
{"not expired tenant", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-30 * time.Minute)}, false},
{"not expired tenant by far", "2", `{foo="buzz"}`, Chunk{From: model.Now().Add(-72 * time.Hour), Through: model.Now().Add(-3 * time.Hour)}, false},
{"expired stream override", "2", `{foo="bar"}`, Chunk{From: model.Now().Add(-12 * time.Hour), Through: model.Now().Add(-10 * time.Hour)}, true},
{"non expired stream override", "1", `{foo="bar"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-90 * time.Minute)}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual, nonDeletedIntervalFilters := e.Expired(tt.ref, model.Now())
actual, nonDeletedIntervalFilters := e.Expired([]byte(tt.userID), tt.chunk, mustParseLabels(tt.labels), nil, "", model.Now())
require.Equal(t, tt.want, actual)
require.Nil(t, nonDeletedIntervalFilters)
})
@ -183,18 +185,20 @@ func Test_expirationChecker_Expired_zeroValue(t *testing.T) {
require.NoError(t, err)
e := NewExpirationChecker(o)
tests := []struct {
name string
ref ChunkEntry
want bool
name string
userID string
labels string
chunk Chunk
want bool
}{
{"tenant with no override should not delete", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-2*time.Hour)), false},
{"tenant with no override, REALLY old chunk should not delete", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-10000*time.Hour+(1*time.Hour)), model.Now().Add(-10000*time.Hour)), false},
{"tenant with override chunk less than retention should not delete", newChunkEntry("2", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-2*time.Hour)), false},
{"tenant with override should delete", newChunkEntry("2", `{foo="buzz"}`, model.Now().Add(-31*time.Hour), model.Now().Add(-30*time.Hour)), true},
{"tenant with no override should not delete", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-2 * time.Hour)}, false},
{"tenant with no override, REALLY old chunk should not delete", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-10000*time.Hour + (1 * time.Hour)), Through: model.Now().Add(-10000 * time.Hour)}, false},
{"tenant with override chunk less than retention should not delete", "2", `{foo="buzz"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-2 * time.Hour)}, false},
{"tenant with override should delete", "2", `{foo="buzz"}`, Chunk{From: model.Now().Add(-31 * time.Hour), Through: model.Now().Add(-30 * time.Hour)}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual, nonDeletedIntervalFilters := e.Expired(tt.ref, model.Now())
actual, nonDeletedIntervalFilters := e.Expired([]byte(tt.userID), tt.chunk, mustParseLabels(tt.labels), nil, "", model.Now())
require.Equal(t, tt.want, actual)
require.Nil(t, nonDeletedIntervalFilters)
})
@ -229,17 +233,19 @@ func Test_expirationChecker_Expired_zeroValueOverride(t *testing.T) {
e := NewExpirationChecker(o)
tests := []struct {
name string
ref ChunkEntry
want bool
name string
userID string
labels string
chunk Chunk
want bool
}{
{"tenant with no override should delete", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-31*time.Hour), model.Now().Add(-30*time.Hour)), true},
{"tenant with override should not delete", newChunkEntry("2", `{foo="buzz"}`, model.Now().Add(-31*time.Hour), model.Now().Add(-30*time.Hour)), false},
{"tenant with zero value without unit should not delete", newChunkEntry("3", `{foo="buzz"}`, model.Now().Add(-31*time.Hour), model.Now().Add(-30*time.Hour)), false},
{"tenant with no override should delete", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-31 * time.Hour), Through: model.Now().Add(-30 * time.Hour)}, true},
{"tenant with override should not delete", "2", `{foo="buzz"}`, Chunk{From: model.Now().Add(-31 * time.Hour), Through: model.Now().Add(-30 * time.Hour)}, false},
{"tenant with zero value without unit should not delete", "3", `{foo="buzz"}`, Chunk{From: model.Now().Add(-31 * time.Hour), Through: model.Now().Add(-30 * time.Hour)}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual, nonDeletedIntervalFilters := e.Expired(tt.ref, model.Now())
actual, nonDeletedIntervalFilters := e.Expired([]byte(tt.userID), tt.chunk, mustParseLabels(tt.labels), nil, "", model.Now())
require.Equal(t, tt.want, actual)
require.Nil(t, nonDeletedIntervalFilters)
})
@ -267,17 +273,55 @@ func Test_expirationChecker_DropFromIndex_zeroValue(t *testing.T) {
chunkThrough := model.Now().Add(-2 * time.Hour)
tests := []struct {
name string
ref ChunkEntry
userID string
labels string
chunk Chunk
tableEndTime model.Time
want bool
}{
{"tenant with no override should not delete", newChunkEntry("1", `{foo="buzz"}`, chunkFrom, chunkThrough), model.Now().Add(-48 * time.Hour), false},
{"tenant with override tableEndTime within retention period should not delete", newChunkEntry("2", `{foo="buzz"}`, chunkFrom, chunkThrough), model.Now().Add(-1 * time.Hour), false},
{"tenant with override should delete", newChunkEntry("2", `{foo="buzz"}`, chunkFrom, chunkThrough), model.Now().Add(-48 * time.Hour), true},
{"tenant with no override should not delete", "1", `{foo="buzz"}`, Chunk{From: chunkFrom, Through: chunkThrough}, model.Now().Add(-48 * time.Hour), false},
{"tenant with override tableEndTime within retention period should not delete", "2", `{foo="buzz"}`, Chunk{From: chunkFrom, Through: chunkThrough}, model.Now().Add(-1 * time.Hour), false},
{"tenant with override should delete", "2", `{foo="buzz"}`, Chunk{From: chunkFrom, Through: chunkThrough}, model.Now().Add(-48 * time.Hour), true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := e.DropFromIndex(tt.ref, tt.tableEndTime, model.Now())
actual := e.DropFromIndex([]byte(tt.userID), tt.chunk, mustParseLabels(tt.labels), tt.tableEndTime, model.Now())
require.Equal(t, tt.want, actual)
})
}
}
func Test_expirationChecker_CanSkipSeries(t *testing.T) {
// Default retention should be zero
d := defaultLimitsTestConfig()
// Override tenant 2 to have 24 hour retention
tl := defaultLimitsTestConfig()
oneDay, _ := model.ParseDuration("24h")
tl.RetentionPeriod = oneDay
f := fakeOverrides{
tenantLimits: map[string]*validation.Limits{
"2": &tl,
},
}
o, err := overridesTestConfig(d, f)
require.NoError(t, err)
e := NewExpirationChecker(o)
tests := []struct {
name string
userID string
labels string
seriesStart model.Time
want bool
}{
{"tenant with no override should skip series", "1", `{foo="buzz"}`, model.Now().Add(-48 * time.Hour), true},
{"tenant with override, seriesStart within retention period should skip series", "2", `{foo="buzz"}`, model.Now().Add(-1 * time.Hour), true},
{"tenant with override, seriesStart outside retention period should not skip series", "2", `{foo="buzz"}`, model.Now().Add(-48 * time.Hour), false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := e.CanSkipSeries([]byte(tt.userID), mustParseLabels(tt.labels), nil, tt.seriesStart, "", model.Now())
require.Equal(t, tt.want, actual)
})
}

@ -31,30 +31,57 @@ const (
MarkersFolder = "markers"
)
type ChunkRef struct {
UserID []byte
SeriesID []byte
ChunkID []byte
From model.Time
Through model.Time
type Chunk struct {
ChunkID []byte
From model.Time
Through model.Time
}
func (c ChunkRef) String() string {
return fmt.Sprintf("UserID: %s , SeriesID: %s , Time: [%s,%s]", c.UserID, c.SeriesID, c.From, c.Through)
func (c Chunk) String() string {
return fmt.Sprintf("ChunkID: %s", c.ChunkID)
}
type ChunkEntry struct {
ChunkRef
Labels labels.Labels
type Series struct {
seriesID, userID []byte
labels labels.Labels
chunks []Chunk
}
type ChunkEntryCallback func(ChunkEntry) (deleteChunk bool, err error)
func (s *Series) SeriesID() []byte {
return s.seriesID
}
func (s *Series) UserID() []byte {
return s.userID
}
func (s *Series) Labels() labels.Labels {
return s.labels
}
func (s *Series) Chunks() []Chunk {
return s.chunks
}
type ChunkIterator interface {
ForEachChunk(ctx context.Context, callback ChunkEntryCallback) error
func (s *Series) Reset(seriesID, userID []byte, labels labels.Labels) {
s.seriesID = seriesID
s.userID = userID
s.labels = labels
s.chunks = s.chunks[:0]
}
type SeriesCleaner interface {
func (s *Series) AppendChunks(ref ...Chunk) {
s.chunks = append(s.chunks, ref...)
}
type SeriesCallback func(series Series) (err error)
type SeriesIterator interface {
ForEachSeries(ctx context.Context, callback SeriesCallback) error
}
type IndexCleaner interface {
RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID []byte) error
// CleanupSeries is for cleaning up the series that do have any chunks left in the index.
// It would only be called for the series that have all their chunks deleted without adding new ones.
CleanupSeries(userID []byte, lbls labels.Labels) error
@ -70,9 +97,9 @@ type chunkIndexer interface {
}
type IndexProcessor interface {
ChunkIterator
SeriesIterator
chunkIndexer
SeriesCleaner
IndexCleaner
}
var errNoChunksFound = errors.New("no chunks found in table, please check if there are really no chunks and manually drop the table or " +
@ -176,57 +203,84 @@ func markForDelete(
iterCtx, cancel := ctxForTimeout(timeout)
defer cancel()
err := indexFile.ForEachChunk(iterCtx, func(c ChunkEntry) (bool, error) {
err := indexFile.ForEachSeries(iterCtx, func(s Series) error {
chunks := s.Chunks()
if len(chunks) == 0 {
// add the series to series map so that it gets cleaned up
seriesMap.Add(s.SeriesID(), s.UserID(), s.Labels())
return nil
}
chunksFound = true
seriesMap.Add(c.SeriesID, c.UserID, c.Labels)
// see if the chunk is deleted completely or partially
if expired, filterFunc := expiration.Expired(c, now); expired {
linesDeleted := true // tracks whether we deleted at least some data from the chunk
if filterFunc != nil {
wroteChunks := false
var err error
wroteChunks, linesDeleted, err = chunkRewriter.rewriteChunk(ctx, c, tableInterval, filterFunc)
if err != nil {
return false, fmt.Errorf("failed to rewrite chunk %s with error %s", c.ChunkID, err)
}
seriesStart := chunks[0].From
for i := 0; i < len(chunks); i++ {
if chunks[i].From < seriesStart {
seriesStart = chunks[i].From
}
}
if expiration.CanSkipSeries(s.UserID(), s.labels, s.SeriesID(), seriesStart, tableName, now) {
empty = false
return nil
}
seriesMap.Add(s.SeriesID(), s.UserID(), s.Labels())
for i := 0; i < len(chunks) && iterCtx.Err() == nil; i++ {
c := chunks[i]
// see if the chunk is deleted completely or partially
if expired, filterFunc := expiration.Expired(s.UserID(), c, s.Labels(), s.SeriesID(), tableName, now); expired {
linesDeleted := true // tracks whether we deleted at least some data from the chunk
if filterFunc != nil {
wroteChunks := false
var err error
wroteChunks, linesDeleted, err = chunkRewriter.rewriteChunk(ctx, s.UserID(), c, tableInterval, filterFunc)
if err != nil {
return fmt.Errorf("failed to rewrite chunk %s with error %s", c.ChunkID, err)
}
if wroteChunks {
// we have re-written chunk to the storage so the table won't be empty and the series are still being referred.
empty = false
seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID)
if wroteChunks {
// we have re-written chunk to the storage so the table won't be empty and the series are still being referred.
empty = false
seriesMap.MarkSeriesNotDeleted(s.SeriesID(), s.UserID())
}
}
}
if linesDeleted {
modified = true
if linesDeleted {
modified = true
// Mark the chunk for deletion only if it is completely deleted, or this is the last table that the chunk is index in.
// For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then
// the retention would fail because it would fail to find it in the storage.
if filterFunc == nil || c.From >= tableInterval.Start {
if err := marker.Put(c.ChunkID); err != nil {
return false, err
// Mark the chunk for deletion only if it is completely deleted, or this is the last table that the chunk is index in.
// For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then
// the retention would fail because it would fail to find it in the storage.
if filterFunc == nil || c.From >= tableInterval.Start {
if err := marker.Put(c.ChunkID); err != nil {
return err
}
}
if err := indexFile.RemoveChunk(c.From, c.Through, s.UserID(), s.Labels(), c.ChunkID); err != nil {
return fmt.Errorf("failed to remove chunk %s from index with error %s", c.ChunkID, err)
}
continue
}
return true, nil
}
}
// The chunk is not deleted, now see if we can drop its index entry based on end time from tableInterval.
// If chunk end time is after the end time of tableInterval, it means the chunk would also be indexed in the next table.
// We would now check if the end time of the tableInterval is out of retention period so that
// we can drop the chunk entry from this table without removing the chunk from the store.
if c.Through.After(tableInterval.End) {
if expiration.DropFromIndex(c, tableInterval.End, now) {
modified = true
return true, nil
// The chunk is not deleted, now see if we can drop its index entry based on end time from tableInterval.
// If chunk end time is after the end time of tableInterval, it means the chunk would also be indexed in the next table.
// We would now check if the end time of the tableInterval is out of retention period so that
// we can drop the chunk entry from this table without removing the chunk from the store.
if c.Through.After(tableInterval.End) {
if expiration.DropFromIndex(s.UserID(), c, nil, tableInterval.End, now) {
modified = true
if err := indexFile.RemoveChunk(c.From, c.Through, s.UserID(), s.Labels(), c.ChunkID); err != nil {
return fmt.Errorf("failed to remove chunk %s from index with error %s", c.ChunkID, err)
}
continue
}
}
}
empty = false
seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID)
return false, nil
empty = false
seriesMap.MarkSeriesNotDeleted(s.SeriesID(), s.UserID())
}
return iterCtx.Err()
})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) && errors.Is(iterCtx.Err(), context.DeadlineExceeded) {
@ -366,11 +420,11 @@ func newChunkRewriter(chunkClient client.Client, tableName string, chunkIndexer
// If the newChunk is different, linesDeleted would be true.
// The newChunk is indexed and uploaded only if it belongs to the current index table being processed,
// the status of which is set to wroteChunks.
func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, tableInterval model.Interval, filterFunc filter.Func) (wroteChunks bool, linesDeleted bool, err error) {
userID := unsafeGetString(ce.UserID)
func (c *chunkRewriter) rewriteChunk(ctx context.Context, userID []byte, ce Chunk, tableInterval model.Interval, filterFunc filter.Func) (wroteChunks bool, linesDeleted bool, err error) {
userIDStr := unsafeGetString(userID)
chunkID := unsafeGetString(ce.ChunkID)
chk, err := chunk.ParseExternalKey(userID, chunkID)
chk, err := chunk.ParseExternalKey(userIDStr, chunkID)
if err != nil {
return false, false, err
}
@ -381,7 +435,7 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, tableIn
}
if len(chks) != 1 {
return false, false, fmt.Errorf("expected 1 entry for chunk %s but found %d in storage", chunkID, len(chks))
return false, false, fmt.Errorf("expected 1 entry for chunk %s but found %d in storage", ce.ChunkID, len(chks))
}
newChunkData, err := chks[0].Data.Rebound(ce.From, ce.Through, func(ts time.Time, s string, structuredMetadata ...labels.Label) bool {
@ -394,7 +448,7 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, tableIn
})
if err != nil {
if errors.Is(err, chunk.ErrSliceNoDataInRange) {
level.Info(util_log.Logger).Log("msg", "Delete request filterFunc leaves an empty chunk", "chunk ref", string(ce.ChunkRef.ChunkID))
level.Info(util_log.Logger).Log("msg", "Delete request filterFunc leaves an empty chunk", "chunk ref", string(ce.ChunkID))
return false, true, nil
}
return false, false, err
@ -418,7 +472,7 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, tableIn
}
newChunk := chunk.NewChunk(
userID, chks[0].FingerprintModel(), chks[0].Metric,
userIDStr, chks[0].FingerprintModel(), chks[0].Metric,
facade,
newChunkStart,
newChunkEnd,

@ -259,10 +259,18 @@ func Test_EmptyTable(t *testing.T) {
tables := store.indexTables()
require.Len(t, tables, 1)
// disabled retention should not do anything to the table
empty, modified, err := markForDelete(context.Background(), 0, tables[0].name, &noopWriter{}, tables[0], NewExpirationChecker(&fakeLimits{}), nil, util_log.Logger)
require.NoError(t, err)
require.False(t, empty)
require.False(t, modified)
// Set a very low retention to make sure all chunks are marked for deletion which will create an empty table.
empty, _, err := markForDelete(context.Background(), 0, tables[0].name, &noopWriter{}, tables[0], NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: time.Second}, "2": {retentionPeriod: time.Second}}}), nil, util_log.Logger)
empty, modified, err = markForDelete(context.Background(), 0, tables[0].name, &noopWriter{}, tables[0], NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: time.Second}, "2": {retentionPeriod: time.Second}}}), nil, util_log.Logger)
require.NoError(t, err)
require.True(t, empty)
require.True(t, modified)
_, _, err = markForDelete(context.Background(), 0, tables[0].name, &noopWriter{}, newTable("test"), NewExpirationChecker(&fakeLimits{}), nil, util_log.Logger)
require.Equal(t, err, errNoChunksFound)
@ -575,7 +583,11 @@ func TestChunkRewriter(t *testing.T) {
for _, indexTable := range indexTables {
cr := newChunkRewriter(store.chunkClient, indexTable.name, indexTable)
wroteChunks, linesDeleted, err := cr.rewriteChunk(context.Background(), entryFromChunk(tt.chunk), ExtractIntervalFromTableName(indexTable.name), tt.filterFunc)
wroteChunks, linesDeleted, err := cr.rewriteChunk(context.Background(), []byte(tt.chunk.UserID), Chunk{
ChunkID: []byte(getChunkID(tt.chunk.ChunkRef)),
From: tt.chunk.From,
Through: tt.chunk.Through,
}, ExtractIntervalFromTableName(indexTable.name), tt.filterFunc)
require.NoError(t, err)
require.Equal(t, tt.expectedRespByTables[indexTable.name].mustDeleteLines, linesDeleted)
require.Equal(t, tt.expectedRespByTables[indexTable.name].mustRewriteChunk, wroteChunks)
@ -657,25 +669,26 @@ type chunkExpiry struct {
type mockExpirationChecker struct {
ExpirationChecker
chunksExpiry map[string]chunkExpiry
delay time.Duration
calls int
timedOut bool
chunksExpiry map[string]chunkExpiry
skipSeries map[string]bool
delay time.Duration
numExpiryChecks int
timedOut bool
}
func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry) *mockExpirationChecker {
return &mockExpirationChecker{chunksExpiry: chunksExpiry}
func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry, skipSeries map[string]bool) *mockExpirationChecker {
return &mockExpirationChecker{chunksExpiry: chunksExpiry, skipSeries: skipSeries}
}
func (m *mockExpirationChecker) Expired(ref ChunkEntry, _ model.Time) (bool, filter.Func) {
func (m *mockExpirationChecker) Expired(_ []byte, chk Chunk, _ labels.Labels, _ []byte, _ string, _ model.Time) (bool, filter.Func) {
time.Sleep(m.delay)
m.calls++
m.numExpiryChecks++
ce := m.chunksExpiry[string(ref.ChunkID)]
ce := m.chunksExpiry[string(chk.ChunkID)]
return ce.isExpired, ce.filterFunc
}
func (m *mockExpirationChecker) DropFromIndex(_ ChunkEntry, _ model.Time, _ model.Time) bool {
func (m *mockExpirationChecker) DropFromIndex(_ []byte, _ Chunk, _ labels.Labels, _ model.Time, _ model.Time) bool {
return false
}
@ -683,6 +696,10 @@ func (m *mockExpirationChecker) MarkPhaseTimedOut() {
m.timedOut = true
}
func (m *mockExpirationChecker) CanSkipSeries(_ []byte, lbls labels.Labels, _ []byte, _ model.Time, _ string, _ model.Time) bool {
return m.skipSeries[lbls.String()]
}
func TestMarkForDelete_SeriesCleanup(t *testing.T) {
now := model.Now()
schema := allSchemas[2]
@ -690,13 +707,15 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
todaysTableInterval := ExtractIntervalFromTableName(schema.config.IndexTables.TableFor(now))
for _, tc := range []struct {
name string
chunks []chunk.Chunk
expiry []chunkExpiry
expectedDeletedSeries []map[uint64]struct{}
expectedEmpty []bool
expectedModified []bool
numChunksDeleted []int64
name string
chunks []chunk.Chunk
expiry []chunkExpiry
skipSeries map[string]bool
expectedDeletedSeries []map[uint64]struct{}
expectedEmpty []bool
expectedModified []bool
numChunksDeleted []int64
numExpectedExpiryChecks int
}{
{
name: "no chunk and series deleted",
@ -720,6 +739,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
numChunksDeleted: []int64{
0,
},
numExpectedExpiryChecks: 1,
},
{
name: "chunk deleted with filter but no lines matching",
@ -746,6 +766,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
numChunksDeleted: []int64{
0,
},
numExpectedExpiryChecks: 1,
},
{
name: "only one chunk in store which gets deleted",
@ -769,6 +790,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
numChunksDeleted: []int64{
1,
},
numExpectedExpiryChecks: 1,
},
{
name: "only one chunk in store which gets partially deleted",
@ -800,6 +822,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
numChunksDeleted: []int64{
1,
},
numExpectedExpiryChecks: 1,
},
{
name: "one of two chunks deleted",
@ -827,6 +850,65 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
numChunksDeleted: []int64{
1,
},
numExpectedExpiryChecks: 2,
},
{
name: "one of two series skipped",
chunks: []chunk.Chunk{
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(30*time.Minute)),
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "2"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(30*time.Minute)),
},
skipSeries: map[string]bool{`{foo="1"}`: true},
expiry: []chunkExpiry{
{
isExpired: false,
},
{
isExpired: true,
},
},
expectedDeletedSeries: []map[uint64]struct{}{
{labels.Labels{labels.Label{Name: "foo", Value: "2"}}.Hash(): struct{}{}},
},
expectedEmpty: []bool{
false,
},
expectedModified: []bool{
true,
},
numChunksDeleted: []int64{
1,
},
numExpectedExpiryChecks: 1,
},
{
name: "all series skipped",
chunks: []chunk.Chunk{
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(30*time.Minute)),
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "2"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(30*time.Minute)),
},
skipSeries: map[string]bool{`{foo="1"}`: true, `{foo="2"}`: true},
expiry: []chunkExpiry{
{
isExpired: false,
},
{
isExpired: false,
},
},
expectedDeletedSeries: []map[uint64]struct{}{
nil,
},
expectedEmpty: []bool{
false,
},
expectedModified: []bool{
false,
},
numChunksDeleted: []int64{
0,
},
numExpectedExpiryChecks: 0,
},
{
name: "one of two chunks partially deleted",
@ -862,6 +944,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
numChunksDeleted: []int64{
1,
},
numExpectedExpiryChecks: 2,
},
{
name: "one big chunk partially deleted for yesterdays table without rewrite",
@ -888,6 +971,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
numChunksDeleted: []int64{
1, 0,
},
numExpectedExpiryChecks: 2,
},
{
name: "one big chunk partially deleted for yesterdays table with rewrite",
@ -914,6 +998,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
numChunksDeleted: []int64{
1, 0,
},
numExpectedExpiryChecks: 2,
},
} {
t.Run(tc.name, func(t *testing.T) {
@ -925,7 +1010,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
chunksExpiry[getChunkID(chunk.ChunkRef)] = tc.expiry[i]
}
expirationChecker := newMockExpirationChecker(chunksExpiry)
expirationChecker := newMockExpirationChecker(chunksExpiry, tc.skipSeries)
store.Stop()
@ -945,6 +1030,8 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
require.EqualValues(t, tc.expectedDeletedSeries[i], seriesCleanRecorder.deletedSeries[userID])
}
require.Equal(t, tc.numExpectedExpiryChecks, expirationChecker.numExpiryChecks)
})
}
}
@ -967,7 +1054,7 @@ func TestDeleteTimeout(t *testing.T) {
require.NoError(t, store.Put(context.TODO(), chunks))
store.Stop()
expirationChecker := newMockExpirationChecker(map[string]chunkExpiry{})
expirationChecker := newMockExpirationChecker(map[string]chunkExpiry{}, nil)
expirationChecker.delay = 10 * time.Millisecond
table := store.indexTables()[0]
@ -985,7 +1072,7 @@ func TestDeleteTimeout(t *testing.T) {
require.NoError(t, err)
require.False(t, empty)
require.False(t, isModified)
require.Equal(t, tc.calls, expirationChecker.calls)
require.Equal(t, tc.calls, expirationChecker.numExpiryChecks)
require.Equal(t, tc.timedOut, expirationChecker.timedOut)
}
}

@ -9,7 +9,7 @@ type userSeries struct {
seriesIDLen int
}
func newUserSeries(seriesID []byte, userID []byte) userSeries {
func newUserSeries(seriesID, userID []byte) userSeries {
key := make([]byte, 0, len(seriesID)+len(userID))
key = append(key, seriesID...)
key = append(key, userID...)
@ -31,16 +31,6 @@ func (us userSeries) UserID() []byte {
return us.key[us.seriesIDLen:]
}
func (us *userSeries) Reset(seriesID []byte, userID []byte) {
if us.key == nil {
us.key = make([]byte, 0, len(seriesID)+len(userID))
}
us.key = us.key[:0]
us.key = append(us.key, seriesID...)
us.key = append(us.key, userID...)
us.seriesIDLen = len(seriesID)
}
type userSeriesInfo struct {
userSeries
isDeleted bool
@ -53,7 +43,7 @@ func newUserSeriesMap() userSeriesMap {
return make(userSeriesMap)
}
func (u userSeriesMap) Add(seriesID []byte, userID []byte, lbls labels.Labels) {
func (u userSeriesMap) Add(seriesID, userID []byte, lbls labels.Labels) {
us := newUserSeries(seriesID, userID)
if _, ok := u[us.Key()]; ok {
return
@ -67,7 +57,7 @@ func (u userSeriesMap) Add(seriesID []byte, userID []byte, lbls labels.Labels) {
}
// MarkSeriesNotDeleted is used to mark series not deleted when it still has some chunks left in the store
func (u userSeriesMap) MarkSeriesNotDeleted(seriesID []byte, userID []byte) {
func (u userSeriesMap) MarkSeriesNotDeleted(seriesID, userID []byte) {
us := newUserSeries(seriesID, userID)
usi := u[us.Key()]
usi.isDeleted = false

@ -117,51 +117,50 @@ var (
sweepMetrics = newSweeperMetrics(prometheus.DefaultRegisterer)
)
func newChunkEntry(userID, labels string, from, through model.Time) ChunkEntry {
func mustParseLabels(labels string) labels.Labels {
lbs, err := syntax.ParseLabels(labels)
if err != nil {
panic(err)
}
return ChunkEntry{
ChunkRef: ChunkRef{
UserID: []byte(userID),
SeriesID: labelsSeriesID(lbs),
From: from,
Through: through,
},
Labels: lbs,
}
return lbs
}
type table struct {
name string
chunks map[string][]chunk.Chunk
chunks map[string]map[string][]chunk.Chunk
}
func (t *table) ForEachChunk(ctx context.Context, callback ChunkEntryCallback) error {
for userID, chks := range t.chunks {
i := 0
for j := 0; j < len(chks) && ctx.Err() == nil; j++ {
chk := chks[j]
deleteChunk, err := callback(entryFromChunk(chk))
if err != nil {
return err
func (t *table) ForEachSeries(ctx context.Context, callback SeriesCallback) error {
for userID := range t.chunks {
for seriesID := range t.chunks[userID] {
chunks := make([]Chunk, 0, len(t.chunks[userID][seriesID]))
for _, chk := range t.chunks[userID][seriesID] {
chunks = append(chunks, Chunk{
ChunkID: []byte(getChunkID(chk.ChunkRef)),
From: chk.From,
Through: chk.Through,
})
}
if !deleteChunk {
t.chunks[userID][i] = chk
i++
series := Series{}
series.Reset(
[]byte(seriesID),
[]byte(userID),
labels.NewBuilder(t.chunks[userID][seriesID][0].Metric).Del(labels.MetricName).Labels(),
)
series.AppendChunks(chunks...)
if err := callback(series); err != nil {
return err
}
}
t.chunks[userID] = t.chunks[userID][:i]
}
return ctx.Err()
}
func (t *table) IndexChunk(chunk chunk.Chunk) (bool, error) {
t.chunks[chunk.UserID] = append(t.chunks[chunk.UserID], chunk)
seriesID := string(labelsSeriesID(chunk.Metric))
t.chunks[chunk.UserID][seriesID] = append(t.chunks[chunk.UserID][seriesID], chunk)
return true, nil
}
@ -169,19 +168,34 @@ func (t *table) CleanupSeries(_ []byte, _ labels.Labels) error {
return nil
}
func (t *table) RemoveChunk(_, _ model.Time, userID []byte, lbls labels.Labels, chunkID []byte) error {
seriesID := string(labelsSeriesID(labels.NewBuilder(lbls).Set(labels.MetricName, "logs").Labels()))
for i, chk := range t.chunks[string(userID)][seriesID] {
if getChunkID(chk.ChunkRef) == string(chunkID) {
t.chunks[string(userID)][seriesID] = append(t.chunks[string(userID)][seriesID][:i], t.chunks[string(userID)][seriesID][i+1:]...)
}
}
return nil
}
func newTable(name string) *table {
return &table{
name: name,
chunks: map[string][]chunk.Chunk{},
chunks: map[string]map[string][]chunk.Chunk{},
}
}
func (t *table) Put(chk chunk.Chunk) {
if _, ok := t.chunks[chk.UserID]; !ok {
t.chunks[chk.UserID] = []chunk.Chunk{}
t.chunks[chk.UserID] = make(map[string][]chunk.Chunk)
}
seriesID := string(labelsSeriesID(chk.Metric))
if _, ok := t.chunks[chk.UserID][seriesID]; !ok {
t.chunks[chk.UserID][seriesID] = []chunk.Chunk{}
}
t.chunks[chk.UserID] = append(t.chunks[chk.UserID], chk)
t.chunks[chk.UserID][seriesID] = append(t.chunks[chk.UserID][seriesID], chk)
}
func (t *table) GetChunks(userID string, from, through model.Time, metric labels.Labels) []chunk.Chunk {
@ -191,11 +205,13 @@ func (t *table) GetChunks(userID string, from, through model.Time, metric labels
matchers = append(matchers, labels.MustNewMatcher(labels.MatchEqual, l.Name, l.Value))
}
for _, chk := range t.chunks[userID] {
if chk.From > through || chk.Through < from || !allMatch(matchers, chk.Metric) {
continue
for seriesID := range t.chunks[userID] {
for _, chk := range t.chunks[userID][seriesID] {
if chk.From > through || chk.Through < from || !allMatch(matchers, chk.Metric) {
continue
}
chunks = append(chunks, chk)
}
chunks = append(chunks, chk)
}
return chunks
@ -311,19 +327,6 @@ func (t *testStore) GetChunks(userID string, from, through model.Time, metric la
return fetchedChunk
}
func entryFromChunk(c chunk.Chunk) ChunkEntry {
return ChunkEntry{
ChunkRef: ChunkRef{
UserID: []byte(c.UserID),
SeriesID: labelsSeriesID(c.Metric),
ChunkID: []byte(getChunkID(c.ChunkRef)),
From: c.From,
Through: c.Through,
},
Labels: labels.NewBuilder(c.Metric).Del(labels.MetricName).Labels(),
}
}
func getChunkID(c logproto.ChunkRef) string {
return schemaCfg.ExternalKey(c)
}

@ -14,6 +14,7 @@ import (
"github.com/go-kit/log"
"github.com/klauspost/compress/gzip"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
@ -159,7 +160,7 @@ func openCompactedIndex(path string) (*compactedIndex, error) {
return &compactedIndex{indexFile: idxFile}, nil
}
func (c compactedIndex) ForEachChunk(_ context.Context, _ retention.ChunkEntryCallback) error {
func (c compactedIndex) ForEachSeries(_ context.Context, _ retention.SeriesCallback) error {
return nil
}
@ -171,6 +172,10 @@ func (c compactedIndex) CleanupSeries(_ []byte, _ labels.Labels) error {
return nil
}
func (c compactedIndex) RemoveChunk(_, _ model.Time, _ []byte, _ labels.Labels, _ []byte) error {
return nil
}
func (c compactedIndex) Cleanup() {
_ = c.indexFile.Close()
}

@ -9,6 +9,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"go.etcd.io/bbolt"
@ -136,7 +137,7 @@ func (c *CompactedIndex) setupIndexProcessors() error {
return nil
}
func (c *CompactedIndex) ForEachChunk(ctx context.Context, callback retention.ChunkEntryCallback) error {
func (c *CompactedIndex) ForEachSeries(ctx context.Context, callback retention.SeriesCallback) error {
if err := c.setupIndexProcessors(); err != nil {
return err
}
@ -146,7 +147,7 @@ func (c *CompactedIndex) ForEachChunk(ctx context.Context, callback retention.Ch
return fmt.Errorf("required boltdb bucket not found")
}
return ForEachChunk(ctx, bucket, c.periodConfig, callback)
return ForEachSeries(ctx, bucket, c.periodConfig, callback)
}
func (c *CompactedIndex) IndexChunk(chunk chunk.Chunk) (bool, error) {
@ -165,6 +166,14 @@ func (c *CompactedIndex) CleanupSeries(userID []byte, lbls labels.Labels) error
return c.seriesCleaner.CleanupSeries(userID, lbls)
}
func (c *CompactedIndex) RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID []byte) error {
if err := c.setupIndexProcessors(); err != nil {
return err
}
return c.seriesCleaner.RemoveChunk(from, through, userID, labels, chunkID)
}
func (c *CompactedIndex) ToIndexFile() (shipperindex.Index, error) {
if c.boltdbTx != nil {
err := c.boltdbTx.Commit()

@ -47,18 +47,23 @@ func TestCompactedIndex_IndexProcessor(t *testing.T) {
// remove c1, c2 chunk and index c4 with same labels as c2
c4 := createChunk(t, chunkfmt, headfmt, "2", labels.Labels{labels.Label{Name: "foo", Value: "bar"}, labels.Label{Name: "fizz", Value: "buzz"}}, tt.from, tt.from.Add(30*time.Minute))
err = compactedIndex.ForEachChunk(context.Background(), func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
if entry.Labels.Get("fizz") == "buzz" {
err = compactedIndex.ForEachSeries(context.Background(), func(series retention.Series) (err error) {
if series.Labels().Get("fizz") == "buzz" {
chunkIndexed, err := compactedIndex.IndexChunk(c4)
require.NoError(t, err)
require.True(t, chunkIndexed)
}
return entry.Labels.Get("foo") == "bar", nil
if series.Labels().Get("foo") == "bar" {
for _, chk := range series.Chunks() {
require.NoError(t, compactedIndex.RemoveChunk(chk.From, chk.Through, series.UserID(), series.Labels(), chk.ChunkID))
}
}
return nil
})
require.NoError(t, err)
// remove series for c1 since all its chunks are deleted
err = compactedIndex.CleanupSeries(entryFromChunk(testSchema, c1).UserID, c1.Metric)
err = compactedIndex.CleanupSeries([]byte(c1.UserID), c1.Metric)
require.NoError(t, err)
indexFile, err := compactedIndex.ToIndexFile()
@ -74,7 +79,7 @@ func TestCompactedIndex_IndexProcessor(t *testing.T) {
err = modifiedBoltDB.View(func(tx *bbolt.Tx) error {
return tx.Bucket(local.IndexBucketName).ForEach(func(k, _ []byte) error {
c1SeriesID := entryFromChunk(testSchema, c1).SeriesID
c1SeriesID := labelsSeriesID(c1.Metric)
series, ok, err := parseLabelIndexSeriesID(decodeKey(k))
if !ok {
return nil
@ -92,15 +97,15 @@ func TestCompactedIndex_IndexProcessor(t *testing.T) {
})
require.NoError(t, err)
expectedChunkEntries := []retention.ChunkEntry{
entryFromChunk(testSchema, c3),
entryFromChunk(testSchema, c4),
expectedChunkEntries := []retention.Chunk{
retentionChunkFromChunk(testSchema, c3),
retentionChunkFromChunk(testSchema, c4),
}
chunkEntriesFound := []retention.ChunkEntry{}
var chunkEntriesFound []retention.Chunk
err = modifiedBoltDB.View(func(tx *bbolt.Tx) error {
return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
chunkEntriesFound = append(chunkEntriesFound, entry)
return false, nil
return ForEachSeries(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(series retention.Series) (err error) {
chunkEntriesFound = append(chunkEntriesFound, series.Chunks()...)
return nil
})
})
require.NoError(t, err)

@ -7,8 +7,6 @@ import (
"strconv"
"github.com/prometheus/common/model"
"github.com/grafana/loki/v3/pkg/compactor/retention"
)
const (
@ -39,36 +37,44 @@ func (e InvalidIndexKeyError) Is(target error) bool {
return target == ErrInvalidIndexKey
}
func parseChunkRef(hashKey, rangeKey []byte) (retention.ChunkRef, bool, error) {
type chunkRef struct {
UserID []byte
SeriesID []byte
ChunkID []byte
From model.Time
Through model.Time
}
func parseChunkRef(hashKey, rangeKey []byte) (chunkRef, bool, error) {
componentsRef := getComponents()
defer putComponents(componentsRef)
components := componentsRef.components
components = decodeRangeKey(rangeKey, components)
if len(components) == 0 {
return retention.ChunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey)
return chunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey)
}
keyType := components[len(components)-1]
if len(keyType) == 0 || keyType[0] != chunkTimeRangeKeyV3 {
return retention.ChunkRef{}, false, nil
return chunkRef{}, false, nil
}
chunkID := components[len(components)-2]
userID, hexFrom, hexThrough, ok := parseChunkID(chunkID)
if !ok {
return retention.ChunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey)
return chunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey)
}
from, err := strconv.ParseInt(unsafeGetString(hexFrom), 16, 64)
if err != nil {
return retention.ChunkRef{}, false, err
return chunkRef{}, false, err
}
through, err := strconv.ParseInt(unsafeGetString(hexThrough), 16, 64)
if err != nil {
return retention.ChunkRef{}, false, err
return chunkRef{}, false, err
}
return retention.ChunkRef{
return chunkRef{
UserID: userID,
SeriesID: seriesFromHash(hashKey),
From: model.Time(from),

@ -1,6 +1,7 @@
package compactor
import (
"bytes"
"context"
"fmt"
@ -19,17 +20,17 @@ const (
)
var (
_ retention.SeriesCleaner = &seriesCleaner{}
_ retention.IndexCleaner = &seriesCleaner{}
)
func ForEachChunk(ctx context.Context, bucket *bbolt.Bucket, config config.PeriodConfig, callback retention.ChunkEntryCallback) error {
func ForEachSeries(ctx context.Context, bucket *bbolt.Bucket, config config.PeriodConfig, callback retention.SeriesCallback) error {
labelsMapper, err := newSeriesLabelsMapper(bucket, config)
if err != nil {
return err
}
cursor := bucket.Cursor()
var current retention.ChunkEntry
var current retention.Series
for key, _ := cursor.First(); key != nil && ctx.Err() == nil; key, _ = cursor.Next() {
ref, ok, err := parseChunkRef(decodeKey(key))
@ -40,17 +41,32 @@ func ForEachChunk(ctx context.Context, bucket *bbolt.Bucket, config config.Perio
if !ok {
continue
}
current.ChunkRef = ref
current.Labels = labelsMapper.Get(ref.SeriesID, ref.UserID)
deleteChunk, err := callback(current)
if err != nil {
return err
}
if deleteChunk {
if err := cursor.Delete(); err != nil {
if len(current.Chunks()) == 0 {
current.Reset(ref.SeriesID, ref.UserID, labelsMapper.Get(ref.SeriesID, ref.UserID))
} else if bytes.Compare(current.UserID(), ref.UserID) != 0 || bytes.Compare(current.SeriesID(), ref.SeriesID) != 0 {
err = callback(current)
if err != nil {
return err
}
current.Reset(ref.SeriesID, ref.UserID, labelsMapper.Get(ref.SeriesID, ref.UserID))
}
current.AppendChunks(retention.Chunk{
ChunkID: ref.ChunkID,
From: ref.From,
Through: ref.Through,
})
}
if ctx.Err() != nil {
return ctx.Err()
}
if len(current.Chunks()) != 0 {
err = callback(current)
if err != nil {
return err
}
}
@ -117,3 +133,32 @@ func (s *seriesCleaner) CleanupSeries(userID []byte, lbls labels.Labels) error {
return nil
}
func (s *seriesCleaner) RemoveChunk(from, through model.Time, userID []byte, lbls labels.Labels, chunkID []byte) error {
// We need to add metric name label as well if it is missing since the series ids are calculated including that.
if lbls.Get(labels.MetricName) == "" {
lbls = append(lbls, labels.Label{
Name: labels.MetricName,
Value: logMetricName,
})
}
indexEntries, err := s.schema.GetChunkWriteEntries(from, through, string(userID), logMetricName, lbls, string(chunkID))
if err != nil {
return err
}
for _, indexEntry := range indexEntries {
key := make([]byte, 0, len(indexEntry.HashValue)+len(separator)+len(indexEntry.RangeValue))
key = append(key, []byte(indexEntry.HashValue)...)
key = append(key, []byte(separator)...)
key = append(key, indexEntry.RangeValue...)
err := s.bucket.Delete(key)
if err != nil {
return err
}
}
return nil
}

@ -42,30 +42,34 @@ func Test_ChunkIterator(t *testing.T) {
tables := store.indexTables()
require.Len(t, tables, 1)
var actual []retention.ChunkEntry
var actual []retention.Chunk
err = tables[0].DB.Update(func(tx *bbolt.Tx) error {
return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
actual = append(actual, entry)
return len(actual) == 2, nil
seriesCleaner := newSeriesCleaner(tx.Bucket(local.IndexBucketName), tt.config, tables[0].name)
return ForEachSeries(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(series retention.Series) (err error) {
actual = append(actual, series.Chunks()...)
if string(series.UserID()) == c2.UserID {
return seriesCleaner.RemoveChunk(actual[1].From, actual[1].Through, series.UserID(), series.Labels(), actual[1].ChunkID)
}
return nil
})
})
require.NoError(t, err)
require.Equal(t, []retention.ChunkEntry{
entryFromChunk(store.schemaCfg, c1),
entryFromChunk(store.schemaCfg, c2),
require.Equal(t, []retention.Chunk{
retentionChunkFromChunk(store.schemaCfg, c1),
retentionChunkFromChunk(store.schemaCfg, c2),
}, actual)
// second pass we delete c2
actual = actual[:0]
err = tables[0].DB.Update(func(tx *bbolt.Tx) error {
return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
actual = append(actual, entry)
return false, nil
return ForEachSeries(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(series retention.Series) (err error) {
actual = append(actual, series.Chunks()...)
return nil
})
})
require.NoError(t, err)
require.Equal(t, []retention.ChunkEntry{
entryFromChunk(store.schemaCfg, c1),
require.Equal(t, []retention.Chunk{
retentionChunkFromChunk(store.schemaCfg, c1),
}, actual)
})
}
@ -92,12 +96,12 @@ func Test_ChunkIteratorContextCancelation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var actual []retention.ChunkEntry
var actual []retention.Chunk
err = tables[0].DB.Update(func(tx *bbolt.Tx) error {
return ForEachChunk(ctx, tx.Bucket(local.IndexBucketName), schemaCfg.Configs[0], func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
actual = append(actual, entry)
return ForEachSeries(ctx, tx.Bucket(local.IndexBucketName), schemaCfg.Configs[0], func(series retention.Series) (err error) {
actual = append(actual, series.Chunks()...)
cancel()
return len(actual) == 2, nil
return nil
})
})
@ -110,7 +114,6 @@ func Test_SeriesCleaner(t *testing.T) {
t.Run(tt.schema, func(t *testing.T) {
cm := storage.NewClientMetrics()
defer cm.Unregister()
testSchema := config.SchemaConfig{Configs: []config.PeriodConfig{tt.config}}
store := newTestStore(t, cm)
chunkfmt, headfmt, err := tt.config.ChunkFormat()
require.NoError(t, err)
@ -129,27 +132,33 @@ func Test_SeriesCleaner(t *testing.T) {
require.Len(t, tables, 1)
// remove c1, c2 chunk
err = tables[0].DB.Update(func(tx *bbolt.Tx) error {
return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
return entry.Labels.Get("bar") == "foo", nil
seriesCleaner := newSeriesCleaner(tx.Bucket(local.IndexBucketName), tt.config, tables[0].name)
return ForEachSeries(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(series retention.Series) (err error) {
if series.Labels().Get("bar") == "foo" {
for _, chk := range series.Chunks() {
require.NoError(t, seriesCleaner.RemoveChunk(chk.From, chk.Through, series.UserID(), series.Labels(), chk.ChunkID))
}
}
return nil
})
})
require.NoError(t, err)
err = tables[0].DB.Update(func(tx *bbolt.Tx) error {
cleaner := newSeriesCleaner(tx.Bucket(local.IndexBucketName), tt.config, tables[0].name)
if err := cleaner.CleanupSeries(entryFromChunk(testSchema, c2).UserID, c2.Metric); err != nil {
if err := cleaner.CleanupSeries([]byte(c2.UserID), c2.Metric); err != nil {
return err
}
// remove series for c1 without __name__ label, which should work just fine
return cleaner.CleanupSeries(entryFromChunk(testSchema, c1).UserID, labels.NewBuilder(c1.Metric).Del(labels.MetricName).Labels())
return cleaner.CleanupSeries([]byte(c1.UserID), labels.NewBuilder(c1.Metric).Del(labels.MetricName).Labels())
})
require.NoError(t, err)
err = tables[0].DB.View(func(tx *bbolt.Tx) error {
return tx.Bucket(local.IndexBucketName).ForEach(func(k, _ []byte) error {
c1SeriesID := entryFromChunk(testSchema, c1).SeriesID
c2SeriesID := entryFromChunk(testSchema, c2).SeriesID
c1SeriesID := labelsSeriesID(c1.Metric)
c2SeriesID := labelsSeriesID(c2.Metric)
series, ok, err := parseLabelIndexSeriesID(decodeKey(k))
if !ok {
return nil
@ -215,21 +224,14 @@ func labelsString(ls labels.Labels) string {
return b.String()
}
func entryFromChunk(s config.SchemaConfig, c chunk.Chunk) retention.ChunkEntry {
return retention.ChunkEntry{
ChunkRef: retention.ChunkRef{
UserID: []byte(c.UserID),
SeriesID: labelsSeriesID(c.Metric),
ChunkID: []byte(s.ExternalKey(c.ChunkRef)),
From: c.From,
Through: c.Through,
},
Labels: labels.NewBuilder(c.Metric).Del(labels.MetricName).Labels(),
func retentionChunkFromChunk(s config.SchemaConfig, c chunk.Chunk) retention.Chunk {
return retention.Chunk{
ChunkID: []byte(s.ExternalKey(c.ChunkRef)),
From: c.From,
Through: c.Through,
}
}
var chunkEntry retention.ChunkEntry
func Benchmark_ChunkIterator(b *testing.B) {
cm := storage.NewClientMetrics()
defer cm.Unregister()
@ -249,14 +251,13 @@ func Benchmark_ChunkIterator(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
var total int64
var total int
_ = store.indexTables()[0].Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(local.IndexBucketName)
for n := 0; n < b.N; n++ {
err := ForEachChunk(context.Background(), bucket, allSchemas[0].config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
chunkEntry = entry
total++
return true, nil
err := ForEachSeries(context.Background(), bucket, allSchemas[0].config, func(series retention.Series) (err error) {
total += len(series.Chunks())
return nil
})
require.NoError(b, err)
}

@ -22,15 +22,15 @@ func newUserSeries(seriesID []byte, userID []byte) userSeries {
}
}
func (us userSeries) Key() string {
func (us *userSeries) Key() string {
return unsafeGetString(us.key)
}
func (us userSeries) SeriesID() []byte {
func (us *userSeries) SeriesID() []byte {
return us.key[:us.seriesIDLen]
}
func (us userSeries) UserID() []byte {
func (us *userSeries) UserID() []byte {
return us.key[us.seriesIDLen:]
}

@ -288,24 +288,23 @@ func newCompactedIndex(ctx context.Context, tableName, userID, workingDir string
}
}
// ForEachChunk iterates over all the chunks in the builder and calls the callback function.
func (c *compactedIndex) ForEachChunk(ctx context.Context, callback retention.ChunkEntryCallback) error {
// ForEachSeries iterates over all the chunks in the builder and calls the callback function.
func (c *compactedIndex) ForEachSeries(ctx context.Context, callback retention.SeriesCallback) error {
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{c.periodConfig},
}
chunkEntry := retention.ChunkEntry{
ChunkRef: retention.ChunkRef{
UserID: getUnsafeBytes(c.userID),
},
}
logprotoChunkRef := logproto.ChunkRef{
UserID: c.userID,
}
var series retention.Series
for seriesID, stream := range c.builder.streams {
series.Reset(
getUnsafeBytes(seriesID),
getUnsafeBytes(c.userID),
withoutTenantLabel(stream.labels),
)
logprotoChunkRef.Fingerprint = uint64(stream.fp)
chunkEntry.SeriesID = getUnsafeBytes(seriesID)
chunkEntry.Labels = withoutTenantLabel(stream.labels)
for i := 0; i < len(stream.chunks) && ctx.Err() == nil; i++ {
chk := stream.chunks[i]
@ -313,19 +312,19 @@ func (c *compactedIndex) ForEachChunk(ctx context.Context, callback retention.Ch
logprotoChunkRef.Through = chk.Through()
logprotoChunkRef.Checksum = chk.Checksum
chunkEntry.ChunkID = getUnsafeBytes(schemaCfg.ExternalKey(logprotoChunkRef))
chunkEntry.From = logprotoChunkRef.From
chunkEntry.Through = logprotoChunkRef.Through
deleteChunk, err := callback(chunkEntry)
if err != nil {
return err
}
series.AppendChunks(retention.Chunk{
ChunkID: getUnsafeBytes(schemaCfg.ExternalKey(logprotoChunkRef)),
From: logprotoChunkRef.From,
Through: logprotoChunkRef.Through,
})
}
if ctx.Err() != nil {
return ctx.Err()
}
if deleteChunk {
// add the chunk to the list of chunks to delete which would be taken care of while building the index.
c.deleteChunks[seriesID] = append(c.deleteChunks[seriesID], chk)
}
err := callback(series)
if err != nil {
return err
}
}
@ -368,6 +367,22 @@ func (c *compactedIndex) CleanupSeries(_ []byte, lbls labels.Labels) error {
return nil
}
func (c *compactedIndex) RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID []byte) error {
chk, err := chunk.ParseExternalKey(string(userID), string(chunkID))
if err != nil {
return err
}
seriesID := labels.String()
c.deleteChunks[seriesID] = append(c.deleteChunks[seriesID], tsdbindex.ChunkMeta{
Checksum: chk.Checksum,
MinTime: int64(from),
MaxTime: int64(through),
})
return nil
}
func (c *compactedIndex) Cleanup() {}
// ToIndexFile creates an indexFile from the chunksmetas stored in the builder.

@ -626,18 +626,13 @@ func TestCompactor_Compact(t *testing.T) {
}
}
func chunkMetasToChunkEntry(schemaCfg config.SchemaConfig, userID string, lbls labels.Labels, chunkMetas index.ChunkMetas) []retention.ChunkEntry {
chunkEntries := make([]retention.ChunkEntry, 0, len(chunkMetas))
func chunkMetasToRetentionChunk(schemaCfg config.SchemaConfig, userID string, lbls labels.Labels, chunkMetas index.ChunkMetas) []retention.Chunk {
chunkEntries := make([]retention.Chunk, 0, len(chunkMetas))
for _, chunkMeta := range chunkMetas {
chunkEntries = append(chunkEntries, retention.ChunkEntry{
ChunkRef: retention.ChunkRef{
UserID: []byte(userID),
SeriesID: []byte(lbls.String()),
ChunkID: []byte(schemaCfg.ExternalKey(chunkMetaToChunkRef(userID, chunkMeta, lbls))),
From: chunkMeta.From(),
Through: chunkMeta.Through(),
},
Labels: lbls,
chunkEntries = append(chunkEntries, retention.Chunk{
ChunkID: []byte(schemaCfg.ExternalKey(chunkMetaToChunkRef(userID, chunkMeta, lbls))),
From: chunkMeta.From(),
Through: chunkMeta.Through(),
})
}
@ -658,35 +653,58 @@ func TestCompactedIndex(t *testing.T) {
testCtx := setupCompactedIndex(t)
for name, tc := range map[string]struct {
deleteChunks map[string]index.ChunkMetas
deleteChunks map[string][]retention.Chunk
addChunks []chunk.Chunk
deleteSeries []labels.Labels
shouldErr bool
finalExpectedChunks map[string]index.ChunkMetas
finalExpectedChunks map[string][]retention.Chunk
}{
"no changes": {
finalExpectedChunks: map[string]index.ChunkMetas{
testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10)),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
finalExpectedChunks: map[string][]retention.Chunk{
testCtx.lbls1.String(): chunkMetasToRetentionChunk(testCtx.schemaCfg, testCtx.userID, testCtx.lbls1, buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10))),
testCtx.lbls2.String(): chunkMetasToRetentionChunk(testCtx.schemaCfg, testCtx.userID, testCtx.lbls2, buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20))),
},
},
"delete some chunks from a stream": {
deleteChunks: map[string]index.ChunkMetas{
testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)), buildChunkMetas(testCtx.shiftTableStart(7), testCtx.shiftTableStart(8))...),
deleteChunks: map[string][]retention.Chunk{
testCtx.lbls1.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls1,
append(
buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)),
buildChunkMetas(testCtx.shiftTableStart(7), testCtx.shiftTableStart(8))...,
),
),
},
finalExpectedChunks: map[string]index.ChunkMetas{
testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(2)), append(buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(6)), buildChunkMetas(testCtx.shiftTableStart(9), testCtx.shiftTableStart(10))...)...),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
finalExpectedChunks: map[string][]retention.Chunk{
testCtx.lbls1.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls1,
append(
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(2)),
append(buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(6)),
buildChunkMetas(testCtx.shiftTableStart(9), testCtx.shiftTableStart(10))...,
)...,
),
),
testCtx.lbls2.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls2,
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
),
},
},
"delete all chunks from a stream": {
deleteChunks: map[string]index.ChunkMetas{
testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10)),
deleteChunks: map[string][]retention.Chunk{
testCtx.lbls1.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls1,
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10)),
),
},
deleteSeries: []labels.Labels{testCtx.lbls1},
finalExpectedChunks: map[string]index.ChunkMetas{
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
finalExpectedChunks: map[string][]retention.Chunk{
testCtx.lbls2.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls2,
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
),
},
},
"add some chunks to a stream": {
@ -702,9 +720,15 @@ func TestCompactedIndex(t *testing.T) {
Data: dummyChunkData{},
},
},
finalExpectedChunks: map[string]index.ChunkMetas{
testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
finalExpectedChunks: map[string][]retention.Chunk{
testCtx.lbls1.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls1,
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)),
),
testCtx.lbls2.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls2,
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
),
},
},
"__name__ label should get dropped while indexing chunks": {
@ -720,9 +744,15 @@ func TestCompactedIndex(t *testing.T) {
Data: dummyChunkData{},
},
},
finalExpectedChunks: map[string]index.ChunkMetas{
testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
finalExpectedChunks: map[string][]retention.Chunk{
testCtx.lbls1.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls1,
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)),
),
testCtx.lbls2.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls2,
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
),
},
},
"add some chunks out of table interval to a stream": {
@ -749,9 +779,15 @@ func TestCompactedIndex(t *testing.T) {
Data: dummyChunkData{},
},
},
finalExpectedChunks: map[string]index.ChunkMetas{
testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
finalExpectedChunks: map[string][]retention.Chunk{
testCtx.lbls1.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls1,
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)),
),
testCtx.lbls2.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls2,
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
),
},
},
"add and delete some chunks in a stream": {
@ -767,12 +803,24 @@ func TestCompactedIndex(t *testing.T) {
Data: dummyChunkData{},
},
},
deleteChunks: map[string]index.ChunkMetas{
testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)),
deleteChunks: map[string][]retention.Chunk{
testCtx.lbls1.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls1,
buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)),
),
},
finalExpectedChunks: map[string]index.ChunkMetas{
testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(2)), buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(12))...),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
finalExpectedChunks: map[string][]retention.Chunk{
testCtx.lbls1.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls1,
append(
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(2)),
buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(12))...,
),
),
testCtx.lbls2.String(): chunkMetasToRetentionChunk(
testCtx.schemaCfg, testCtx.userID, testCtx.lbls2,
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
),
},
},
"adding chunk to non-existing stream should error": {
@ -789,19 +837,17 @@ func TestCompactedIndex(t *testing.T) {
t.Run(name, func(t *testing.T) {
compactedIndex := testCtx.buildCompactedIndex()
foundChunkEntries := map[string][]retention.ChunkEntry{}
err := compactedIndex.ForEachChunk(context.Background(), func(chunkEntry retention.ChunkEntry) (deleteChunk bool, err error) {
seriesIDStr := string(chunkEntry.SeriesID)
foundChunkEntries[seriesIDStr] = append(foundChunkEntries[seriesIDStr], chunkEntry)
if chks, ok := tc.deleteChunks[string(chunkEntry.SeriesID)]; ok {
foundChunkEntries := map[string][]retention.Chunk{}
err := compactedIndex.ForEachSeries(context.Background(), func(series retention.Series) error {
seriesIDStr := string(series.SeriesID())
foundChunkEntries[seriesIDStr] = append(foundChunkEntries[seriesIDStr], series.Chunks()...)
if chks, ok := tc.deleteChunks[string(series.SeriesID())]; ok {
for _, chk := range chks {
if chk.MinTime == int64(chunkEntry.From) && chk.MaxTime == int64(chunkEntry.Through) {
return true, nil
}
require.NoError(t, compactedIndex.RemoveChunk(chk.From, chk.Through, series.UserID(), series.Labels(), chk.ChunkID))
}
}
return false, nil
return nil
})
require.NoError(t, err)
@ -823,9 +869,9 @@ func TestCompactedIndex(t *testing.T) {
}
require.NoError(t, err)
foundChunks := map[string]index.ChunkMetas{}
foundChunks := map[string][]retention.Chunk{}
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(context.Background(), "", nil, 0, math.MaxInt64, func(lbls labels.Labels, _ model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
foundChunks[lbls.String()] = append(index.ChunkMetas{}, chks...)
foundChunks[lbls.String()] = chunkMetasToRetentionChunk(testCtx.schemaCfg, testCtx.userID, lbls, chks)
return false
}, labels.MustNewMatcher(labels.MatchEqual, "", ""))
require.NoError(t, err)
@ -843,11 +889,8 @@ func TestIteratorContextCancelation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
var foundChunkEntries []retention.ChunkEntry
err := compactedIndex.ForEachChunk(ctx, func(chunkEntry retention.ChunkEntry) (deleteChunk bool, err error) {
foundChunkEntries = append(foundChunkEntries, chunkEntry)
return false, nil
err := compactedIndex.ForEachSeries(ctx, func(_ retention.Series) error {
return nil
})
require.ErrorIs(t, err, context.Canceled)
@ -860,7 +903,8 @@ type testContext struct {
tableInterval model.Interval
shiftTableStart func(ms int64) int64
buildCompactedIndex func() *compactedIndex
expectedChunkEntries map[string][]retention.ChunkEntry
expectedChunkEntries map[string][]retention.Chunk
schemaCfg config.SchemaConfig
}
func setupCompactedIndex(t *testing.T) *testContext {
@ -903,12 +947,12 @@ func setupCompactedIndex(t *testing.T) *testContext {
return newCompactedIndex(context.Background(), tableName.Prefix, buildUserID(0), t.TempDir(), periodConfig, builder)
}
expectedChunkEntries := map[string][]retention.ChunkEntry{
lbls1.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10))),
lbls2.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20))),
expectedChunkEntries := map[string][]retention.Chunk{
lbls1.String(): chunkMetasToRetentionChunk(schemaCfg, userID, lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10))),
lbls2.String(): chunkMetasToRetentionChunk(schemaCfg, userID, lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20))),
}
return &testContext{lbls1, lbls2, userID, tableInterval, shiftTableStart, buildCompactedIndex, expectedChunkEntries}
return &testContext{lbls1, lbls2, userID, tableInterval, shiftTableStart, buildCompactedIndex, expectedChunkEntries, schemaCfg}
}
type dummyChunkData struct {

@ -151,7 +151,7 @@ func (c ChunkMetas) Drop(chk ChunkMeta) (ChunkMetas, bool) {
return ichk.Checksum >= chk.Checksum
})
if j >= len(c) || c[j] != chk {
if j >= len(c) || c[j].Checksum != chk.Checksum || c[j].MinTime != chk.MinTime || c[j].MaxTime != chk.MaxTime {
return c, false
}

@ -101,20 +101,22 @@ func ValidateCompactedIndex(ctx context.Context, objClient client.ObjectClient,
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(parallelism)
compactedIdx.ForEachChunk(ctx, func(ce retention.ChunkEntry) (deleteChunk bool, err error) { //nolint:errcheck
compactedIdx.ForEachSeries(ctx, func(s retention.Series) (err error) { //nolint:errcheck
bar.Add(1) // nolint:errcheck
g.Go(func() error {
exists, err := CheckChunkExistance(string(ce.ChunkID), objClient)
if err != nil || !exists {
missingChunks.Add(1)
logger.Log("msg", "chunk is missing", "err", err, "chunk_id", string(ce.ChunkID))
return nil
for _, c := range s.Chunks() {
exists, err := CheckChunkExistance(string(c.ChunkID), objClient)
if err != nil || !exists {
missingChunks.Add(1)
logger.Log("msg", "chunk is missing", "err", err, "chunk_id", string(c.ChunkID))
return nil
}
foundChunks.Add(1)
}
foundChunks.Add(1)
return nil
})
return false, nil
return nil
})
g.Wait() // nolint:errcheck

@ -44,28 +44,25 @@ func (t testObjClient) GetAttributes(_ context.Context, object string) (client.O
type testCompactedIdx struct {
compactor.CompactedIndex
chunks []retention.ChunkEntry
chunks []retention.Chunk
}
func (t testCompactedIdx) ForEachChunk(_ context.Context, f retention.ChunkEntryCallback) error {
for _, chunk := range t.chunks {
if _, err := f(chunk); err != nil {
return err
}
}
return nil
func (t testCompactedIdx) ForEachSeries(_ context.Context, f retention.SeriesCallback) error {
var series retention.Series
series.AppendChunks(t.chunks...)
return f(series)
}
func TestAuditIndex(t *testing.T) {
ctx := context.Background()
objClient := testObjClient{}
compactedIdx := testCompactedIdx{
chunks: []retention.ChunkEntry{
{ChunkRef: retention.ChunkRef{ChunkID: []byte("found-1")}},
{ChunkRef: retention.ChunkRef{ChunkID: []byte("found-2")}},
{ChunkRef: retention.ChunkRef{ChunkID: []byte("found-3")}},
{ChunkRef: retention.ChunkRef{ChunkID: []byte("found-4")}},
{ChunkRef: retention.ChunkRef{ChunkID: []byte("missing-1")}},
chunks: []retention.Chunk{
{ChunkID: []byte("found-1")},
{ChunkID: []byte("found-2")},
{ChunkID: []byte("found-3")},
{ChunkID: []byte("found-4")},
{ChunkID: []byte("missing-1")},
},
}
logger := log.NewNopLogger()

@ -78,15 +78,19 @@ func main() {
// loads everything into memory.
if err := db.View(func(t *bbolt.Tx) error {
return boltdbcompactor.ForEachChunk(context.Background(), t.Bucket([]byte("index")), periodConfig, func(entry retention.ChunkEntry) (bool, error) {
builder.AddSeries(entry.Labels, model.Fingerprint(entry.Labels.Hash()), []index.ChunkMeta{{
Checksum: extractChecksumFromChunkID(entry.ChunkID),
MinTime: int64(entry.From),
MaxTime: int64(entry.Through),
KB: ((3 << 20) / 4) / 1024, // guess: 0.75mb, 1/2 of the max size, rounded to KB
Entries: 10000, // guess: 10k entries
}})
return false, nil
return boltdbcompactor.ForEachSeries(context.Background(), t.Bucket([]byte("index")), periodConfig, func(s retention.Series) error {
chunkMetas := make([]index.ChunkMeta, 0, len(s.Chunks()))
for _, chunk := range s.Chunks() {
chunkMetas = append(chunkMetas, index.ChunkMeta{
Checksum: extractChecksumFromChunkID(chunk.ChunkID),
MinTime: int64(chunk.From),
MaxTime: int64(chunk.Through),
KB: ((3 << 20) / 4) / 1024, // guess: 0.75mb, 1/2 of the max size, rounded to KB
Entries: 10000, // guess: 10k entries
})
}
builder.AddSeries(s.Labels(), model.Fingerprint(s.Labels().Hash()), chunkMetas)
return nil
})
}); err != nil {
panic(err)

Loading…
Cancel
Save