From 1f2fdff4a5dc3161a86ea31bcabf840c4facf292 Mon Sep 17 00:00:00 2001
From: shantanualshi
Date: Thu, 15 Jan 2026 17:07:11 +0530
Subject: [PATCH] Create retrieval and storage methods for data tombstones

---
 pkg/compactor/deletion/dataobj_tombstone.go   | 126 ++++++++++++
 .../deletion/dataobj_tombstone_test.go        | 188 ++++++++++++++++++
 2 files changed, 314 insertions(+)
 create mode 100644 pkg/compactor/deletion/dataobj_tombstone.go
 create mode 100644 pkg/compactor/deletion/dataobj_tombstone_test.go

diff --git a/pkg/compactor/deletion/dataobj_tombstone.go b/pkg/compactor/deletion/dataobj_tombstone.go
new file mode 100644
index 0000000000..8ef1eb1ade
--- /dev/null
+++ b/pkg/compactor/deletion/dataobj_tombstone.go
@@ -0,0 +1,126 @@
+package deletion
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"io"
+	"path"
+	"strings"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/prometheus/common/model"
+
+	"github.com/grafana/loki/v3/pkg/compactor/deletion/deletionproto"
+	"github.com/grafana/loki/v3/pkg/storage/chunk/client"
+)
+
+const (
+	tombstonePrefix = "markers/dataobj-tombstones"
+	tombstoneSuffix = ".tomb"
+)
+
+// WriteTombstone writes a tombstone marker to object storage.
+// Path: markers/dataobj-tombstones/{tenantID}/{hash}.tomb
+//
+// The tombstone is marshalled with protobuf and stored under the key produced
+// by buildTombstoneKey. An error is returned when tombstone is nil or has no
+// tenant ID, when marshalling fails, or when the object client rejects the write.
+func WriteTombstone(ctx context.Context, objectClient client.ObjectClient, tombstone *deletionproto.DataObjectTombstone) error {
+	if tombstone == nil {
+		return errors.New("failed to write tombstone: tombstone is nil")
+	}
+	if tombstone.TenantID == "" {
+		// path.Join drops the empty element, so the marker would be written
+		// directly under tombstonePrefix instead of a tenant directory.
+		return errors.New("failed to write tombstone: tenant ID is empty")
+	}
+
+	data, err := proto.Marshal(tombstone)
+	if err != nil {
+		return fmt.Errorf("failed to marshal tombstone: %w", err)
+	}
+
+	key := buildTombstoneKey(tombstone.TenantID, tombstone.ObjectPath, tombstone.CreatedAt)
+	if err := objectClient.PutObject(ctx, key, strings.NewReader(unsafeGetString(data))); err != nil {
+		return fmt.Errorf("failed to write tombstone to %s: %w", key, err)
+	}
+
+	return nil
+}
+
+// ReadTombstone reads a tombstone marker from object storage.
+// It fetches the object stored under key, reads it completely and decodes it
+// as a DataObjectTombstone; any failure is wrapped and returned with a nil result.
+func ReadTombstone(ctx context.Context, objectClient client.ObjectClient, key string) (*deletionproto.DataObjectTombstone, error) {
+	reader, _, err := objectClient.GetObject(ctx, key)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get tombstone from %s: %w", key, err)
+	}
+	defer reader.Close()
+
+	data, err := io.ReadAll(reader)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read tombstone data: %w", err)
+	}
+
+	var tombstone deletionproto.DataObjectTombstone
+	if err := proto.Unmarshal(data, &tombstone); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal tombstone: %w", err)
+	}
+
+	return &tombstone, nil
+}
+
+// ListTombstones lists all tombstones for a tenant.
+// NOTE: If this becomes slow (>1000 tombstones/tenant), add window partitioning to the path.
+//
+// Objects under the tenant prefix without the tombstone suffix are ignored, and
+// a tombstone that cannot be read or decoded is skipped. If ctx is cancelled or
+// expired when a read fails, the context error is returned instead, so that an
+// aborted listing is never reported as an empty one.
+func ListTombstones(ctx context.Context, objectClient client.ObjectClient, tenantID string) ([]*deletionproto.DataObjectTombstone, error) {
+	if tenantID == "" {
+		// path.Join drops the empty element, so the prefix would be the shared
+		// tombstone root instead of a single tenant's directory.
+		return nil, errors.New("failed to list tombstones: tenant ID is empty")
+	}
+
+	prefix := path.Join(tombstonePrefix, tenantID) + "/"
+
+	objects, _, err := objectClient.List(ctx, prefix, "")
+	if err != nil {
+		return nil, fmt.Errorf("failed to list tombstones in %s: %w", prefix, err)
+	}
+
+	tombstones := make([]*deletionproto.DataObjectTombstone, 0, len(objects))
+	for _, obj := range objects {
+		if !strings.HasSuffix(obj.Key, tombstoneSuffix) {
+			continue
+		}
+
+		tombstone, err := ReadTombstone(ctx, objectClient, obj.Key)
+		if err != nil {
+			if ctxErr := ctx.Err(); ctxErr != nil {
+				return nil, fmt.Errorf("failed to list tombstones in %s: %w", prefix, ctxErr)
+			}
+			// Best effort: skip a tombstone that cannot be read or decoded.
+			continue
+		}
+		tombstones = append(tombstones, tombstone)
+	}
+
+	return tombstones, nil
+}
+
+// buildTombstoneKey creates the storage key for a tombstone.
+// Format: markers/dataobj-tombstones/{tenantID}/{hash}.tomb
+func buildTombstoneKey(tenantID, objectPath string, createdAt model.Time) string {
+	// Derive the file name from object path + timestamp; the tenant is already part of the directory
+	hash := sha256.Sum256([]byte(fmt.Sprintf("%s-%d", objectPath, createdAt)))
+	hashStr := hex.EncodeToString(hash[:8]) // First 8 bytes (16 hex chars) keep names short
+
+	return path.Join(tombstonePrefix, tenantID, hashStr+tombstoneSuffix)
+}
diff --git a/pkg/compactor/deletion/dataobj_tombstone_test.go b/pkg/compactor/deletion/dataobj_tombstone_test.go
new file mode 100644
index 0000000000..df980a1bbf
--- /dev/null
+++ b/pkg/compactor/deletion/dataobj_tombstone_test.go
@@ -0,0 +1,188 @@
+package deletion
+
+import (
+	"context"
+	"testing"
+
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/compactor/deletion/deletionproto"
+	"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
+)
+
+func TestWriteReadTombstone(t *testing.T) {
+	ctx := context.Background()
+	tempDir := t.TempDir()
+
+	objectClient, err := local.NewFSObjectClient(local.FSConfig{
+		Directory: tempDir,
+	})
+	require.NoError(t, err)
+
+	tests := []struct {
+		name      string
+		tombstone *deletionproto.DataObjectTombstone
+	}{
+		{
+			name: "single section",
+			tombstone: &deletionproto.DataObjectTombstone{
+				ObjectPath:            "data/tenant-1/12345.dataobj",
+				DeletedSectionIndices: []uint32{1},
+				DeleteRequestID:       "req-123",
+				CreatedAt:             model.Now(),
+				TenantID:              "tenant-1",
+			},
+		},
+		{
+			name: "multiple sections",
+			tombstone: &deletionproto.DataObjectTombstone{
+				ObjectPath:            "data/tenant-1/12346.dataobj",
+				DeletedSectionIndices: []uint32{1, 2, 3, 5, 8},
+				DeleteRequestID:       "req-456",
+				CreatedAt:             model.Now(),
+				TenantID:              "tenant-1",
+			},
+		},
+		{
+			name: "different tenant",
+			tombstone: &deletionproto.DataObjectTombstone{
+				ObjectPath:            "data/tenant-2/99999.dataobj",
+				DeletedSectionIndices: []uint32{0, 1},
+				DeleteRequestID:       "req-789",
+				CreatedAt:             model.Now(),
+				TenantID:              "tenant-2",
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Write the tombstone through the public API
+			err := WriteTombstone(ctx, objectClient, tt.tombstone)
+			require.NoError(t, err)
+
+			// Recompute the key WriteTombstone stored it under
+			key := buildTombstoneKey(tt.tombstone.TenantID, tt.tombstone.ObjectPath, tt.tombstone.CreatedAt)
+
+			// Read it back
+			readTombstone, err := ReadTombstone(ctx, objectClient, key)
+			require.NoError(t, err)
+
+			// Verify every field survived the write/read round trip
+			require.Equal(t, tt.tombstone.ObjectPath, readTombstone.ObjectPath)
+			require.Equal(t, tt.tombstone.DeletedSectionIndices, readTombstone.DeletedSectionIndices)
+			require.Equal(t, tt.tombstone.DeleteRequestID, readTombstone.DeleteRequestID)
+			require.Equal(t, tt.tombstone.CreatedAt, readTombstone.CreatedAt)
+			require.Equal(t, tt.tombstone.TenantID, readTombstone.TenantID)
+		})
+	}
+}
+
+func TestListTombstones(t *testing.T) {
+	ctx := context.Background()
+	tempDir := t.TempDir()
+
+	objectClient, err := local.NewFSObjectClient(local.FSConfig{
+		Directory: tempDir,
+	})
+	require.NoError(t, err)
+
+	// Two tombstones for tenant-1 and one for tenant-2, all sharing one timestamp
+	createdAt := model.Now()
+
+	tombstones := []*deletionproto.DataObjectTombstone{
+		{
+			ObjectPath:            "data/tenant-1/12345.dataobj",
+			DeletedSectionIndices: []uint32{1, 2},
+			DeleteRequestID:       "req-1",
+			CreatedAt:             createdAt,
+			TenantID:              "tenant-1",
+		},
+		{
+			ObjectPath:            "data/tenant-1/12346.dataobj",
+			DeletedSectionIndices: []uint32{3, 4},
+			DeleteRequestID:       "req-2",
+			CreatedAt:             createdAt,
+			TenantID:              "tenant-1",
+		},
+		{
+			ObjectPath:            "data/tenant-2/99999.dataobj",
+			DeletedSectionIndices: []uint32{0},
+			DeleteRequestID:       "req-3",
+			CreatedAt:             createdAt,
+			TenantID:              "tenant-2",
+		},
+	}
+
+	// Write all tombstones
+	for _, tomb := range tombstones {
+		err := WriteTombstone(ctx, objectClient, tomb)
+		require.NoError(t, err)
+	}
+
+	// List tombstones for tenant-1
+	listed, err := ListTombstones(ctx, objectClient, "tenant-1")
+	require.NoError(t, err)
+	require.Len(t, listed, 2)
+
+	// Verify both tenant-1 object paths came back and nothing from another tenant
+	objectPaths := make(map[string]bool)
+	for _, tomb := range listed {
+		require.Equal(t, "tenant-1", tomb.TenantID)
+		objectPaths[tomb.ObjectPath] = true
+	}
+	require.True(t, objectPaths["data/tenant-1/12345.dataobj"])
+	require.True(t, objectPaths["data/tenant-1/12346.dataobj"])
+
+	// List tombstones for tenant-2
+	listed, err = ListTombstones(ctx, objectClient, "tenant-2")
+	require.NoError(t, err)
+	require.Len(t, listed, 1)
+	require.Equal(t, "tenant-2", listed[0].TenantID)
+	require.Equal(t, "data/tenant-2/99999.dataobj", listed[0].ObjectPath)
+
+	// A tenant with no tombstones yields an empty result without an error
+	listed, err = ListTombstones(ctx, objectClient, "tenant-3")
+	require.NoError(t, err)
+	require.Len(t, listed, 0)
+}
+
+func TestBuildTombstoneKey(t *testing.T) {
+	tenantID := "tenant-1"
+	objectPath := "data/tenant-1/12345.dataobj"
+	createdAt := model.Time(1705329600000) // 2024-01-15 14:40:00 UTC
+
+	key1 := buildTombstoneKey(tenantID, objectPath, createdAt)
+	key2 := buildTombstoneKey(tenantID, objectPath, createdAt)
+
+	// Same inputs should produce same key
+	require.Equal(t, key1, key2)
+
+	// Key should contain the prefix, the tenant ID and the suffix
+	require.Contains(t, key1, "markers/dataobj-tombstones")
+	require.Contains(t, key1, tenantID)
+	require.Contains(t, key1, ".tomb")
+
+	// Different timestamp should produce different key
+	key3 := buildTombstoneKey(tenantID, objectPath, createdAt+1)
+	require.NotEqual(t, key1, key3)
+
+	// Different object path should produce different key
+	key4 := buildTombstoneKey(tenantID, "data/tenant-1/99999.dataobj", createdAt)
+	require.NotEqual(t, key1, key4)
+}
+
+func TestReadTombstoneNotFound(t *testing.T) {
+	ctx := context.Background()
+	tempDir := t.TempDir()
+
+	objectClient, err := local.NewFSObjectClient(local.FSConfig{
+		Directory: tempDir,
+	})
+	require.NoError(t, err)
+
+	// Nothing has been written to this client, so reading any key must fail
+	_, err = ReadTombstone(ctx, objectClient, "markers/dataobj-tombstones/tenant-1/20240115/nonexistent.tomb")
+	require.Error(t, err)
+}