diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 19f9b20055..2541b77c02 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" + "github.com/grafana/dskit/tenant" lru "github.com/hashicorp/golang-lru" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -19,14 +20,12 @@ import ( "github.com/weaveworks/common/user" "go.uber.org/atomic" - "github.com/grafana/dskit/tenant" - "github.com/grafana/loki/pkg/distributor/clientpool" "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/runtime" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" diff --git a/pkg/loki/delete_store_listener.go b/pkg/loki/delete_store_listener.go index 99e67c0893..441e1c3808 100644 --- a/pkg/loki/delete_store_listener.go +++ b/pkg/loki/delete_store_listener.go @@ -3,7 +3,7 @@ package loki import ( "github.com/grafana/dskit/services" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" ) func deleteRequestsStoreListener(d deletion.DeleteRequestsClient) *listener { diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 5c5656e046..2aca337304 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -9,8 +9,6 @@ import ( "os" rt "runtime" - "github.com/grafana/loki/pkg/storage/chunk/cache" - "github.com/fatih/color" "github.com/felixge/fgprof" "github.com/go-kit/log/level" @@ -43,10 +41,11 @@ import ( "github.com/grafana/loki/pkg/runtime" "github.com/grafana/loki/pkg/scheduler" "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" "github.com/grafana/loki/pkg/storage/stores/series/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" "github.com/grafana/loki/pkg/tracing" "github.com/grafana/loki/pkg/usagestats" diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 8051b0ffc2..ea891f2a03 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -13,10 +13,6 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" - "github.com/grafana/loki/pkg/logqlmodel/stats" - - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" - "github.com/NYTimes/gziphandler" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -39,6 +35,7 @@ import ( "github.com/grafana/loki/pkg/ingester" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/lokifrontend/frontend" "github.com/grafana/loki/pkg/lokifrontend/frontend/transport" "github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb" @@ -55,10 +52,11 @@ import ( chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/generationnumber" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/series/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/generationnumber" shipper_index "github.com/grafana/loki/pkg/storage/stores/shipper/index" boltdb_shipper_compactor "github.com/grafana/loki/pkg/storage/stores/shipper/index/compactor" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 472b620d5b..8cafa47eb8 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -6,11 +6,10 @@ import ( "net/http" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/go-kit/log/level" "github.com/grafana/dskit/tenant" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/weaveworks/common/httpgrpc" "golang.org/x/sync/errgroup" @@ -23,7 +22,7 @@ import ( "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/stores/index/stats" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" listutil "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/spanlogger" util_validation "github.com/grafana/loki/pkg/util/validation" diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 6ad3c6a6e9..8323137249 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -8,8 +8,6 @@ import ( "testing" "time" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion" - "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" @@ -24,6 +22,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" "github.com/grafana/loki/pkg/validation" ) diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 3a2dae0771..ca666fbd5c 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -278,12 +278,16 @@ func NewTableClient(name string, cfg Config, cm ClientMetrics, registerer promet return local.NewTableClient(cfg.BoltDBConfig.Directory) case config.StorageTypeGrpc: return grpc.NewTableClient(cfg.GrpcConfig) - case config.BoltDBShipperType: + case config.BoltDBShipperType, config.TSDBType: objectClient, err := NewObjectClient(cfg.BoltDBShipperConfig.SharedStoreType, cfg, cm) if err != nil { return nil, err } - return shipper.NewBoltDBShipperTableClient(objectClient, cfg.BoltDBShipperConfig.SharedStoreKeyPrefix), nil + sharedStoreKeyPrefix := cfg.BoltDBShipperConfig.SharedStoreKeyPrefix + if name == config.TSDBType { + sharedStoreKeyPrefix = cfg.TSDBShipperConfig.SharedStoreKeyPrefix + } + return indexshipper.NewTableClient(objectClient, sharedStoreKeyPrefix), nil default: return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: %v, %v, %v, %v, %v, %v, %v", name, config.StorageTypeAWS, config.StorageTypeCassandra, config.StorageTypeInMemory, config.StorageTypeGCP, config.StorageTypeBigTable, config.StorageTypeBigTableHashed, config.StorageTypeGrpc) } diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/indexshipper/compactor/compactor.go similarity index 98% rename from pkg/storage/stores/shipper/compactor/compactor.go rename to pkg/storage/stores/indexshipper/compactor/compactor.go index 715067dc05..0259141f2b 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/indexshipper/compactor/compactor.go @@ -23,10 +23,9 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/local" chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" - shipper_storage "github.com/grafana/loki/pkg/storage/stores/shipper/storage" - shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" + shipper_storage "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" @@ -114,7 +113,7 @@ func (cfg *Config) Validate() error { return err } - return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix) + return shipper_storage.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix) } type Compactor struct { diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/indexshipper/compactor/compactor_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/compactor_test.go rename to pkg/storage/stores/indexshipper/compactor/compactor_test.go diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go similarity index 98% rename from pkg/storage/stores/shipper/compactor/deletion/delete_request.go rename to pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go index d6640ee4ea..ab718f68b8 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go @@ -7,7 +7,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/logql/syntax" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" "github.com/grafana/loki/pkg/util/filter" util_log "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_request_test.go similarity index 98% rename from pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go rename to pkg/storage/stores/indexshipper/compactor/deletion/delete_request_test.go index 21c857a0ea..b7eda96982 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_request_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logql/syntax" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" ) func TestDeleteRequest_IsDeleted(t *testing.T) { diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go rename to pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_client_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/deletion/delete_requests_client_test.go rename to pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go similarity index 99% rename from pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go rename to pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go index f88e96d8ca..f7a29409c3 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go @@ -9,7 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" util_log "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go similarity index 99% rename from pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go rename to pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go index 36e9e67a53..0d4f410458 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logql/syntax" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" ) const testUserID = "test-user" diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store.go similarity index 99% rename from pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go rename to pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store.go index bf8aa1407b..6ec62bf794 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store.go @@ -16,8 +16,8 @@ import ( "github.com/prometheus/common/model" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" "github.com/grafana/loki/pkg/storage/stores/series/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" ) type ( diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store_test.go similarity index 99% rename from pkg/storage/stores/shipper/compactor/deletion/delete_requests_store_test.go rename to pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store_test.go index ffd176ee43..f089ecc78a 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store_test.go @@ -8,12 +8,11 @@ import ( "testing" "time" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" - "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/chunk/client/local" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" ) func TestDeleteRequestsStore(t *testing.T) { diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table.go similarity index 95% rename from pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go rename to pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table.go index f61ed282a4..efe7f695c1 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table.go @@ -15,8 +15,8 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/storage/chunk/client/local" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" "github.com/grafana/loki/pkg/storage/stores/series/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -65,8 +65,8 @@ func (t *deleteRequestsTable) init() error { _, err := os.Stat(t.dbPath) if err != nil { - err = shipper_util.DownloadFileFromStorage(t.dbPath, true, - true, shipper_util.LoggerWithFilename(util_log.Logger, deleteRequestsIndexFileName), func() (io.ReadCloser, error) { + err = storage.DownloadFileFromStorage(t.dbPath, true, + true, storage.LoggerWithFilename(util_log.Logger, deleteRequestsIndexFileName), func() (io.ReadCloser, error) { return t.indexStorageClient.GetFile(context.Background(), DeleteRequestsTableName, deleteRequestsIndexFileName) }) if err != nil && !t.indexStorageClient.IsFileNotFoundErr(err) { diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table_test.go similarity index 98% rename from pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go rename to pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table_test.go index b617ab6ae3..46a4834f59 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/chunk/client/local" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" "github.com/grafana/loki/pkg/storage/stores/series/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) diff --git a/pkg/storage/stores/shipper/compactor/deletion/metrics.go b/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/deletion/metrics.go rename to pkg/storage/stores/indexshipper/compactor/deletion/metrics.go diff --git a/pkg/storage/stores/shipper/compactor/deletion/mode.go b/pkg/storage/stores/indexshipper/compactor/deletion/mode.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/deletion/mode.go rename to pkg/storage/stores/indexshipper/compactor/deletion/mode.go diff --git a/pkg/storage/stores/shipper/compactor/deletion/mode_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/mode_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/deletion/mode_test.go rename to pkg/storage/stores/indexshipper/compactor/deletion/mode_test.go diff --git a/pkg/storage/stores/shipper/compactor/deletion/noop_delete_requests_store.go b/pkg/storage/stores/indexshipper/compactor/deletion/noop_delete_requests_store.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/deletion/noop_delete_requests_store.go rename to pkg/storage/stores/indexshipper/compactor/deletion/noop_delete_requests_store.go diff --git a/pkg/storage/stores/shipper/compactor/deletion/request_handler.go b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go similarity index 99% rename from pkg/storage/stores/shipper/compactor/deletion/request_handler.go rename to pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go index acead9175b..c4825e68eb 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/request_handler.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go @@ -12,7 +12,7 @@ import ( "github.com/grafana/dskit/tenant" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/shipper/compactor/deletion/request_handler_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go similarity index 98% rename from pkg/storage/stores/shipper/compactor/deletion/request_handler_test.go rename to pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go index aaecb44428..85d417c74c 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/request_handler_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go @@ -12,7 +12,7 @@ import ( "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/storage/chunk/client/local" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" "github.com/grafana/loki/pkg/validation" ) diff --git a/pkg/storage/stores/shipper/compactor/deletion/tenant_delete_requests_client.go b/pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client.go similarity index 91% rename from pkg/storage/stores/shipper/compactor/deletion/tenant_delete_requests_client.go rename to pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client.go index f5b4817d6c..57985a5dbd 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/tenant_delete_requests_client.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client.go @@ -3,7 +3,7 @@ package deletion import ( "context" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" ) type perTenantDeleteRequestsClient struct { diff --git a/pkg/storage/stores/shipper/compactor/deletion/tenant_delete_requests_client_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/deletion/tenant_delete_requests_client_test.go rename to pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client_test.go diff --git a/pkg/storage/stores/shipper/compactor/deletion/validation.go b/pkg/storage/stores/indexshipper/compactor/deletion/validation.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/deletion/validation.go rename to pkg/storage/stores/indexshipper/compactor/deletion/validation.go diff --git a/pkg/storage/stores/shipper/compactor/deletion/validation_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/validation_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/deletion/validation_test.go rename to pkg/storage/stores/indexshipper/compactor/deletion/validation_test.go diff --git a/pkg/storage/stores/shipper/compactor/generationnumber/gennumber_client.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/generationnumber/gennumber_client.go rename to pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go diff --git a/pkg/storage/stores/shipper/compactor/generationnumber/gennumber_client_test.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/generationnumber/gennumber_client_test.go rename to pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go diff --git a/pkg/storage/stores/shipper/compactor/generationnumber/gennumber_loader.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/generationnumber/gennumber_loader.go rename to pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go diff --git a/pkg/storage/stores/shipper/compactor/generationnumber/gennumber_loader_test.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/generationnumber/gennumber_loader_test.go rename to pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go diff --git a/pkg/storage/stores/shipper/compactor/generationnumber/metrics.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/metrics.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/generationnumber/metrics.go rename to pkg/storage/stores/indexshipper/compactor/generationnumber/metrics.go diff --git a/pkg/storage/stores/shipper/compactor/index_set.go b/pkg/storage/stores/indexshipper/compactor/index_set.go similarity index 95% rename from pkg/storage/stores/shipper/compactor/index_set.go rename to pkg/storage/stores/indexshipper/compactor/index_set.go index 057b245d9a..99b3ed78d8 100644 --- a/pkg/storage/stores/shipper/compactor/index_set.go +++ b/pkg/storage/stores/indexshipper/compactor/index_set.go @@ -14,10 +14,9 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/storage/chunk/client/util" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" - shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -120,14 +119,14 @@ func (is *indexSet) ListSourceFiles() []storage.IndexFile { } func (is *indexSet) GetSourceFile(indexFile storage.IndexFile) (string, error) { - decompress := shipper_util.IsCompressedFile(indexFile.Name) + decompress := storage.IsCompressedFile(indexFile.Name) dst := filepath.Join(is.workingDir, indexFile.Name) if decompress { dst = strings.Trim(dst, gzipExtension) } - err := shipper_util.DownloadFileFromStorage(dst, shipper_util.IsCompressedFile(indexFile.Name), - false, shipper_util.LoggerWithFilename(is.logger, indexFile.Name), + err := storage.DownloadFileFromStorage(dst, storage.IsCompressedFile(indexFile.Name), + false, storage.LoggerWithFilename(is.logger, indexFile.Name), func() (io.ReadCloser, error) { return is.baseIndexSet.GetFile(is.ctx, is.tableName, is.userID, indexFile.Name) }) diff --git a/pkg/storage/stores/shipper/compactor/metrics.go b/pkg/storage/stores/indexshipper/compactor/metrics.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/metrics.go rename to pkg/storage/stores/indexshipper/compactor/metrics.go diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration.go b/pkg/storage/stores/indexshipper/compactor/retention/expiration.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/expiration.go rename to pkg/storage/stores/indexshipper/compactor/retention/expiration.go diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go b/pkg/storage/stores/indexshipper/compactor/retention/expiration_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/expiration_test.go rename to pkg/storage/stores/indexshipper/compactor/retention/expiration_test.go diff --git a/pkg/storage/stores/shipper/compactor/retention/marker.go b/pkg/storage/stores/indexshipper/compactor/retention/marker.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/marker.go rename to pkg/storage/stores/indexshipper/compactor/retention/marker.go diff --git a/pkg/storage/stores/shipper/compactor/retention/marker_test.go b/pkg/storage/stores/indexshipper/compactor/retention/marker_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/marker_test.go rename to pkg/storage/stores/indexshipper/compactor/retention/marker_test.go diff --git a/pkg/storage/stores/shipper/compactor/retention/metrics.go b/pkg/storage/stores/indexshipper/compactor/retention/metrics.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/metrics.go rename to pkg/storage/stores/indexshipper/compactor/retention/metrics.go diff --git a/pkg/storage/stores/shipper/compactor/retention/pool.go b/pkg/storage/stores/indexshipper/compactor/retention/pool.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/pool.go rename to pkg/storage/stores/indexshipper/compactor/retention/pool.go diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/indexshipper/compactor/retention/retention.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/retention.go rename to pkg/storage/stores/indexshipper/compactor/retention/retention.go diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/retention_test.go rename to pkg/storage/stores/indexshipper/compactor/retention/retention_test.go diff --git a/pkg/storage/stores/shipper/compactor/retention/series.go b/pkg/storage/stores/indexshipper/compactor/retention/series.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/series.go rename to pkg/storage/stores/indexshipper/compactor/retention/series.go diff --git a/pkg/storage/stores/shipper/compactor/retention/series_test.go b/pkg/storage/stores/indexshipper/compactor/retention/series_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/series_test.go rename to pkg/storage/stores/indexshipper/compactor/retention/series_test.go diff --git a/pkg/storage/stores/shipper/compactor/retention/util.go b/pkg/storage/stores/indexshipper/compactor/retention/util.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/util.go rename to pkg/storage/stores/indexshipper/compactor/retention/util.go diff --git a/pkg/storage/stores/shipper/compactor/retention/util_test.go b/pkg/storage/stores/indexshipper/compactor/retention/util_test.go similarity index 100% rename from pkg/storage/stores/shipper/compactor/retention/util_test.go rename to pkg/storage/stores/indexshipper/compactor/retention/util_test.go diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/indexshipper/compactor/table.go similarity index 98% rename from pkg/storage/stores/shipper/compactor/table.go rename to pkg/storage/stores/indexshipper/compactor/table.go index 3d0687bf20..1364cd9945 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/indexshipper/compactor/table.go @@ -14,8 +14,8 @@ import ( chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" util_log "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/shipper/compactor/table_test.go b/pkg/storage/stores/indexshipper/compactor/table_test.go similarity index 99% rename from pkg/storage/stores/shipper/compactor/table_test.go rename to pkg/storage/stores/indexshipper/compactor/table_test.go index aa4ccfeccf..863ec9df25 100644 --- a/pkg/storage/stores/shipper/compactor/table_test.go +++ b/pkg/storage/stores/indexshipper/compactor/table_test.go @@ -15,8 +15,8 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" ) const ( diff --git a/pkg/storage/stores/shipper/compactor/testutil.go b/pkg/storage/stores/indexshipper/compactor/testutil.go similarity index 99% rename from pkg/storage/stores/shipper/compactor/testutil.go rename to pkg/storage/stores/indexshipper/compactor/testutil.go index dea4a32fcd..487aecc2bf 100644 --- a/pkg/storage/stores/shipper/compactor/testutil.go +++ b/pkg/storage/stores/indexshipper/compactor/testutil.go @@ -21,8 +21,8 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" ) diff --git a/pkg/storage/stores/indexshipper/downloads/index_set.go b/pkg/storage/stores/indexshipper/downloads/index_set.go index 21795d739f..7b80d5d6dd 100644 --- a/pkg/storage/stores/indexshipper/downloads/index_set.go +++ b/pkg/storage/stores/indexshipper/downloads/index_set.go @@ -16,11 +16,9 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" - "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" - shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/spanlogger" ) @@ -366,66 +364,22 @@ func (t *indexSet) AwaitReady(ctx context.Context) error { } func (t *indexSet) downloadFileFromStorage(ctx context.Context, fileName, folderPathForTable string) (string, error) { - decompress := shipper_util.IsCompressedFile(fileName) + decompress := storage.IsCompressedFile(fileName) dst := filepath.Join(folderPathForTable, fileName) if decompress { dst = strings.Trim(dst, gzipExtension) } - return filepath.Base(dst), downloadFileFromStorage( + return filepath.Base(dst), storage.DownloadFileFromStorage( dst, decompress, true, - shipper_util.LoggerWithFilename(t.logger, fileName), + storage.LoggerWithFilename(t.logger, fileName), func() (io.ReadCloser, error) { return t.baseIndexSet.GetFile(ctx, t.tableName, t.userID, fileName) }, ) } -// DownloadFileFromStorage downloads a file from storage to given location. -func downloadFileFromStorage(destination string, decompressFile bool, sync bool, logger log.Logger, getFileFunc shipper_util.GetFileFunc) error { - start := time.Now() - readCloser, err := getFileFunc() - if err != nil { - return err - } - - defer func() { - if err := readCloser.Close(); err != nil { - level.Error(logger).Log("msg", "failed to close read closer", "err", err) - } - }() - - f, err := os.Create(destination) - if err != nil { - return err - } - - defer func() { - if err := f.Close(); err != nil { - level.Warn(logger).Log("msg", "failed to close file", "file", destination) - } - }() - var objectReader io.Reader = readCloser - if decompressFile { - decompressedReader := chunkenc.Gzip.GetReader(readCloser) - defer chunkenc.Gzip.PutReader(decompressedReader) - - objectReader = decompressedReader - } - - _, err = io.Copy(f, objectReader) - if err != nil { - return err - } - - level.Info(logger).Log("msg", "downloaded file", "total_time", time.Since(start)) - if sync { - return f.Sync() - } - return nil -} - // doConcurrentDownload downloads objects(files) concurrently. It ignores only missing file errors caused by removal of file by compaction. // It returns the names of the files downloaded successfully and leaves it upto the caller to open those files. func (t *indexSet) doConcurrentDownload(ctx context.Context, files []storage.IndexFile) ([]string, error) { diff --git a/pkg/storage/stores/indexshipper/downloads/index_set_test.go b/pkg/storage/stores/indexshipper/downloads/index_set_test.go index 64407cb05c..173cba4b38 100644 --- a/pkg/storage/stores/indexshipper/downloads/index_set_test.go +++ b/pkg/storage/stores/indexshipper/downloads/index_set_test.go @@ -12,7 +12,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" util_log "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/indexshipper/downloads/table.go b/pkg/storage/stores/indexshipper/downloads/table.go index 4d26eeb3fd..911c82ace5 100644 --- a/pkg/storage/stores/indexshipper/downloads/table.go +++ b/pkg/storage/stores/indexshipper/downloads/table.go @@ -15,7 +15,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/spanlogger" ) diff --git a/pkg/storage/stores/indexshipper/downloads/table_manager.go b/pkg/storage/stores/indexshipper/downloads/table_manager.go index 27d685f96a..c826dba4bb 100644 --- a/pkg/storage/stores/indexshipper/downloads/table_manager.go +++ b/pkg/storage/stores/indexshipper/downloads/table_manager.go @@ -17,7 +17,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/validation" ) diff --git a/pkg/storage/stores/indexshipper/downloads/table_manager_test.go b/pkg/storage/stores/indexshipper/downloads/table_manager_test.go index 03d354ac88..5b27ae0d9f 100644 --- a/pkg/storage/stores/indexshipper/downloads/table_manager_test.go +++ b/pkg/storage/stores/indexshipper/downloads/table_manager_test.go @@ -13,7 +13,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" "github.com/grafana/loki/pkg/validation" ) diff --git a/pkg/storage/stores/indexshipper/downloads/table_test.go b/pkg/storage/stores/indexshipper/downloads/table_test.go index e84f1042d2..34c726f7fc 100644 --- a/pkg/storage/stores/indexshipper/downloads/table_test.go +++ b/pkg/storage/stores/indexshipper/downloads/table_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" util_log "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/indexshipper/shipper.go b/pkg/storage/stores/indexshipper/shipper.go index 40ffdf8847..6b79832438 100644 --- a/pkg/storage/stores/indexshipper/shipper.go +++ b/pkg/storage/stores/indexshipper/shipper.go @@ -15,9 +15,8 @@ import ( "github.com/grafana/loki/pkg/storage/stores/indexshipper/downloads" "github.com/grafana/loki/pkg/storage/stores/indexshipper/gatewayclient" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" "github.com/grafana/loki/pkg/storage/stores/indexshipper/uploads" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" - shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -86,7 +85,7 @@ func (cfg *Config) Validate() error { if cfg.Mode == "" { cfg.Mode = ModeReadWrite } - return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix) + return storage.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix) } type indexShipper struct { diff --git a/pkg/storage/stores/shipper/storage/cached_client.go b/pkg/storage/stores/indexshipper/storage/cached_client.go similarity index 100% rename from pkg/storage/stores/shipper/storage/cached_client.go rename to pkg/storage/stores/indexshipper/storage/cached_client.go diff --git a/pkg/storage/stores/shipper/storage/cached_client_test.go b/pkg/storage/stores/indexshipper/storage/cached_client_test.go similarity index 100% rename from pkg/storage/stores/shipper/storage/cached_client_test.go rename to pkg/storage/stores/indexshipper/storage/cached_client_test.go diff --git a/pkg/storage/stores/shipper/storage/client.go b/pkg/storage/stores/indexshipper/storage/client.go similarity index 100% rename from pkg/storage/stores/shipper/storage/client.go rename to pkg/storage/stores/indexshipper/storage/client.go diff --git a/pkg/storage/stores/shipper/storage/client_test.go b/pkg/storage/stores/indexshipper/storage/client_test.go similarity index 100% rename from pkg/storage/stores/shipper/storage/client_test.go rename to pkg/storage/stores/indexshipper/storage/client_test.go diff --git a/pkg/storage/stores/shipper/storage/index_set.go b/pkg/storage/stores/indexshipper/storage/index_set.go similarity index 100% rename from pkg/storage/stores/shipper/storage/index_set.go rename to pkg/storage/stores/indexshipper/storage/index_set.go diff --git a/pkg/storage/stores/shipper/storage/prefixed_object_client.go b/pkg/storage/stores/indexshipper/storage/prefixed_object_client.go similarity index 100% rename from pkg/storage/stores/shipper/storage/prefixed_object_client.go rename to pkg/storage/stores/indexshipper/storage/prefixed_object_client.go diff --git a/pkg/storage/stores/indexshipper/storage/util.go b/pkg/storage/stores/indexshipper/storage/util.go new file mode 100644 index 0000000000..251830605c --- /dev/null +++ b/pkg/storage/stores/indexshipper/storage/util.go @@ -0,0 +1,114 @@ +package storage + +import ( + "errors" + "fmt" + "io" + "os" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + gzip "github.com/klauspost/pgzip" +) + +var ( + gzipReader = sync.Pool{} +) + +// getGzipReader gets or creates a new CompressionReader and reset it to read from src +func getGzipReader(src io.Reader) (io.Reader, error) { + if r := gzipReader.Get(); r != nil { + reader := r.(*gzip.Reader) + err := reader.Reset(src) + if err != nil { + return nil, err + } + return reader, nil + } + reader, err := gzip.NewReader(src) + if err != nil { + return nil, err + } + return reader, nil +} + +// putGzipReader places back in the pool a CompressionReader +func putGzipReader(reader io.Reader) { + gzipReader.Put(reader) +} + +type GetFileFunc func() (io.ReadCloser, error) + +// DownloadFileFromStorage downloads a file from storage to given location. +func DownloadFileFromStorage(destination string, decompressFile bool, sync bool, logger log.Logger, getFileFunc GetFileFunc) error { + start := time.Now() + readCloser, err := getFileFunc() + if err != nil { + return err + } + + defer func() { + if err := readCloser.Close(); err != nil { + level.Error(logger).Log("msg", "failed to close read closer", "err", err) + } + }() + + f, err := os.Create(destination) + if err != nil { + return err + } + + defer func() { + if err := f.Close(); err != nil { + level.Warn(logger).Log("msg", "failed to close file", "file", destination) + } + }() + var objectReader io.Reader = readCloser + if decompressFile { + decompressedReader, err := getGzipReader(readCloser) + if err != nil { + return err + } + defer putGzipReader(decompressedReader) + + objectReader = decompressedReader + } + + _, err = io.Copy(f, objectReader) + if err != nil { + return err + } + + level.Info(logger).Log("msg", "downloaded file", "total_time", time.Since(start)) + if sync { + return f.Sync() + } + return nil +} + +func IsCompressedFile(filename string) bool { + return strings.HasSuffix(filename, ".gz") +} + +func LoggerWithFilename(logger log.Logger, filename string) log.Logger { + return log.With(logger, "file-name", filename) +} + +func ValidateSharedStoreKeyPrefix(prefix string) error { + if prefix == "" { + return errors.New("shared store key prefix must be set") + } else if strings.Contains(prefix, "\\") { + // When using windows filesystem as object store the implementation of ObjectClient in Cortex takes care of conversion of separator. + // We just need to always use `/` as a path separator. + return fmt.Errorf("shared store key prefix should only have '%s' as a path separator", delimiter) + } else if strings.HasPrefix(prefix, delimiter) { + return errors.New("shared store key prefix should never start with a path separator i.e '/'") + } else if !strings.HasSuffix(prefix, delimiter) { + return errors.New("shared store key prefix should end with a path separator i.e '/'") + } + + return nil +} diff --git a/pkg/storage/stores/shipper/util/util_test.go b/pkg/storage/stores/indexshipper/storage/util_test.go similarity index 63% rename from pkg/storage/stores/shipper/util/util_test.go rename to pkg/storage/stores/indexshipper/storage/util_test.go index 496fe601cd..6dfa71a2df 100644 --- a/pkg/storage/stores/shipper/util/util_test.go +++ b/pkg/storage/stores/indexshipper/storage/util_test.go @@ -1,18 +1,18 @@ -package util +package storage import ( "context" "io" "io/ioutil" + "os" "path/filepath" "testing" + gzip "github.com/klauspost/pgzip" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/chunk/client/util" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" - "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -29,7 +29,7 @@ func Test_GetFileFromStorage(t *testing.T) { objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: tempDir}) require.NoError(t, err) - indexStorageClient := storage.NewIndexStorageClient(objectClient, "") + indexStorageClient := NewIndexStorageClient(objectClient, "") require.NoError(t, DownloadFileFromStorage(filepath.Join(tempDir, "dest"), false, false, util_log.Logger, func() (io.ReadCloser, error) { @@ -43,8 +43,7 @@ func Test_GetFileFromStorage(t *testing.T) { require.Equal(t, testData, b) // compress the file in storage - err = CompressFile(filepath.Join(tempDir, tableName, "src"), filepath.Join(tempDir, tableName, "src.gz"), true) - require.NoError(t, err) + compressFile(t, filepath.Join(tempDir, tableName, "src"), filepath.Join(tempDir, tableName, "src.gz"), true) // get the compressed file from storage require.NoError(t, DownloadFileFromStorage(filepath.Join(tempDir, "dest.gz"), true, @@ -59,23 +58,30 @@ func Test_GetFileFromStorage(t *testing.T) { require.Equal(t, testData, b) } -func Test_CompressFile(t *testing.T) { - tempDir := t.TempDir() +func compressFile(t *testing.T, src, dest string, sync bool) { + uncompressedFile, err := os.Open(src) + require.NoError(t, err) - uncompressedFilePath := filepath.Join(tempDir, "test-file") - compressedFilePath := filepath.Join(tempDir, "test-file.gz") - decompressedFilePath := filepath.Join(tempDir, "test-file-decompressed") + defer func() { + require.NoError(t, uncompressedFile.Close()) + }() - testData := []byte("test-data") + compressedFile, err := os.Create(dest) + require.NoError(t, err) - require.NoError(t, ioutil.WriteFile(uncompressedFilePath, testData, 0o666)) + defer func() { + require.NoError(t, compressedFile.Close()) + }() - require.NoError(t, CompressFile(uncompressedFilePath, compressedFilePath, true)) - require.FileExists(t, compressedFilePath) + compressedWriter := gzip.NewWriter(compressedFile) - testutil.DecompressFile(t, compressedFilePath, decompressedFilePath) - b, err := ioutil.ReadFile(decompressedFilePath) + _, err = io.Copy(compressedWriter, uncompressedFile) require.NoError(t, err) - require.Equal(t, testData, b) + err = compressedWriter.Close() + require.NoError(t, err) + + if sync { + require.NoError(t, compressedFile.Sync()) + } } diff --git a/pkg/storage/stores/indexshipper/table_client.go b/pkg/storage/stores/indexshipper/table_client.go new file mode 100644 index 0000000000..4479cc6599 --- /dev/null +++ b/pkg/storage/stores/indexshipper/table_client.go @@ -0,0 +1,59 @@ +package indexshipper + +import ( + "context" + + "github.com/grafana/loki/pkg/storage/chunk/client" + "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" + "github.com/grafana/loki/pkg/storage/stores/series/index" +) + +type tableClient struct { + indexStorageClient storage.Client +} + +// NewTableClient creates a client for managing tables in object storage based index store. +// It is typically used when running a table manager. +func NewTableClient(objectClient client.ObjectClient, storageKeyPrefix string) index.TableClient { + return &tableClient{storage.NewIndexStorageClient(objectClient, storageKeyPrefix)} +} + +func (b *tableClient) ListTables(ctx context.Context) ([]string, error) { + b.indexStorageClient.RefreshIndexListCache(ctx) + return b.indexStorageClient.ListTables(ctx) +} + +func (b *tableClient) CreateTable(ctx context.Context, desc config.TableDesc) error { + return nil +} + +func (b *tableClient) Stop() { + b.indexStorageClient.Stop() +} + +func (b *tableClient) DeleteTable(ctx context.Context, tableName string) error { + files, _, err := b.indexStorageClient.ListFiles(ctx, tableName, true) + if err != nil { + return err + } + + for _, file := range files { + err := b.indexStorageClient.DeleteFile(ctx, tableName, file.Name) + if err != nil { + return err + } + } + + return nil +} + +func (b *tableClient) DescribeTable(ctx context.Context, name string) (desc config.TableDesc, isActive bool, err error) { + return config.TableDesc{ + Name: name, + }, true, nil +} + +func (b *tableClient) UpdateTable(ctx context.Context, current, expected config.TableDesc) error { + return nil +} diff --git a/pkg/storage/stores/shipper/table_client_test.go b/pkg/storage/stores/indexshipper/table_client_test.go similarity index 91% rename from pkg/storage/stores/shipper/table_client_test.go rename to pkg/storage/stores/indexshipper/table_client_test.go index 2972c66986..f96a0433bb 100644 --- a/pkg/storage/stores/shipper/table_client_test.go +++ b/pkg/storage/stores/indexshipper/table_client_test.go @@ -1,4 +1,4 @@ -package shipper_test +package indexshipper_test import ( "bytes" @@ -10,8 +10,8 @@ import ( "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/chunk/client/local" + "github.com/grafana/loki/pkg/storage/stores/indexshipper" "github.com/grafana/loki/pkg/storage/stores/series/index" - "github.com/grafana/loki/pkg/storage/stores/shipper" ) func TestBoltDBShipperTableClient(t *testing.T) { @@ -36,7 +36,7 @@ func TestBoltDBShipperTableClient(t *testing.T) { } } - tableClient := shipper.NewBoltDBShipperTableClient(objectClient, "index/") + tableClient := indexshipper.NewTableClient(objectClient, "index/") // check list of tables returns all the folders/tables created above checkExpectedTables(t, tableClient, foldersWithFiles) diff --git a/pkg/storage/stores/indexshipper/uploads/index_set.go b/pkg/storage/stores/indexshipper/uploads/index_set.go index 900ca634ac..612124ba5c 100644 --- a/pkg/storage/stores/indexshipper/uploads/index_set.go +++ b/pkg/storage/stores/indexshipper/uploads/index_set.go @@ -13,7 +13,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" util_log "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/indexshipper/uploads/index_set_test.go b/pkg/storage/stores/indexshipper/uploads/index_set_test.go index 016429b166..fb8e231913 100644 --- a/pkg/storage/stores/indexshipper/uploads/index_set_test.go +++ b/pkg/storage/stores/indexshipper/uploads/index_set_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" util_log "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/indexshipper/uploads/table.go b/pkg/storage/stores/indexshipper/uploads/table.go index e782df18a4..74bda5c281 100644 --- a/pkg/storage/stores/indexshipper/uploads/table.go +++ b/pkg/storage/stores/indexshipper/uploads/table.go @@ -8,7 +8,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" util_log "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/indexshipper/uploads/table_manager.go b/pkg/storage/stores/indexshipper/uploads/table_manager.go index f31ffe5253..a1cc2d0af2 100644 --- a/pkg/storage/stores/indexshipper/uploads/table_manager.go +++ b/pkg/storage/stores/indexshipper/uploads/table_manager.go @@ -9,7 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" util_log "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/indexshipper/uploads/table_manager_test.go b/pkg/storage/stores/indexshipper/uploads/table_manager_test.go index 837994cf1b..f61d7428a7 100644 --- a/pkg/storage/stores/indexshipper/uploads/table_manager_test.go +++ b/pkg/storage/stores/indexshipper/uploads/table_manager_test.go @@ -11,7 +11,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" ) const objectsStorageDirName = "objects" diff --git a/pkg/storage/stores/shipper/index/compactor/compacted_index.go b/pkg/storage/stores/shipper/index/compactor/compacted_index.go index 738827bb13..626a70eb6a 100644 --- a/pkg/storage/stores/shipper/index/compactor/compacted_index.go +++ b/pkg/storage/stores/shipper/index/compactor/compacted_index.go @@ -14,9 +14,9 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" shipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" series_index "github.com/grafana/loki/pkg/storage/stores/series/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/shipper/index/indexfile" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) diff --git a/pkg/storage/stores/shipper/index/compactor/compacted_index_test.go b/pkg/storage/stores/shipper/index/compactor/compacted_index_test.go index ee78adbb52..2f4e93b63c 100644 --- a/pkg/storage/stores/shipper/index/compactor/compacted_index_test.go +++ b/pkg/storage/stores/shipper/index/compactor/compacted_index_test.go @@ -16,7 +16,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/shipper/index/indexfile" util_log "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/shipper/index/compactor/index.go b/pkg/storage/stores/shipper/index/compactor/index.go index 38666d872a..724acea579 100644 --- a/pkg/storage/stores/shipper/index/compactor/index.go +++ b/pkg/storage/stores/shipper/index/compactor/index.go @@ -8,7 +8,7 @@ import ( "github.com/prometheus/common/model" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" ) const ( diff --git a/pkg/storage/stores/shipper/index/compactor/index_compactor.go b/pkg/storage/stores/shipper/index/compactor/index_compactor.go index 52b4c33845..02280655a8 100644 --- a/pkg/storage/stores/shipper/index/compactor/index_compactor.go +++ b/pkg/storage/stores/shipper/index/compactor/index_compactor.go @@ -6,7 +6,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" ) const ( diff --git a/pkg/storage/stores/shipper/index/compactor/iterator.go b/pkg/storage/stores/shipper/index/compactor/iterator.go index df7112a1c7..97f1807c6b 100644 --- a/pkg/storage/stores/shipper/index/compactor/iterator.go +++ b/pkg/storage/stores/shipper/index/compactor/iterator.go @@ -8,8 +8,8 @@ import ( "go.etcd.io/bbolt" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" series_index "github.com/grafana/loki/pkg/storage/stores/series/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" ) const ( diff --git a/pkg/storage/stores/shipper/index/compactor/iterator_test.go b/pkg/storage/stores/shipper/index/compactor/iterator_test.go index 6e6a4cfeb3..6a0d47c719 100644 --- a/pkg/storage/stores/shipper/index/compactor/iterator_test.go +++ b/pkg/storage/stores/shipper/index/compactor/iterator_test.go @@ -19,7 +19,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" ) func Test_ChunkIterator(t *testing.T) { diff --git a/pkg/storage/stores/shipper/index/compactor/table_compactor.go b/pkg/storage/stores/shipper/index/compactor/table_compactor.go index 17bdd4d2e9..4eaa0864e5 100644 --- a/pkg/storage/stores/shipper/index/compactor/table_compactor.go +++ b/pkg/storage/stores/shipper/index/compactor/table_compactor.go @@ -17,8 +17,8 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/util" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) diff --git a/pkg/storage/stores/shipper/index/compactor/table_compactor_test.go b/pkg/storage/stores/shipper/index/compactor/table_compactor_test.go index 21c6b2d187..a5ada88ace 100644 --- a/pkg/storage/stores/shipper/index/compactor/table_compactor_test.go +++ b/pkg/storage/stores/shipper/index/compactor/table_compactor_test.go @@ -21,10 +21,9 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" - shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -85,14 +84,14 @@ func (m *mockIndexSet) ListSourceFiles() []storage.IndexFile { } func (m *mockIndexSet) GetSourceFile(indexFile storage.IndexFile) (string, error) { - decompress := shipper_util.IsCompressedFile(indexFile.Name) + decompress := storage.IsCompressedFile(indexFile.Name) dst := filepath.Join(m.workingDir, indexFile.Name) if decompress { dst = strings.Trim(dst, ".gz") } - err := shipper_util.DownloadFileFromStorage(dst, shipper_util.IsCompressedFile(indexFile.Name), - false, shipper_util.LoggerWithFilename(util_log.Logger, indexFile.Name), + err := storage.DownloadFileFromStorage(dst, storage.IsCompressedFile(indexFile.Name), + false, storage.LoggerWithFilename(util_log.Logger, indexFile.Name), func() (io.ReadCloser, error) { rc, _, err := m.objectClient.GetObject(context.Background(), path.Join(m.tableName, m.userID, indexFile.Name)) return rc, err diff --git a/pkg/storage/stores/shipper/table_client.go b/pkg/storage/stores/shipper/table_client.go deleted file mode 100644 index 5759e9242a..0000000000 --- a/pkg/storage/stores/shipper/table_client.go +++ /dev/null @@ -1,57 +0,0 @@ -package shipper - -import ( - "context" - - "github.com/grafana/loki/pkg/storage/chunk/client" - "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/series/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" -) - -type boltDBShipperTableClient struct { - indexStorageClient storage.Client -} - -func NewBoltDBShipperTableClient(objectClient client.ObjectClient, storageKeyPrefix string) index.TableClient { - return &boltDBShipperTableClient{storage.NewIndexStorageClient(objectClient, storageKeyPrefix)} -} - -func (b *boltDBShipperTableClient) ListTables(ctx context.Context) ([]string, error) { - b.indexStorageClient.RefreshIndexListCache(ctx) - return b.indexStorageClient.ListTables(ctx) -} - -func (b *boltDBShipperTableClient) CreateTable(ctx context.Context, desc config.TableDesc) error { - return nil -} - -func (b *boltDBShipperTableClient) Stop() { - b.indexStorageClient.Stop() -} - -func (b *boltDBShipperTableClient) DeleteTable(ctx context.Context, tableName string) error { - files, _, err := b.indexStorageClient.ListFiles(ctx, tableName, true) - if err != nil { - return err - } - - for _, file := range files { - err := b.indexStorageClient.DeleteFile(ctx, tableName, file.Name) - if err != nil { - return err - } - } - - return nil -} - -func (b *boltDBShipperTableClient) DescribeTable(ctx context.Context, name string) (desc config.TableDesc, isActive bool, err error) { - return config.TableDesc{ - Name: name, - }, true, nil -} - -func (b *boltDBShipperTableClient) UpdateTable(ctx context.Context, current, expected config.TableDesc) error { - return nil -} diff --git a/pkg/storage/stores/shipper/util/util.go b/pkg/storage/stores/shipper/util/util.go index cf34a5838a..54e32f5d6c 100644 --- a/pkg/storage/stores/shipper/util/util.go +++ b/pkg/storage/stores/shipper/util/util.go @@ -1,127 +1,17 @@ package util import ( - "context" - "errors" "fmt" - "io" - "os" "runtime/debug" - "strings" - "sync" - "time" "unsafe" - "github.com/go-kit/log" - "github.com/go-kit/log/level" - gzip "github.com/klauspost/pgzip" "go.etcd.io/bbolt" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/stores/series/index" - util_log "github.com/grafana/loki/pkg/util/log" ) -const ( - delimiter = "/" - sep = "\xff" -) - -var ( - gzipReader = sync.Pool{} - gzipWriter = sync.Pool{} -) - -// getGzipReader gets or creates a new CompressionReader and reset it to read from src -func getGzipReader(src io.Reader) (io.Reader, error) { - if r := gzipReader.Get(); r != nil { - reader := r.(*gzip.Reader) - err := reader.Reset(src) - if err != nil { - return nil, err - } - return reader, nil - } - reader, err := gzip.NewReader(src) - if err != nil { - return nil, err - } - return reader, nil -} - -// putGzipReader places back in the pool a CompressionReader -func putGzipReader(reader io.Reader) { - gzipReader.Put(reader) -} - -// getGzipWriter gets or creates a new CompressionWriter and reset it to write to dst -func getGzipWriter(dst io.Writer) io.WriteCloser { - if w := gzipWriter.Get(); w != nil { - writer := w.(*gzip.Writer) - writer.Reset(dst) - return writer - } - return gzip.NewWriter(dst) -} - -// PutWriter places back in the pool a CompressionWriter -func putGzipWriter(writer io.WriteCloser) { - gzipWriter.Put(writer) -} - -type IndexStorageClient interface { - GetFile(ctx context.Context, tableName, fileName string) (io.ReadCloser, error) - GetUserFile(ctx context.Context, tableName, userID, fileName string) (io.ReadCloser, error) -} - -type GetFileFunc func() (io.ReadCloser, error) - -// DownloadFileFromStorage downloads a file from storage to given location. -func DownloadFileFromStorage(destination string, decompressFile bool, sync bool, logger log.Logger, getFileFunc GetFileFunc) error { - start := time.Now() - readCloser, err := getFileFunc() - if err != nil { - return err - } - - defer func() { - if err := readCloser.Close(); err != nil { - level.Error(logger).Log("msg", "failed to close read closer", "err", err) - } - }() - - f, err := os.Create(destination) - if err != nil { - return err - } - - defer func() { - if err := f.Close(); err != nil { - level.Warn(logger).Log("msg", "failed to close file", "file", destination) - } - }() - var objectReader io.Reader = readCloser - if decompressFile { - decompressedReader, err := getGzipReader(readCloser) - if err != nil { - return err - } - defer putGzipReader(decompressedReader) - - objectReader = decompressedReader - } - - _, err = io.Copy(f, objectReader) - if err != nil { - return err - } - - level.Info(logger).Log("msg", "downloaded file", "total_time", time.Since(start)) - if sync { - return f.Sync() - } - return nil -} +const sep = "\xff" func BuildIndexFileName(tableName, uploader, dbName string) string { // Files are stored with - @@ -135,48 +25,6 @@ func BuildIndexFileName(tableName, uploader, dbName string) string { return objectKey } -func CompressFile(src, dest string, sync bool) error { - level.Info(util_log.Logger).Log("msg", "compressing the file", "src", src, "dest", dest) - uncompressedFile, err := os.Open(src) - if err != nil { - return err - } - - defer func() { - if err := uncompressedFile.Close(); err != nil { - level.Error(util_log.Logger).Log("msg", "failed to close uncompressed file", "path", src, "err", err) - } - }() - - compressedFile, err := os.Create(dest) - if err != nil { - return err - } - - defer func() { - if err := compressedFile.Close(); err != nil { - level.Error(util_log.Logger).Log("msg", "failed to close compressed file", "path", dest, "err", err) - } - }() - - compressedWriter := getGzipWriter(compressedFile) - defer putGzipWriter(compressedWriter) - - _, err = io.Copy(compressedWriter, uncompressedFile) - if err != nil { - return err - } - - err = compressedWriter.Close() - if err == nil { - return err - } - if sync { - return compressedFile.Sync() - } - return nil -} - type result struct { boltdb *bbolt.DB err error @@ -212,22 +60,6 @@ func safeOpenBoltDbFile(path string, ret chan *result) { res.err = err } -func ValidateSharedStoreKeyPrefix(prefix string) error { - if prefix == "" { - return errors.New("shared store key prefix must be set") - } else if strings.Contains(prefix, "\\") { - // When using windows filesystem as object store the implementation of ObjectClient in Cortex takes care of conversion of separator. - // We just need to always use `/` as a path separator. - return fmt.Errorf("shared store key prefix should only have '%s' as a path separator", delimiter) - } else if strings.HasPrefix(prefix, delimiter) { - return errors.New("shared store key prefix should never start with a path separator i.e '/'") - } else if !strings.HasSuffix(prefix, delimiter) { - return errors.New("shared store key prefix should end with a path separator i.e '/'") - } - - return nil -} - func QueryKey(q index.Query) string { ret := q.TableName + sep + q.HashValue @@ -246,14 +78,6 @@ func QueryKey(q index.Query) string { return ret } -func IsCompressedFile(filename string) bool { - return strings.HasSuffix(filename, ".gz") -} - -func LoggerWithFilename(logger log.Logger, filename string) log.Logger { - return log.With(logger, "file-name", filename) -} - func GetUnsafeBytes(s string) []byte { return *((*[]byte)(unsafe.Pointer(&s))) } diff --git a/pkg/storage/stores/tsdb/compactor.go b/pkg/storage/stores/tsdb/compactor.go index d04b177106..c67ba9a04b 100644 --- a/pkg/storage/stores/tsdb/compactor.go +++ b/pkg/storage/stores/tsdb/compactor.go @@ -18,9 +18,9 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" index_shipper "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" ) diff --git a/pkg/storage/stores/tsdb/compactor_test.go b/pkg/storage/stores/tsdb/compactor_test.go index 6bf4bd8907..a6a9334f29 100644 --- a/pkg/storage/stores/tsdb/compactor_test.go +++ b/pkg/storage/stores/tsdb/compactor_test.go @@ -22,10 +22,9 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/chunk/client/util" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" - "github.com/grafana/loki/pkg/storage/stores/shipper/storage" - shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -81,14 +80,14 @@ func (m *mockIndexSet) ListSourceFiles() []storage.IndexFile { } func (m *mockIndexSet) GetSourceFile(indexFile storage.IndexFile) (string, error) { - decompress := shipper_util.IsCompressedFile(indexFile.Name) + decompress := storage.IsCompressedFile(indexFile.Name) dst := filepath.Join(m.workingDir, indexFile.Name) if decompress { dst = strings.Trim(dst, ".gz") } - err := shipper_util.DownloadFileFromStorage(dst, shipper_util.IsCompressedFile(indexFile.Name), - false, shipper_util.LoggerWithFilename(util_log.Logger, indexFile.Name), + err := storage.DownloadFileFromStorage(dst, storage.IsCompressedFile(indexFile.Name), + false, storage.LoggerWithFilename(util_log.Logger, indexFile.Name), func() (io.ReadCloser, error) { rc, _, err := m.objectClient.GetObject(context.Background(), path.Join(m.tableName, m.userID, indexFile.Name)) return rc, err diff --git a/tools/tsdb/tsdb-map/main.go b/tools/tsdb/tsdb-map/main.go index e85ccfcc75..bb5aabe670 100644 --- a/tools/tsdb/tsdb-map/main.go +++ b/tools/tsdb/tsdb-map/main.go @@ -12,7 +12,7 @@ import ( "gopkg.in/yaml.v2" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/shipper/index/compactor" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" "github.com/grafana/loki/pkg/storage/stores/tsdb"