mirror of https://github.com/grafana/loki
parent
b079cc112e
commit
1f2fdff4a5
@ -0,0 +1,96 @@ |
||||
package deletion |
||||
|
||||
import ( |
||||
"context" |
||||
"crypto/sha256" |
||||
"encoding/hex" |
||||
"fmt" |
||||
"io" |
||||
"path" |
||||
"strings" |
||||
|
||||
"github.com/gogo/protobuf/proto" |
||||
"github.com/prometheus/common/model" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/compactor/deletion/deletionproto" |
||||
"github.com/grafana/loki/v3/pkg/storage/chunk/client" |
||||
) |
||||
|
||||
const ( |
||||
tombstonePrefix = "markers/dataobj-tombstones" |
||||
tombstoneSuffix = ".tomb" |
||||
) |
||||
|
||||
// WriteTombstone writes a tombstone marker to object storage.
|
||||
// Path: markers/dataobj-tombstones/<tenant>/<hash>.tomb
|
||||
func WriteTombstone(ctx context.Context, objectClient client.ObjectClient, tombstone *deletionproto.DataObjectTombstone) error { |
||||
data, err := proto.Marshal(tombstone) |
||||
if err != nil { |
||||
return fmt.Errorf("failed to marshal tombstone: %w", err) |
||||
} |
||||
|
||||
key := buildTombstoneKey(tombstone.TenantID, tombstone.ObjectPath, tombstone.CreatedAt) |
||||
if err := objectClient.PutObject(ctx, key, strings.NewReader(unsafeGetString(data))); err != nil { |
||||
return fmt.Errorf("failed to write tombstone to %s: %w", key, err) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// ReadTombstone reads a tombstone marker from object storage.
|
||||
func ReadTombstone(ctx context.Context, objectClient client.ObjectClient, key string) (*deletionproto.DataObjectTombstone, error) { |
||||
reader, _, err := objectClient.GetObject(ctx, key) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to get tombstone from %s: %w", key, err) |
||||
} |
||||
defer reader.Close() |
||||
|
||||
data, err := io.ReadAll(reader) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to read tombstone data: %w", err) |
||||
} |
||||
|
||||
var tombstone deletionproto.DataObjectTombstone |
||||
if err := proto.Unmarshal(data, &tombstone); err != nil { |
||||
return nil, fmt.Errorf("failed to unmarshal tombstone: %w", err) |
||||
} |
||||
|
||||
return &tombstone, nil |
||||
} |
||||
|
||||
// ListTombstones lists all tombstones for a tenant.
|
||||
// NOTE: If this becomes slow (>1000 tombstones/tenant), add window partitioning to the path.
|
||||
func ListTombstones(ctx context.Context, objectClient client.ObjectClient, tenantID string) ([]*deletionproto.DataObjectTombstone, error) { |
||||
prefix := path.Join(tombstonePrefix, tenantID) + "/" |
||||
|
||||
objects, _, err := objectClient.List(ctx, prefix, "") |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to list tombstones in %s: %w", prefix, err) |
||||
} |
||||
|
||||
tombstones := make([]*deletionproto.DataObjectTombstone, 0, len(objects)) |
||||
for _, obj := range objects { |
||||
if !strings.HasSuffix(obj.Key, tombstoneSuffix) { |
||||
continue |
||||
} |
||||
|
||||
tombstone, err := ReadTombstone(ctx, objectClient, obj.Key) |
||||
if err != nil { |
||||
// Log and continue on individual failures
|
||||
continue |
||||
} |
||||
tombstones = append(tombstones, tombstone) |
||||
} |
||||
|
||||
return tombstones, nil |
||||
} |
||||
|
||||
// buildTombstoneKey creates the storage key for a tombstone.
|
||||
// Format: markers/dataobj-tombstones/<tenant>/<hash>.tomb
|
||||
func buildTombstoneKey(tenantID, objectPath string, createdAt model.Time) string { |
||||
// Use object path + timestamp for hash to ensure uniqueness
|
||||
hash := sha256.Sum256([]byte(fmt.Sprintf("%s-%d", objectPath, createdAt))) |
||||
hashStr := hex.EncodeToString(hash[:8]) // Use first 8 bytes for shorter names
|
||||
|
||||
return path.Join(tombstonePrefix, tenantID, hashStr+tombstoneSuffix) |
||||
} |
||||
@ -0,0 +1,188 @@ |
||||
package deletion |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
|
||||
"github.com/prometheus/common/model" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/compactor/deletion/deletionproto" |
||||
"github.com/grafana/loki/v3/pkg/storage/chunk/client/local" |
||||
) |
||||
|
||||
func TestWriteReadTombstone(t *testing.T) { |
||||
ctx := context.Background() |
||||
tempDir := t.TempDir() |
||||
|
||||
objectClient, err := local.NewFSObjectClient(local.FSConfig{ |
||||
Directory: tempDir, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
tests := []struct { |
||||
name string |
||||
tombstone *deletionproto.DataObjectTombstone |
||||
}{ |
||||
{ |
||||
name: "single section", |
||||
tombstone: &deletionproto.DataObjectTombstone{ |
||||
ObjectPath: "data/tenant-1/12345.dataobj", |
||||
DeletedSectionIndices: []uint32{1}, |
||||
DeleteRequestID: "req-123", |
||||
CreatedAt: model.Now(), |
||||
TenantID: "tenant-1", |
||||
}, |
||||
}, |
||||
{ |
||||
name: "multiple sections", |
||||
tombstone: &deletionproto.DataObjectTombstone{ |
||||
ObjectPath: "data/tenant-1/12346.dataobj", |
||||
DeletedSectionIndices: []uint32{1, 2, 3, 5, 8}, |
||||
DeleteRequestID: "req-456", |
||||
CreatedAt: model.Now(), |
||||
TenantID: "tenant-1", |
||||
}, |
||||
}, |
||||
{ |
||||
name: "different tenant", |
||||
tombstone: &deletionproto.DataObjectTombstone{ |
||||
ObjectPath: "data/tenant-2/99999.dataobj", |
||||
DeletedSectionIndices: []uint32{0, 1}, |
||||
DeleteRequestID: "req-789", |
||||
CreatedAt: model.Now(), |
||||
TenantID: "tenant-2", |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
for _, tt := range tests { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
// Write tombstone
|
||||
err := WriteTombstone(ctx, objectClient, tt.tombstone) |
||||
require.NoError(t, err) |
||||
|
||||
// Build the expected key
|
||||
key := buildTombstoneKey(tt.tombstone.TenantID, tt.tombstone.ObjectPath, tt.tombstone.CreatedAt) |
||||
|
||||
// Read it back
|
||||
readTombstone, err := ReadTombstone(ctx, objectClient, key) |
||||
require.NoError(t, err) |
||||
|
||||
// Verify fields match
|
||||
require.Equal(t, tt.tombstone.ObjectPath, readTombstone.ObjectPath) |
||||
require.Equal(t, tt.tombstone.DeletedSectionIndices, readTombstone.DeletedSectionIndices) |
||||
require.Equal(t, tt.tombstone.DeleteRequestID, readTombstone.DeleteRequestID) |
||||
require.Equal(t, tt.tombstone.CreatedAt, readTombstone.CreatedAt) |
||||
require.Equal(t, tt.tombstone.TenantID, readTombstone.TenantID) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestListTombstones(t *testing.T) { |
||||
ctx := context.Background() |
||||
tempDir := t.TempDir() |
||||
|
||||
objectClient, err := local.NewFSObjectClient(local.FSConfig{ |
||||
Directory: tempDir, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
// Create tombstones for different tenants
|
||||
createdAt := model.Now() |
||||
|
||||
tombstones := []*deletionproto.DataObjectTombstone{ |
||||
{ |
||||
ObjectPath: "data/tenant-1/12345.dataobj", |
||||
DeletedSectionIndices: []uint32{1, 2}, |
||||
DeleteRequestID: "req-1", |
||||
CreatedAt: createdAt, |
||||
TenantID: "tenant-1", |
||||
}, |
||||
{ |
||||
ObjectPath: "data/tenant-1/12346.dataobj", |
||||
DeletedSectionIndices: []uint32{3, 4}, |
||||
DeleteRequestID: "req-2", |
||||
CreatedAt: createdAt, |
||||
TenantID: "tenant-1", |
||||
}, |
||||
{ |
||||
ObjectPath: "data/tenant-2/99999.dataobj", |
||||
DeletedSectionIndices: []uint32{0}, |
||||
DeleteRequestID: "req-3", |
||||
CreatedAt: createdAt, |
||||
TenantID: "tenant-2", |
||||
}, |
||||
} |
||||
|
||||
// Write all tombstones
|
||||
for _, tomb := range tombstones { |
||||
err := WriteTombstone(ctx, objectClient, tomb) |
||||
require.NoError(t, err) |
||||
} |
||||
|
||||
// List tombstones for tenant-1
|
||||
listed, err := ListTombstones(ctx, objectClient, "tenant-1") |
||||
require.NoError(t, err) |
||||
require.Len(t, listed, 2) |
||||
|
||||
// Verify we got the right tombstones
|
||||
objectPaths := make(map[string]bool) |
||||
for _, tomb := range listed { |
||||
require.Equal(t, "tenant-1", tomb.TenantID) |
||||
objectPaths[tomb.ObjectPath] = true |
||||
} |
||||
require.True(t, objectPaths["data/tenant-1/12345.dataobj"]) |
||||
require.True(t, objectPaths["data/tenant-1/12346.dataobj"]) |
||||
|
||||
// List tombstones for tenant-2
|
||||
listed, err = ListTombstones(ctx, objectClient, "tenant-2") |
||||
require.NoError(t, err) |
||||
require.Len(t, listed, 1) |
||||
require.Equal(t, "tenant-2", listed[0].TenantID) |
||||
require.Equal(t, "data/tenant-2/99999.dataobj", listed[0].ObjectPath) |
||||
|
||||
// List tombstones for non-existent tenant
|
||||
listed, err = ListTombstones(ctx, objectClient, "tenant-3") |
||||
require.NoError(t, err) |
||||
require.Len(t, listed, 0) |
||||
} |
||||
|
||||
func TestBuildTombstoneKey(t *testing.T) { |
||||
tenantID := "tenant-1" |
||||
objectPath := "data/tenant-1/12345.dataobj" |
||||
createdAt := model.Time(1705329600000) // 2024-01-15 12:00:00 UTC
|
||||
|
||||
key1 := buildTombstoneKey(tenantID, objectPath, createdAt) |
||||
key2 := buildTombstoneKey(tenantID, objectPath, createdAt) |
||||
|
||||
// Same inputs should produce same key
|
||||
require.Equal(t, key1, key2) |
||||
|
||||
// Key should have expected format
|
||||
require.Contains(t, key1, "markers/dataobj-tombstones") |
||||
require.Contains(t, key1, tenantID) |
||||
require.Contains(t, key1, ".tomb") |
||||
|
||||
// Different timestamp should produce different key
|
||||
key3 := buildTombstoneKey(tenantID, objectPath, createdAt+1) |
||||
require.NotEqual(t, key1, key3) |
||||
|
||||
// Different object path should produce different key
|
||||
key4 := buildTombstoneKey(tenantID, "data/tenant-1/99999.dataobj", createdAt) |
||||
require.NotEqual(t, key1, key4) |
||||
} |
||||
|
||||
func TestReadTombstoneNotFound(t *testing.T) { |
||||
ctx := context.Background() |
||||
tempDir := t.TempDir() |
||||
|
||||
objectClient, err := local.NewFSObjectClient(local.FSConfig{ |
||||
Directory: tempDir, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
// Try to read non-existent tombstone
|
||||
_, err = ReadTombstone(ctx, objectClient, "markers/dataobj-tombstones/tenant-1/20240115/nonexistent.tomb") |
||||
require.Error(t, err) |
||||
} |
||||
Loading…
Reference in new issue