Index shipper move code (#6704)

* move compactor code to generic indexshipper

* move table client to generic indexshipper

* move storage package and relevant code for managing files on storage to generic indexshipper

* add missing files

* lint
pull/6728/head^2
Sandeep Sukhani 3 years ago committed by GitHub
parent 0d99d80acd
commit 882010a810
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      pkg/distributor/distributor.go
  2. 2
      pkg/loki/delete_store_listener.go
  3. 7
      pkg/loki/loki.go
  4. 12
      pkg/loki/modules.go
  5. 5
      pkg/querier/querier.go
  6. 3
      pkg/querier/querier_test.go
  7. 8
      pkg/storage/factory.go
  8. 9
      pkg/storage/stores/indexshipper/compactor/compactor.go
  9. 0
      pkg/storage/stores/indexshipper/compactor/compactor_test.go
  10. 2
      pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go
  11. 2
      pkg/storage/stores/indexshipper/compactor/deletion/delete_request_test.go
  12. 0
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go
  13. 0
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go
  14. 2
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go
  15. 2
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go
  16. 2
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store.go
  17. 3
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store_test.go
  18. 6
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table.go
  19. 2
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table_test.go
  20. 0
      pkg/storage/stores/indexshipper/compactor/deletion/metrics.go
  21. 0
      pkg/storage/stores/indexshipper/compactor/deletion/mode.go
  22. 0
      pkg/storage/stores/indexshipper/compactor/deletion/mode_test.go
  23. 0
      pkg/storage/stores/indexshipper/compactor/deletion/noop_delete_requests_store.go
  24. 2
      pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go
  25. 2
      pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go
  26. 2
      pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client.go
  27. 0
      pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client_test.go
  28. 0
      pkg/storage/stores/indexshipper/compactor/deletion/validation.go
  29. 0
      pkg/storage/stores/indexshipper/compactor/deletion/validation_test.go
  30. 0
      pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go
  31. 0
      pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go
  32. 0
      pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go
  33. 0
      pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go
  34. 0
      pkg/storage/stores/indexshipper/compactor/generationnumber/metrics.go
  35. 11
      pkg/storage/stores/indexshipper/compactor/index_set.go
  36. 0
      pkg/storage/stores/indexshipper/compactor/metrics.go
  37. 0
      pkg/storage/stores/indexshipper/compactor/retention/expiration.go
  38. 0
      pkg/storage/stores/indexshipper/compactor/retention/expiration_test.go
  39. 0
      pkg/storage/stores/indexshipper/compactor/retention/marker.go
  40. 0
      pkg/storage/stores/indexshipper/compactor/retention/marker_test.go
  41. 0
      pkg/storage/stores/indexshipper/compactor/retention/metrics.go
  42. 0
      pkg/storage/stores/indexshipper/compactor/retention/pool.go
  43. 0
      pkg/storage/stores/indexshipper/compactor/retention/retention.go
  44. 0
      pkg/storage/stores/indexshipper/compactor/retention/retention_test.go
  45. 0
      pkg/storage/stores/indexshipper/compactor/retention/series.go
  46. 0
      pkg/storage/stores/indexshipper/compactor/retention/series_test.go
  47. 0
      pkg/storage/stores/indexshipper/compactor/retention/util.go
  48. 0
      pkg/storage/stores/indexshipper/compactor/retention/util_test.go
  49. 4
      pkg/storage/stores/indexshipper/compactor/table.go
  50. 4
      pkg/storage/stores/indexshipper/compactor/table_test.go
  51. 2
      pkg/storage/stores/indexshipper/compactor/testutil.go
  52. 54
      pkg/storage/stores/indexshipper/downloads/index_set.go
  53. 2
      pkg/storage/stores/indexshipper/downloads/index_set_test.go
  54. 2
      pkg/storage/stores/indexshipper/downloads/table.go
  55. 2
      pkg/storage/stores/indexshipper/downloads/table_manager.go
  56. 2
      pkg/storage/stores/indexshipper/downloads/table_manager_test.go
  57. 2
      pkg/storage/stores/indexshipper/downloads/table_test.go
  58. 5
      pkg/storage/stores/indexshipper/shipper.go
  59. 0
      pkg/storage/stores/indexshipper/storage/cached_client.go
  60. 0
      pkg/storage/stores/indexshipper/storage/cached_client_test.go
  61. 0
      pkg/storage/stores/indexshipper/storage/client.go
  62. 0
      pkg/storage/stores/indexshipper/storage/client_test.go
  63. 0
      pkg/storage/stores/indexshipper/storage/index_set.go
  64. 0
      pkg/storage/stores/indexshipper/storage/prefixed_object_client.go
  65. 114
      pkg/storage/stores/indexshipper/storage/util.go
  66. 42
      pkg/storage/stores/indexshipper/storage/util_test.go
  67. 59
      pkg/storage/stores/indexshipper/table_client.go
  68. 6
      pkg/storage/stores/indexshipper/table_client_test.go
  69. 2
      pkg/storage/stores/indexshipper/uploads/index_set.go
  70. 2
      pkg/storage/stores/indexshipper/uploads/index_set_test.go
  71. 2
      pkg/storage/stores/indexshipper/uploads/table.go
  72. 2
      pkg/storage/stores/indexshipper/uploads/table_manager.go
  73. 2
      pkg/storage/stores/indexshipper/uploads/table_manager_test.go
  74. 2
      pkg/storage/stores/shipper/index/compactor/compacted_index.go
  75. 2
      pkg/storage/stores/shipper/index/compactor/compacted_index_test.go
  76. 2
      pkg/storage/stores/shipper/index/compactor/index.go
  77. 2
      pkg/storage/stores/shipper/index/compactor/index_compactor.go
  78. 2
      pkg/storage/stores/shipper/index/compactor/iterator.go
  79. 2
      pkg/storage/stores/shipper/index/compactor/iterator_test.go
  80. 4
      pkg/storage/stores/shipper/index/compactor/table_compactor.go
  81. 11
      pkg/storage/stores/shipper/index/compactor/table_compactor_test.go
  82. 57
      pkg/storage/stores/shipper/table_client.go
  83. 178
      pkg/storage/stores/shipper/util/util.go
  84. 4
      pkg/storage/stores/tsdb/compactor.go
  85. 13
      pkg/storage/stores/tsdb/compactor_test.go
  86. 2
      tools/tsdb/tsdb-map/main.go

@ -10,6 +10,7 @@ import (
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
lru "github.com/hashicorp/golang-lru"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
@ -19,14 +20,12 @@ import (
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"

@ -3,7 +3,7 @@ package loki
import (
"github.com/grafana/dskit/services"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
)
func deleteRequestsStoreListener(d deletion.DeleteRequestsClient) *listener {

@ -9,8 +9,6 @@ import (
"os"
rt "runtime"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/fatih/color"
"github.com/felixge/fgprof"
"github.com/go-kit/log/level"
@ -43,10 +41,11 @@ import (
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/scheduler"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/tracing"
"github.com/grafana/loki/pkg/usagestats"

@ -13,10 +13,6 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -39,6 +35,7 @@ import (
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/lokifrontend/frontend"
"github.com/grafana/loki/pkg/lokifrontend/frontend/transport"
"github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb"
@ -55,10 +52,11 @@ import (
chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/generationnumber"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/generationnumber"
shipper_index "github.com/grafana/loki/pkg/storage/stores/shipper/index"
boltdb_shipper_compactor "github.com/grafana/loki/pkg/storage/stores/shipper/index/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"

@ -6,11 +6,10 @@ import (
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"
"golang.org/x/sync/errgroup"
@ -23,7 +22,7 @@ import (
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
listutil "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/spanlogger"
util_validation "github.com/grafana/loki/pkg/util/validation"

@ -8,8 +8,6 @@ import (
"testing"
"time"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
@ -24,6 +22,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/validation"
)

@ -278,12 +278,16 @@ func NewTableClient(name string, cfg Config, cm ClientMetrics, registerer promet
return local.NewTableClient(cfg.BoltDBConfig.Directory)
case config.StorageTypeGrpc:
return grpc.NewTableClient(cfg.GrpcConfig)
case config.BoltDBShipperType:
case config.BoltDBShipperType, config.TSDBType:
objectClient, err := NewObjectClient(cfg.BoltDBShipperConfig.SharedStoreType, cfg, cm)
if err != nil {
return nil, err
}
return shipper.NewBoltDBShipperTableClient(objectClient, cfg.BoltDBShipperConfig.SharedStoreKeyPrefix), nil
sharedStoreKeyPrefix := cfg.BoltDBShipperConfig.SharedStoreKeyPrefix
if name == config.TSDBType {
sharedStoreKeyPrefix = cfg.TSDBShipperConfig.SharedStoreKeyPrefix
}
return indexshipper.NewTableClient(objectClient, sharedStoreKeyPrefix), nil
default:
return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: %v, %v, %v, %v, %v, %v, %v", name, config.StorageTypeAWS, config.StorageTypeCassandra, config.StorageTypeInMemory, config.StorageTypeGCP, config.StorageTypeBigTable, config.StorageTypeBigTableHashed, config.StorageTypeGrpc)
}

@ -23,10 +23,9 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/local"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
shipper_storage "github.com/grafana/loki/pkg/storage/stores/shipper/storage"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
shipper_storage "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
@ -114,7 +113,7 @@ func (cfg *Config) Validate() error {
return err
}
return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
return shipper_storage.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}
type Compactor struct {

@ -7,7 +7,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/util/filter"
util_log "github.com/grafana/loki/pkg/util/log"
)

@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
)
func TestDeleteRequest_IsDeleted(t *testing.T) {

@ -9,7 +9,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
util_log "github.com/grafana/loki/pkg/util/log"
)

@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
)
const testUserID = "test-user"

@ -16,8 +16,8 @@ import (
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
)
type (

@ -8,12 +8,11 @@ import (
"testing"
"time"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
)
func TestDeleteRequestsStore(t *testing.T) {

@ -15,8 +15,8 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -65,8 +65,8 @@ func (t *deleteRequestsTable) init() error {
_, err := os.Stat(t.dbPath)
if err != nil {
err = shipper_util.DownloadFileFromStorage(t.dbPath, true,
true, shipper_util.LoggerWithFilename(util_log.Logger, deleteRequestsIndexFileName), func() (io.ReadCloser, error) {
err = storage.DownloadFileFromStorage(t.dbPath, true,
true, storage.LoggerWithFilename(util_log.Logger, deleteRequestsIndexFileName), func() (io.ReadCloser, error) {
return t.indexStorageClient.GetFile(context.Background(), DeleteRequestsTableName, deleteRequestsIndexFileName)
})
if err != nil && !t.indexStorageClient.IsFileNotFoundErr(err) {

@ -9,8 +9,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
)

@ -12,7 +12,7 @@ import (
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
)

@ -12,7 +12,7 @@ import (
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/validation"
)

@ -3,7 +3,7 @@ package deletion
import (
"context"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
)
type perTenantDeleteRequestsClient struct {

@ -14,10 +14,9 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -120,14 +119,14 @@ func (is *indexSet) ListSourceFiles() []storage.IndexFile {
}
func (is *indexSet) GetSourceFile(indexFile storage.IndexFile) (string, error) {
decompress := shipper_util.IsCompressedFile(indexFile.Name)
decompress := storage.IsCompressedFile(indexFile.Name)
dst := filepath.Join(is.workingDir, indexFile.Name)
if decompress {
dst = strings.Trim(dst, gzipExtension)
}
err := shipper_util.DownloadFileFromStorage(dst, shipper_util.IsCompressedFile(indexFile.Name),
false, shipper_util.LoggerWithFilename(is.logger, indexFile.Name),
err := storage.DownloadFileFromStorage(dst, storage.IsCompressedFile(indexFile.Name),
false, storage.LoggerWithFilename(is.logger, indexFile.Name),
func() (io.ReadCloser, error) {
return is.baseIndexSet.GetFile(is.ctx, is.tableName, is.userID, indexFile.Name)
})

@ -14,8 +14,8 @@ import (
chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
)

@ -15,8 +15,8 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
)
const (

@ -21,8 +21,8 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
)

@ -16,11 +16,9 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
)
@ -366,66 +364,22 @@ func (t *indexSet) AwaitReady(ctx context.Context) error {
}
func (t *indexSet) downloadFileFromStorage(ctx context.Context, fileName, folderPathForTable string) (string, error) {
decompress := shipper_util.IsCompressedFile(fileName)
decompress := storage.IsCompressedFile(fileName)
dst := filepath.Join(folderPathForTable, fileName)
if decompress {
dst = strings.Trim(dst, gzipExtension)
}
return filepath.Base(dst), downloadFileFromStorage(
return filepath.Base(dst), storage.DownloadFileFromStorage(
dst,
decompress,
true,
shipper_util.LoggerWithFilename(t.logger, fileName),
storage.LoggerWithFilename(t.logger, fileName),
func() (io.ReadCloser, error) {
return t.baseIndexSet.GetFile(ctx, t.tableName, t.userID, fileName)
},
)
}
// DownloadFileFromStorage downloads a file from storage to given location.
func downloadFileFromStorage(destination string, decompressFile bool, sync bool, logger log.Logger, getFileFunc shipper_util.GetFileFunc) error {
start := time.Now()
readCloser, err := getFileFunc()
if err != nil {
return err
}
defer func() {
if err := readCloser.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close read closer", "err", err)
}
}()
f, err := os.Create(destination)
if err != nil {
return err
}
defer func() {
if err := f.Close(); err != nil {
level.Warn(logger).Log("msg", "failed to close file", "file", destination)
}
}()
var objectReader io.Reader = readCloser
if decompressFile {
decompressedReader := chunkenc.Gzip.GetReader(readCloser)
defer chunkenc.Gzip.PutReader(decompressedReader)
objectReader = decompressedReader
}
_, err = io.Copy(f, objectReader)
if err != nil {
return err
}
level.Info(logger).Log("msg", "downloaded file", "total_time", time.Since(start))
if sync {
return f.Sync()
}
return nil
}
// doConcurrentDownload downloads objects(files) concurrently. It ignores only missing file errors caused by removal of file by compaction.
// It returns the names of the files downloaded successfully and leaves it upto the caller to open those files.
func (t *indexSet) doConcurrentDownload(ctx context.Context, files []storage.IndexFile) ([]string, error) {

@ -12,7 +12,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
)

@ -15,7 +15,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
)

@ -17,7 +17,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)

@ -13,7 +13,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/validation"
)

@ -13,7 +13,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
)

@ -15,9 +15,8 @@ import (
"github.com/grafana/loki/pkg/storage/stores/indexshipper/downloads"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/gatewayclient"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/uploads"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -86,7 +85,7 @@ func (cfg *Config) Validate() error {
if cfg.Mode == "" {
cfg.Mode = ModeReadWrite
}
return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
return storage.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}
type indexShipper struct {

@ -0,0 +1,114 @@
package storage
import (
"errors"
"fmt"
"io"
"os"
"strings"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
gzip "github.com/klauspost/pgzip"
)
var (
gzipReader = sync.Pool{}
)
// getGzipReader gets or creates a new CompressionReader and reset it to read from src
func getGzipReader(src io.Reader) (io.Reader, error) {
if r := gzipReader.Get(); r != nil {
reader := r.(*gzip.Reader)
err := reader.Reset(src)
if err != nil {
return nil, err
}
return reader, nil
}
reader, err := gzip.NewReader(src)
if err != nil {
return nil, err
}
return reader, nil
}
// putGzipReader places back in the pool a CompressionReader
func putGzipReader(reader io.Reader) {
gzipReader.Put(reader)
}
type GetFileFunc func() (io.ReadCloser, error)
// DownloadFileFromStorage downloads a file from storage to given location.
func DownloadFileFromStorage(destination string, decompressFile bool, sync bool, logger log.Logger, getFileFunc GetFileFunc) error {
start := time.Now()
readCloser, err := getFileFunc()
if err != nil {
return err
}
defer func() {
if err := readCloser.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close read closer", "err", err)
}
}()
f, err := os.Create(destination)
if err != nil {
return err
}
defer func() {
if err := f.Close(); err != nil {
level.Warn(logger).Log("msg", "failed to close file", "file", destination)
}
}()
var objectReader io.Reader = readCloser
if decompressFile {
decompressedReader, err := getGzipReader(readCloser)
if err != nil {
return err
}
defer putGzipReader(decompressedReader)
objectReader = decompressedReader
}
_, err = io.Copy(f, objectReader)
if err != nil {
return err
}
level.Info(logger).Log("msg", "downloaded file", "total_time", time.Since(start))
if sync {
return f.Sync()
}
return nil
}
func IsCompressedFile(filename string) bool {
return strings.HasSuffix(filename, ".gz")
}
func LoggerWithFilename(logger log.Logger, filename string) log.Logger {
return log.With(logger, "file-name", filename)
}
func ValidateSharedStoreKeyPrefix(prefix string) error {
if prefix == "" {
return errors.New("shared store key prefix must be set")
} else if strings.Contains(prefix, "\\") {
// When using windows filesystem as object store the implementation of ObjectClient in Cortex takes care of conversion of separator.
// We just need to always use `/` as a path separator.
return fmt.Errorf("shared store key prefix should only have '%s' as a path separator", delimiter)
} else if strings.HasPrefix(prefix, delimiter) {
return errors.New("shared store key prefix should never start with a path separator i.e '/'")
} else if !strings.HasSuffix(prefix, delimiter) {
return errors.New("shared store key prefix should end with a path separator i.e '/'")
}
return nil
}

@ -1,18 +1,18 @@
package util
package storage
import (
"context"
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
gzip "github.com/klauspost/pgzip"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -29,7 +29,7 @@ func Test_GetFileFromStorage(t *testing.T) {
objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: tempDir})
require.NoError(t, err)
indexStorageClient := storage.NewIndexStorageClient(objectClient, "")
indexStorageClient := NewIndexStorageClient(objectClient, "")
require.NoError(t, DownloadFileFromStorage(filepath.Join(tempDir, "dest"), false,
false, util_log.Logger, func() (io.ReadCloser, error) {
@ -43,8 +43,7 @@ func Test_GetFileFromStorage(t *testing.T) {
require.Equal(t, testData, b)
// compress the file in storage
err = CompressFile(filepath.Join(tempDir, tableName, "src"), filepath.Join(tempDir, tableName, "src.gz"), true)
require.NoError(t, err)
compressFile(t, filepath.Join(tempDir, tableName, "src"), filepath.Join(tempDir, tableName, "src.gz"), true)
// get the compressed file from storage
require.NoError(t, DownloadFileFromStorage(filepath.Join(tempDir, "dest.gz"), true,
@ -59,23 +58,30 @@ func Test_GetFileFromStorage(t *testing.T) {
require.Equal(t, testData, b)
}
func Test_CompressFile(t *testing.T) {
tempDir := t.TempDir()
func compressFile(t *testing.T, src, dest string, sync bool) {
uncompressedFile, err := os.Open(src)
require.NoError(t, err)
uncompressedFilePath := filepath.Join(tempDir, "test-file")
compressedFilePath := filepath.Join(tempDir, "test-file.gz")
decompressedFilePath := filepath.Join(tempDir, "test-file-decompressed")
defer func() {
require.NoError(t, uncompressedFile.Close())
}()
testData := []byte("test-data")
compressedFile, err := os.Create(dest)
require.NoError(t, err)
require.NoError(t, ioutil.WriteFile(uncompressedFilePath, testData, 0o666))
defer func() {
require.NoError(t, compressedFile.Close())
}()
require.NoError(t, CompressFile(uncompressedFilePath, compressedFilePath, true))
require.FileExists(t, compressedFilePath)
compressedWriter := gzip.NewWriter(compressedFile)
testutil.DecompressFile(t, compressedFilePath, decompressedFilePath)
b, err := ioutil.ReadFile(decompressedFilePath)
_, err = io.Copy(compressedWriter, uncompressedFile)
require.NoError(t, err)
require.Equal(t, testData, b)
err = compressedWriter.Close()
require.NoError(t, err)
if sync {
require.NoError(t, compressedFile.Sync())
}
}

@ -0,0 +1,59 @@
package indexshipper
import (
"context"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/series/index"
)
type tableClient struct {
indexStorageClient storage.Client
}
// NewTableClient creates a client for managing tables in object storage based index store.
// It is typically used when running a table manager.
func NewTableClient(objectClient client.ObjectClient, storageKeyPrefix string) index.TableClient {
return &tableClient{storage.NewIndexStorageClient(objectClient, storageKeyPrefix)}
}
func (b *tableClient) ListTables(ctx context.Context) ([]string, error) {
b.indexStorageClient.RefreshIndexListCache(ctx)
return b.indexStorageClient.ListTables(ctx)
}
func (b *tableClient) CreateTable(ctx context.Context, desc config.TableDesc) error {
return nil
}
func (b *tableClient) Stop() {
b.indexStorageClient.Stop()
}
func (b *tableClient) DeleteTable(ctx context.Context, tableName string) error {
files, _, err := b.indexStorageClient.ListFiles(ctx, tableName, true)
if err != nil {
return err
}
for _, file := range files {
err := b.indexStorageClient.DeleteFile(ctx, tableName, file.Name)
if err != nil {
return err
}
}
return nil
}
func (b *tableClient) DescribeTable(ctx context.Context, name string) (desc config.TableDesc, isActive bool, err error) {
return config.TableDesc{
Name: name,
}, true, nil
}
func (b *tableClient) UpdateTable(ctx context.Context, current, expected config.TableDesc) error {
return nil
}

@ -1,4 +1,4 @@
package shipper_test
package indexshipper_test
import (
"bytes"
@ -10,8 +10,8 @@ import (
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
)
func TestBoltDBShipperTableClient(t *testing.T) {
@ -36,7 +36,7 @@ func TestBoltDBShipperTableClient(t *testing.T) {
}
}
tableClient := shipper.NewBoltDBShipperTableClient(objectClient, "index/")
tableClient := indexshipper.NewTableClient(objectClient, "index/")
// check list of tables returns all the folders/tables created above
checkExpectedTables(t, tableClient, foldersWithFiles)

@ -13,7 +13,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
)

@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
util_log "github.com/grafana/loki/pkg/util/log"
)

@ -8,7 +8,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
)

@ -9,7 +9,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
)

@ -11,7 +11,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
)
const objectsStorageDirName = "objects"

@ -14,9 +14,9 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
shipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
series_index "github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/shipper/index/indexfile"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
)

@ -16,7 +16,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/shipper/index/indexfile"
util_log "github.com/grafana/loki/pkg/util/log"
)

@ -8,7 +8,7 @@ import (
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
)
const (

@ -6,7 +6,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
)
const (

@ -8,8 +8,8 @@ import (
"go.etcd.io/bbolt"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
series_index "github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
)
const (

@ -19,7 +19,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
)
func Test_ChunkIterator(t *testing.T) {

@ -17,8 +17,8 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
)

@ -21,10 +21,9 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -85,14 +84,14 @@ func (m *mockIndexSet) ListSourceFiles() []storage.IndexFile {
}
func (m *mockIndexSet) GetSourceFile(indexFile storage.IndexFile) (string, error) {
decompress := shipper_util.IsCompressedFile(indexFile.Name)
decompress := storage.IsCompressedFile(indexFile.Name)
dst := filepath.Join(m.workingDir, indexFile.Name)
if decompress {
dst = strings.Trim(dst, ".gz")
}
err := shipper_util.DownloadFileFromStorage(dst, shipper_util.IsCompressedFile(indexFile.Name),
false, shipper_util.LoggerWithFilename(util_log.Logger, indexFile.Name),
err := storage.DownloadFileFromStorage(dst, storage.IsCompressedFile(indexFile.Name),
false, storage.LoggerWithFilename(util_log.Logger, indexFile.Name),
func() (io.ReadCloser, error) {
rc, _, err := m.objectClient.GetObject(context.Background(), path.Join(m.tableName, m.userID, indexFile.Name))
return rc, err

@ -1,57 +0,0 @@
package shipper
import (
"context"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
)
type boltDBShipperTableClient struct {
indexStorageClient storage.Client
}
func NewBoltDBShipperTableClient(objectClient client.ObjectClient, storageKeyPrefix string) index.TableClient {
return &boltDBShipperTableClient{storage.NewIndexStorageClient(objectClient, storageKeyPrefix)}
}
func (b *boltDBShipperTableClient) ListTables(ctx context.Context) ([]string, error) {
b.indexStorageClient.RefreshIndexListCache(ctx)
return b.indexStorageClient.ListTables(ctx)
}
func (b *boltDBShipperTableClient) CreateTable(ctx context.Context, desc config.TableDesc) error {
return nil
}
func (b *boltDBShipperTableClient) Stop() {
b.indexStorageClient.Stop()
}
func (b *boltDBShipperTableClient) DeleteTable(ctx context.Context, tableName string) error {
files, _, err := b.indexStorageClient.ListFiles(ctx, tableName, true)
if err != nil {
return err
}
for _, file := range files {
err := b.indexStorageClient.DeleteFile(ctx, tableName, file.Name)
if err != nil {
return err
}
}
return nil
}
func (b *boltDBShipperTableClient) DescribeTable(ctx context.Context, name string) (desc config.TableDesc, isActive bool, err error) {
return config.TableDesc{
Name: name,
}, true, nil
}
func (b *boltDBShipperTableClient) UpdateTable(ctx context.Context, current, expected config.TableDesc) error {
return nil
}

@ -1,127 +1,17 @@
package util
import (
"context"
"errors"
"fmt"
"io"
"os"
"runtime/debug"
"strings"
"sync"
"time"
"unsafe"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
gzip "github.com/klauspost/pgzip"
"go.etcd.io/bbolt"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/stores/series/index"
util_log "github.com/grafana/loki/pkg/util/log"
)
const (
delimiter = "/"
sep = "\xff"
)
var (
gzipReader = sync.Pool{}
gzipWriter = sync.Pool{}
)
// getGzipReader gets or creates a new CompressionReader and reset it to read from src
func getGzipReader(src io.Reader) (io.Reader, error) {
if r := gzipReader.Get(); r != nil {
reader := r.(*gzip.Reader)
err := reader.Reset(src)
if err != nil {
return nil, err
}
return reader, nil
}
reader, err := gzip.NewReader(src)
if err != nil {
return nil, err
}
return reader, nil
}
// putGzipReader places back in the pool a CompressionReader
func putGzipReader(reader io.Reader) {
gzipReader.Put(reader)
}
// getGzipWriter gets or creates a new CompressionWriter and reset it to write to dst
func getGzipWriter(dst io.Writer) io.WriteCloser {
if w := gzipWriter.Get(); w != nil {
writer := w.(*gzip.Writer)
writer.Reset(dst)
return writer
}
return gzip.NewWriter(dst)
}
// PutWriter places back in the pool a CompressionWriter
func putGzipWriter(writer io.WriteCloser) {
gzipWriter.Put(writer)
}
type IndexStorageClient interface {
GetFile(ctx context.Context, tableName, fileName string) (io.ReadCloser, error)
GetUserFile(ctx context.Context, tableName, userID, fileName string) (io.ReadCloser, error)
}
type GetFileFunc func() (io.ReadCloser, error)
// DownloadFileFromStorage downloads a file from storage to given location.
func DownloadFileFromStorage(destination string, decompressFile bool, sync bool, logger log.Logger, getFileFunc GetFileFunc) error {
start := time.Now()
readCloser, err := getFileFunc()
if err != nil {
return err
}
defer func() {
if err := readCloser.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close read closer", "err", err)
}
}()
f, err := os.Create(destination)
if err != nil {
return err
}
defer func() {
if err := f.Close(); err != nil {
level.Warn(logger).Log("msg", "failed to close file", "file", destination)
}
}()
var objectReader io.Reader = readCloser
if decompressFile {
decompressedReader, err := getGzipReader(readCloser)
if err != nil {
return err
}
defer putGzipReader(decompressedReader)
objectReader = decompressedReader
}
_, err = io.Copy(f, objectReader)
if err != nil {
return err
}
level.Info(logger).Log("msg", "downloaded file", "total_time", time.Since(start))
if sync {
return f.Sync()
}
return nil
}
const sep = "\xff"
func BuildIndexFileName(tableName, uploader, dbName string) string {
// Files are stored with <uploader>-<db-name>
@ -135,48 +25,6 @@ func BuildIndexFileName(tableName, uploader, dbName string) string {
return objectKey
}
func CompressFile(src, dest string, sync bool) error {
level.Info(util_log.Logger).Log("msg", "compressing the file", "src", src, "dest", dest)
uncompressedFile, err := os.Open(src)
if err != nil {
return err
}
defer func() {
if err := uncompressedFile.Close(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to close uncompressed file", "path", src, "err", err)
}
}()
compressedFile, err := os.Create(dest)
if err != nil {
return err
}
defer func() {
if err := compressedFile.Close(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to close compressed file", "path", dest, "err", err)
}
}()
compressedWriter := getGzipWriter(compressedFile)
defer putGzipWriter(compressedWriter)
_, err = io.Copy(compressedWriter, uncompressedFile)
if err != nil {
return err
}
err = compressedWriter.Close()
if err == nil {
return err
}
if sync {
return compressedFile.Sync()
}
return nil
}
type result struct {
boltdb *bbolt.DB
err error
@ -212,22 +60,6 @@ func safeOpenBoltDbFile(path string, ret chan *result) {
res.err = err
}
func ValidateSharedStoreKeyPrefix(prefix string) error {
if prefix == "" {
return errors.New("shared store key prefix must be set")
} else if strings.Contains(prefix, "\\") {
// When using windows filesystem as object store the implementation of ObjectClient in Cortex takes care of conversion of separator.
// We just need to always use `/` as a path separator.
return fmt.Errorf("shared store key prefix should only have '%s' as a path separator", delimiter)
} else if strings.HasPrefix(prefix, delimiter) {
return errors.New("shared store key prefix should never start with a path separator i.e '/'")
} else if !strings.HasSuffix(prefix, delimiter) {
return errors.New("shared store key prefix should end with a path separator i.e '/'")
}
return nil
}
func QueryKey(q index.Query) string {
ret := q.TableName + sep + q.HashValue
@ -246,14 +78,6 @@ func QueryKey(q index.Query) string {
return ret
}
func IsCompressedFile(filename string) bool {
return strings.HasSuffix(filename, ".gz")
}
func LoggerWithFilename(logger log.Logger, filename string) log.Logger {
return log.With(logger, "file-name", filename)
}
func GetUnsafeBytes(s string) []byte {
return *((*[]byte)(unsafe.Pointer(&s)))
}

@ -18,9 +18,9 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
index_shipper "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

@ -22,10 +22,9 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -81,14 +80,14 @@ func (m *mockIndexSet) ListSourceFiles() []storage.IndexFile {
}
func (m *mockIndexSet) GetSourceFile(indexFile storage.IndexFile) (string, error) {
decompress := shipper_util.IsCompressedFile(indexFile.Name)
decompress := storage.IsCompressedFile(indexFile.Name)
dst := filepath.Join(m.workingDir, indexFile.Name)
if decompress {
dst = strings.Trim(dst, ".gz")
}
err := shipper_util.DownloadFileFromStorage(dst, shipper_util.IsCompressedFile(indexFile.Name),
false, shipper_util.LoggerWithFilename(util_log.Logger, indexFile.Name),
err := storage.DownloadFileFromStorage(dst, storage.IsCompressedFile(indexFile.Name),
false, storage.LoggerWithFilename(util_log.Logger, indexFile.Name),
func() (io.ReadCloser, error) {
rc, _, err := m.objectClient.GetObject(context.Background(), path.Join(m.tableName, m.userID, indexFile.Name))
return rc, err

@ -12,7 +12,7 @@ import (
"gopkg.in/yaml.v2"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/shipper/index/compactor"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/tsdb"

Loading…
Cancel
Save