From 18778cd54873c153fada1f209fa2fefbef57ac4f Mon Sep 17 00:00:00 2001 From: akevdmeer <57189107+akevdmeer@users.noreply.github.com> Date: Mon, 4 Dec 2023 12:20:10 +0100 Subject: [PATCH] De-duplicate common prefixes as returned for individual buckets (#11317) With multiple buckets configured on a single AWS/S3 store, each object is stored on one of the buckets by hashing its object key in `bucketFromKey()`. That is sufficient for `GetObject()` and `PutObject()`. `List()` needs to list each bucket and combine the results. It appends all object keys into a `storageObjects` slice. That works because each key uniquely exists in only one of the buckets. But that is not the case for the common prefixes; a common prefix may exist in multiple buckets. This PR removes duplicates from the common prefixes as gathered from the multiple buckets in `List()`. Without this fix, we find that a repeated common prefix leads to a table being compacted multiple times concurrently which is not safe. E.g. this error results when the compactor concurrently tries to compact a table and a 'losing' execution finds that its clean-up has already been done: `level=error caller=compactor.go:128 table-name=loki_index_tsdb_19683 msg="failed to remove downloaded index file" path=/var/loki/compactor/loki_index_tsdb_19683/1700667008-loki-write-0-1700660468255353690.tsdb err="remove /var/loki/compactor/loki_index_tsdb_19683/1700667008-loki-write-0-1700660468255353690.tsdb: no such file or directory" ` --- .../chunk/client/aws/s3_storage_client.go | 6 ++++- .../client/aws/s3_storage_client_test.go | 25 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index d21513f115..0c2136801f 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ -405,6 +405,7 @@ func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object func (a *S3ObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { var storageObjects []client.StorageObject var commonPrefixes []client.StorageCommonPrefix + var commonPrefixesSet = make(map[string]bool) for i := range a.bucketNames { err := loki_instrument.TimeRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { @@ -428,7 +429,10 @@ func (a *S3ObjectClient) List(ctx context.Context, prefix, delimiter string) ([] } for _, commonPrefix := range output.CommonPrefixes { - commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(aws.StringValue(commonPrefix.Prefix))) + if !commonPrefixesSet[aws.StringValue(commonPrefix.Prefix)] { + commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(aws.StringValue(commonPrefix.Prefix))) + commonPrefixesSet[aws.StringValue(commonPrefix.Prefix)] = true + } } if output.IsTruncated == nil || !*output.IsTruncated { diff --git a/pkg/storage/chunk/client/aws/s3_storage_client_test.go b/pkg/storage/chunk/client/aws/s3_storage_client_test.go index 00ec9eba40..769f8cf006 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client_test.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client_test.go @@ -21,6 +21,11 @@ import ( "go.uber.org/atomic" "github.com/grafana/loki/pkg/storage/chunk/client/hedging" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" ) type RoundTripperFunc func(*http.Request) (*http.Response, error) @@ -195,3 +200,23 @@ session_token: session token require.Equal(t, underTest.SessionToken.String(), "session token") } + +type testCommonPrefixesS3Client struct { + s3iface.S3API +} + +func (m *testCommonPrefixesS3Client) ListObjectsV2WithContext(aws.Context, *s3.ListObjectsV2Input, ...request.Option) (*s3.ListObjectsV2Output, error) { + var commonPrefixes []*s3.CommonPrefix + commonPrefix := "common-prefix-repeated/" + for i := 0; i < 2; i++ { + commonPrefixes = append(commonPrefixes, &s3.CommonPrefix{Prefix: aws.String(commonPrefix)}) + } + return &s3.ListObjectsV2Output{CommonPrefixes: commonPrefixes, IsTruncated: aws.Bool(false)}, nil +} + +func TestCommonPrefixes(t *testing.T) { + s3 := S3ObjectClient{S3: &testCommonPrefixesS3Client{}, bucketNames: []string{"bucket"}} + _, CommonPrefixes, err := s3.List(context.Background(), "", "/") + require.Equal(t, nil, err) + require.Equal(t, 1, len(CommonPrefixes)) +}