De-duplicate common prefixes as returned for individual buckets (#11317)

With multiple buckets configured on a single AWS/S3 store, each object
is stored in exactly one of the buckets, chosen by hashing its object
key in `bucketFromKey()`. That is sufficient for `GetObject()` and
`PutObject()`.

`List()` needs to list each bucket and combine the results. It appends
all object keys into a `storageObjects` slice. That works because each
key uniquely exists in only one of the buckets.

But that is not the case for the common prefixes; a common prefix may
exist in multiple buckets. This PR removes duplicates from the common
prefixes as gathered from the multiple buckets in `List()`.

Without this fix, a repeated common prefix causes the same table to be
compacted multiple times concurrently, which is not safe. For example,
the following error is logged when the compactor compacts a table
concurrently and a 'losing' execution finds that its clean-up has
already been done:

`level=error caller=compactor.go:128 table-name=loki_index_tsdb_19683
msg="failed to remove downloaded index file"
path=/var/loki/compactor/loki_index_tsdb_19683/1700667008-loki-write-0-1700660468255353690.tsdb
err="remove
/var/loki/compactor/loki_index_tsdb_19683/1700667008-loki-write-0-1700660468255353690.tsdb:
no such file or directory"
`
pull/11211/head
akevdmeer 2 years ago committed by GitHub
parent 856b57336c
commit 18778cd548
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      pkg/storage/chunk/client/aws/s3_storage_client.go
  2. 25
      pkg/storage/chunk/client/aws/s3_storage_client_test.go

@ -405,6 +405,7 @@ func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object
func (a *S3ObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var storageObjects []client.StorageObject
var commonPrefixes []client.StorageCommonPrefix
var commonPrefixesSet = make(map[string]bool)
for i := range a.bucketNames {
err := loki_instrument.TimeRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
@ -428,7 +429,10 @@ func (a *S3ObjectClient) List(ctx context.Context, prefix, delimiter string) ([]
}
for _, commonPrefix := range output.CommonPrefixes {
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(aws.StringValue(commonPrefix.Prefix)))
if !commonPrefixesSet[aws.StringValue(commonPrefix.Prefix)] {
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(aws.StringValue(commonPrefix.Prefix)))
commonPrefixesSet[aws.StringValue(commonPrefix.Prefix)] = true
}
}
if output.IsTruncated == nil || !*output.IsTruncated {

@ -21,6 +21,11 @@ import (
"go.uber.org/atomic"
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
)
// RoundTripperFunc is a function type that takes an *http.Request and returns
// an *http.Response and an error, i.e. the same signature as the RoundTrip
// method of http.RoundTripper.
// NOTE(review): presumably used to adapt plain functions into an
// http.RoundTripper in tests — the adapting method is not visible here; confirm.
type RoundTripperFunc func(*http.Request) (*http.Response, error)
@ -195,3 +200,23 @@ session_token: session token
require.Equal(t, underTest.SessionToken.String(), "session token")
}
// testCommonPrefixesS3Client is a test double for the S3 API. It embeds
// s3iface.S3API so that it satisfies the full interface while only
// ListObjectsV2WithContext is overridden; every other method is promoted from
// the embedded interface value.
type testCommonPrefixesS3Client struct {
	s3iface.S3API
}
// ListObjectsV2WithContext ignores all of its arguments and always returns a
// single, non-truncated listing page whose CommonPrefixes contains the same
// prefix twice. The returned error is always nil.
func (m *testCommonPrefixesS3Client) ListObjectsV2WithContext(aws.Context, *s3.ListObjectsV2Input, ...request.Option) (*s3.ListObjectsV2Output, error) {
	const repeatedPrefix = "common-prefix-repeated/"

	// Two distinct entries carrying an identical prefix value.
	duplicated := []*s3.CommonPrefix{
		{Prefix: aws.String(repeatedPrefix)},
		{Prefix: aws.String(repeatedPrefix)},
	}

	page := &s3.ListObjectsV2Output{
		CommonPrefixes: duplicated,
		IsTruncated:    aws.Bool(false),
	}
	return page, nil
}
// TestCommonPrefixes verifies that List de-duplicates common prefixes: the
// stub S3 client reports the same common prefix twice for the single
// configured bucket, and List must return that prefix exactly once.
func TestCommonPrefixes(t *testing.T) {
	// Named objectClient rather than s3 so the imported s3 package is not
	// shadowed inside the test.
	objectClient := S3ObjectClient{S3: &testCommonPrefixesS3Client{}, bucketNames: []string{"bucket"}}

	_, commonPrefixes, err := objectClient.List(context.Background(), "", "/")
	require.NoError(t, err)
	require.Len(t, commonPrefixes, 1)
	// Also pin the value, so a result holding one unrelated prefix cannot pass.
	require.Equal(t, "common-prefix-repeated/", string(commonPrefixes[0]))
}

Loading…
Cancel
Save