flush boltdb to object store (#1837)

* flush boltdb to object store

files are stored in folder per periodic table and are named after ingester
flushed every 15 mins to make index available to other services
files are also flushed before ingester stops to avoid any data loss
new stores can be implemented easily
ingester to also query store when using boltdb

* persisting uploader name across restarts, detecting objectstore type from periodic config, other refactorings

* updated cli flag for active directory

* add tests for boltdb shipper and update vendor

* syncing boltdb files to disk during update

* sleep in tests to let mtime of boltdb file be changed

* add documentation for boltdb shipper and remove unwanted config
pull/1964/head
Sandeep Sukhani 6 years ago committed by GitHub
parent 8b924a52cb
commit fad3b61be7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      docs/README.md
  2. 86
      docs/operations/storage/boltdb-shipper.md
  3. 1
      go.mod
  4. 4
      pkg/ingester/flush_test.go
  5. 61
      pkg/ingester/ingester.go
  6. 81
      pkg/ingester/ingester_test.go
  7. 17
      pkg/ingester/instance.go
  8. 3
      pkg/logql/stats/context.go
  9. 74
      pkg/logql/stats/grpc.go
  10. 20
      pkg/logql/stats/grpc_test.go
  11. 40
      pkg/loki/modules.go
  12. 30
      pkg/loki/modules_test.go
  13. 58
      pkg/storage/store.go
  14. 61
      pkg/storage/stores/local/boltdb_index_client.go
  15. 216
      pkg/storage/stores/local/downloads.go
  16. 121
      pkg/storage/stores/local/downloads_test.go
  17. 294
      pkg/storage/stores/local/shipper.go
  18. 112
      pkg/storage/stores/local/uploads.go
  19. 165
      pkg/storage/stores/local/uploads_test.go
  20. 47
      pkg/storage/stores/util/object_client.go

@ -45,6 +45,7 @@ simplifies the operation and significantly lowers the cost of Loki.
4. [Storage](operations/storage/README.md)
1. [Table Manager](operations/storage/table-manager.md)
2. [Retention](operations/storage/retention.md)
3. [BoltDB Shipper](operations/storage/boltdb-shipper.md)
5. [Multi-tenancy](operations/multi-tenancy.md)
6. [Loki Canary](operations/loki-canary.md)
8. [HTTP API](api.md)

@ -0,0 +1,86 @@
# Loki with BoltDB Shipper
:warning: BoltDB Shipper is still an experimental feature. It is not recommended to be used in production environments.
BoltDB Shipper lets you run Loki without any dependency on NoSQL stores for storing index.
It locally stores the index in BoltDB files instead and keeps shipping those files to a shared object store i.e the same object store which is being used for storing chunks.
It also keeps syncing BoltDB files from shared object store to a configured local directory for getting index entries created by other services of same Loki cluster.
This helps run Loki with one less dependency and also saves costs in storage since object stores are likely to be much cheaper compared to cost of a hosted NoSQL store or running a self hosted instance of Cassandra.
## Example Configuration
Example configuration with GCS:
```yaml
schema_config:
configs:
- from: 2018-04-15
store: boltdb-shipper
object_store: gcs
schema: v11
index:
prefix: loki_index_
period: 168h
storage_config:
gcs:
bucket_name: GCS_BUCKET_NAME
boltdb_shipper_config:
active_index_directory: /loki/index
cache_location: /loki/boltdb-cache
```
This would run Loki with BoltDB Shipper storing BoltDB files locally at `/loki/index` and chunks at configured `GCS_BUCKET_NAME`.
It would also keep shipping BoltDB files periodically to same configured bucket.
It would also keep downloading BoltDB files from shared bucket uploaded by other ingesters to `/loki/boltdb-cache` folder locally.
## Operational Details
Loki can be configured to run as just a single vertically scaled instance or as a cluster of horizontally scaled single binary(running all Loki services) instances or in micro-services mode running just one of the services in each instance.
When it comes to reads and writes, Ingesters are the ones which writes the index and chunks to stores and Queriers are the ones which reads index and chunks from the store for serving requests.
Before we get into more details, it is important to understand how Loki manages index in stores. Loki shards index as per configured period which defaults to 7 days i.e when it comes to table based stores like Bigtable/Cassandra/DynamoDB there would be separate table per week containing index for that week.
In case of BoltDB files there is no concept of tables so it creates a BoltDB file per week. Files/Tables created per week are identified by a configured `prefix_` + `<period-number-since-epoch>`.
Here `<period-number-since-epoch>` in case of default config would be week number since epoch.
For example, if you have prefix set to `loki_index_` and a write requests comes in on 20th April 2020, it would be stored in table/file named `loki_index_2624` because it has been `2623` weeks since epoch and we are in `2624`th week.
Since sharding of index creates multiple files when using BoltDB, BoltDB Shipper would create a folder per week and add files for that week in that folder and names those files after ingesters which created them.
To show how BoltDB files in shared object store would look like, let us consider 2 ingesters named `ingester-0` and `ingester-1` running in a Loki cluster and
they both having shipped files for week `2623` and `2624` with prefix `loki_index_`, here is how the files would look like:
```
└── index
├── loki_index_2623
│ ├── ingester-0
│ └── ingester-1
└── loki_index_2624
├── ingester-0
└── ingester-1
```
*NOTE: We also add a timestamp to names of the files to randomize the names to avoid overwriting files when running Ingesters with same name and not have a persistent storage. Timestamps not shown here for simplification*
Let us talk about more in depth about how both Ingesters and Queriers work when running them with BoltDB Shipper.
### Ingesters
Ingesters keep writing the index to BoltDB files in `active_index_directory` and BoltDB Shipper keeps looking for new and updated files in that directory every 15 Minutes to upload them to the shared object store.
When running Loki in clustered mode there could be multiple ingesters serving write requests hence each of them generating BoltDB files locally.
*NOTE: To avoid any loss of index when Ingester crashes it is recommended to run Ingesters as statefulset(when using k8s) with a persistent storage for storing index files.*
Another important detail to note is when chunks are flushed they are available for reads in object store instantly while index is not since we only upload them every 15 Minutes with BoltDB shipper.
To avoid missing logs from queries which happen to be indexed in BoltDB files which are not shipped yet, while serving queries for in-memory logs, Ingesters would also do a store query for `now()` - (`max_chunk_age` + `30 Min`) to `<end-time-from-query-request>`.
### Queriers
Queriers lazily loads BoltDB files from shared object store to configured `cache_location`.
When a querier receives a read request, query range from request is resolved to period numbers and all the files for those period numbers are downloaded to `cache_location` if not already.
Once we have downloaded files for a period we keep looking for updates in shared object store and download them every 15 Minutes by default.
Frequency for checking updates can be configured with `resync_interval` config.
To avoid keeping downloaded index files forever there is a ttl for them which defaults to 24 hours, which means if index files for a period are not used for 24 hours they would be removed from cache location.
ttl can be configured using `cache_ttl` config.

@ -51,6 +51,7 @@ require (
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20200310113808-2708ba4e60a4
go.etcd.io/bbolt v1.3.3
go.etcd.io/etcd v0.0.0-20200401174654-e694b7bb0875 // indirect
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
google.golang.org/grpc v1.25.1

@ -7,6 +7,8 @@ import (
"testing"
"time"
"github.com/grafana/loki/pkg/logql"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
@ -223,7 +225,7 @@ func (s *testStore) IsLocal() bool {
return false
}
func (s *testStore) LazyQuery(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) {
func (s *testStore) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) {
return nil, nil
}

@ -22,8 +22,12 @@ import (
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util/validation"
)
@ -63,6 +67,9 @@ type Config struct {
// For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error)
QueryStore bool `yaml:"-"`
QueryStoreMaxLookBackPeriod time.Duration `yaml:"-"`
}
// RegisterFlags registers the flags.
@ -113,6 +120,7 @@ type Ingester struct {
// ChunkStore is the interface we need to store chunks.
type ChunkStore interface {
Put(ctx context.Context, chunks []chunk.Chunk) error
LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error)
}
// New makes a new Ingester.
@ -241,13 +249,35 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
// Query the ingests for log streams matching a set of matchers.
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
instanceID, err := user.ExtractOrgID(queryServer.Context())
// initialize stats collection for ingester queries and set grpc trailer with stats.
ctx := stats.NewContext(queryServer.Context())
defer stats.SendAsTrailer(ctx, queryServer)
instanceID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
}
instance := i.getOrCreateInstance(instanceID)
return instance.Query(req, queryServer)
itrs, err := instance.Query(ctx, req)
if err != nil {
return err
}
if storeReq := buildStoreRequest(i.cfg, req); storeReq != nil {
storeItr, err := i.store.LazyQuery(ctx, logql.SelectParams{QueryRequest: storeReq})
if err != nil {
return err
}
itrs = append(itrs, storeItr)
}
heapItr := iter.NewHeapIterator(ctx, itrs, req.Direction)
defer helpers.LogError("closing iterator", heapItr.Close)
return sendBatches(queryServer.Context(), heapItr, queryServer, req.Limit)
}
// Label returns the set of labels for the stream this ingester knows about.
@ -356,3 +386,30 @@ func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRe
return &resp, nil
}
// buildStoreRequest returns a store request from an ingester request, returns nit if QueryStore is set to false in configuration.
// The request may be truncated due to QueryStoreMaxLookBackPeriod which limits the range of request to make sure
// we only query enough to not miss any data and not add too to many duplicates by covering the who time range in query.
func buildStoreRequest(cfg Config, req *logproto.QueryRequest) *logproto.QueryRequest {
if !cfg.QueryStore {
return nil
}
start := req.Start
end := req.End
if cfg.QueryStoreMaxLookBackPeriod != 0 {
oldestStartTime := time.Now().Add(-cfg.QueryStoreMaxLookBackPeriod)
if oldestStartTime.After(req.Start) {
start = oldestStartTime
}
}
if start.After(end) {
return nil
}
newRequest := *req
newRequest.Start = start
newRequest.End = end
return &newRequest
}

@ -7,6 +7,9 @@ import (
"testing"
"time"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
@ -247,6 +250,10 @@ func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
return nil
}
func (s *mockStore) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) {
return nil, nil
}
type mockQuerierServer struct {
ctx context.Context
resps []*logproto.QueryResponse
@ -269,3 +276,77 @@ func defaultLimitsTestConfig() validation.Limits {
flagext.DefaultValues(&limits)
return limits
}
func TestIngester_buildStoreRequest(t *testing.T) {
ingesterQueryRequest := logproto.QueryRequest{
Selector: `{foo="bar"}`,
Limit: 100,
}
now := time.Now()
for _, tc := range []struct {
name string
queryStore bool
maxLookBackPeriod time.Duration
ingesterQueryRequest *logproto.QueryRequest
expectedStoreQueryRequest *logproto.QueryRequest
}{
{
name: "do not query store",
queryStore: false,
ingesterQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-time.Minute), now),
expectedStoreQueryRequest: nil,
},
{
name: "query store with max look back covering whole request duration",
queryStore: true,
maxLookBackPeriod: time.Hour,
ingesterQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-10*time.Minute), now),
expectedStoreQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-10*time.Minute), now),
},
{
name: "query store with max look back covering partial request duration",
queryStore: true,
maxLookBackPeriod: time.Hour,
ingesterQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-2*time.Hour), now),
expectedStoreQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-time.Hour), now),
},
{
name: "query store with max look back not covering request duration at all",
queryStore: true,
maxLookBackPeriod: time.Hour,
ingesterQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-4*time.Hour), now.Add(-2*time.Hour)),
expectedStoreQueryRequest: nil,
},
} {
t.Run(tc.name, func(t *testing.T) {
ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.QueryStore = tc.queryStore
ingesterConfig.QueryStoreMaxLookBackPeriod = tc.maxLookBackPeriod
storeRequest := buildStoreRequest(ingesterConfig, tc.ingesterQueryRequest)
if tc.expectedStoreQueryRequest == nil {
require.Nil(t, storeRequest)
return
}
// because start time of store could be changed and built based on time when function is called we can't predict expected start time.
// So allowing upto 1s difference between expected and actual start time of store query request.
require.Equal(t, tc.expectedStoreQueryRequest.Selector, storeRequest.Selector)
require.Equal(t, tc.expectedStoreQueryRequest.Limit, storeRequest.Limit)
require.Equal(t, tc.expectedStoreQueryRequest.End, storeRequest.End)
if storeRequest.Start.Sub(tc.expectedStoreQueryRequest.Start) > time.Second {
t.Fatalf("expected upto 1s difference in expected and acutal store request end time but got %d", storeRequest.End.Sub(tc.expectedStoreQueryRequest.End))
}
})
}
}
func recreateRequestWithTime(req logproto.QueryRequest, start, end time.Time) *logproto.QueryRequest {
newReq := req
newReq.Start = start
newReq.End = end
return &newReq
}

@ -186,18 +186,14 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels
return s.labels
}
func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
// initialize stats collection for ingester queries and set grpc trailer with stats.
ctx := stats.NewContext(queryServer.Context())
defer stats.SendAsTrailer(ctx, queryServer)
func (i *instance) Query(ctx context.Context, req *logproto.QueryRequest) ([]iter.EntryIterator, error) {
expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector()
if err != nil {
return err
return nil, err
}
filter, err := expr.Filter()
if err != nil {
return err
return nil, err
}
ingStats := stats.GetIngesterData(ctx)
@ -215,13 +211,10 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
},
)
if err != nil {
return err
return nil, err
}
iter := iter.NewHeapIterator(ctx, iters, req.Direction)
defer helpers.LogError("closing iterator", iter.Close)
return sendBatches(ctx, iter, queryServer, req.Limit)
return iters, nil
}
func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {

@ -138,9 +138,8 @@ func GetStoreData(ctx context.Context) *StoreData {
// Snapshot compute query statistics from a context using the total exec time.
func Snapshot(ctx context.Context, execTime time.Duration) Result {
var res Result
// ingester data is decoded from grpc trailers.
res.Ingester = decodeTrailers(ctx)
res := decodeTrailers(ctx)
// collect data from store.
s, ok := ctx.Value(storeKey).(*StoreData)
if ok {

@ -14,6 +14,7 @@ import (
const (
ingesterDataKey = "ingester_data"
chunkDataKey = "chunk_data"
storeDataKey = "store_data"
)
type trailerCollector struct {
@ -70,32 +71,45 @@ func encodeTrailer(ctx context.Context) (metadata.MD, error) {
}
meta.Set(chunkDataKey, data)
}
storeData, ok := ctx.Value(storeKey).(*StoreData)
if ok {
data, err := jsoniter.MarshalToString(storeData)
if err != nil {
return meta, err
}
meta.Set(storeDataKey, data)
}
return meta, nil
}
func decodeTrailers(ctx context.Context) Ingester {
var res Ingester
func decodeTrailers(ctx context.Context) Result {
var res Result
collector, ok := ctx.Value(trailersKey).(*trailerCollector)
if !ok {
return res
}
res.TotalReached = int32(len(collector.trailers))
res.Ingester.TotalReached = int32(len(collector.trailers))
for _, meta := range collector.trailers {
ing := decodeTrailer(meta)
res.TotalChunksMatched += ing.TotalChunksMatched
res.TotalBatches += ing.TotalBatches
res.TotalLinesSent += ing.TotalLinesSent
res.HeadChunkBytes += ing.HeadChunkBytes
res.HeadChunkLines += ing.HeadChunkLines
res.DecompressedBytes += ing.DecompressedBytes
res.DecompressedLines += ing.DecompressedLines
res.CompressedBytes += ing.CompressedBytes
res.TotalDuplicates += ing.TotalDuplicates
res.Ingester.TotalChunksMatched += ing.Ingester.TotalChunksMatched
res.Ingester.TotalBatches += ing.Ingester.TotalBatches
res.Ingester.TotalLinesSent += ing.Ingester.TotalLinesSent
res.Ingester.HeadChunkBytes += ing.Ingester.HeadChunkBytes
res.Ingester.HeadChunkLines += ing.Ingester.HeadChunkLines
res.Ingester.DecompressedBytes += ing.Ingester.DecompressedBytes
res.Ingester.DecompressedLines += ing.Ingester.DecompressedLines
res.Ingester.CompressedBytes += ing.Ingester.CompressedBytes
res.Ingester.TotalDuplicates += ing.Ingester.TotalDuplicates
res.Store.TotalChunksRef += ing.Store.TotalChunksRef
res.Store.TotalChunksDownloaded += ing.Store.TotalChunksDownloaded
res.Store.ChunksDownloadTime += ing.Store.ChunksDownloadTime
}
return res
}
func decodeTrailer(meta *metadata.MD) Ingester {
func decodeTrailer(meta *metadata.MD) Result {
var ingData IngesterData
values := meta.Get(ingesterDataKey)
if len(values) == 1 {
@ -110,15 +124,29 @@ func decodeTrailer(meta *metadata.MD) Ingester {
level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err)
}
}
return Ingester{
TotalChunksMatched: ingData.TotalChunksMatched,
TotalBatches: ingData.TotalBatches,
TotalLinesSent: ingData.TotalLinesSent,
HeadChunkBytes: chunkData.HeadChunkBytes,
HeadChunkLines: chunkData.HeadChunkLines,
DecompressedBytes: chunkData.DecompressedBytes,
DecompressedLines: chunkData.DecompressedLines,
CompressedBytes: chunkData.CompressedBytes,
TotalDuplicates: chunkData.TotalDuplicates,
var storeData StoreData
values = meta.Get(storeDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &storeData); err != nil {
level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err)
}
}
return Result{
Ingester: Ingester{
TotalChunksMatched: ingData.TotalChunksMatched,
TotalBatches: ingData.TotalBatches,
TotalLinesSent: ingData.TotalLinesSent,
HeadChunkBytes: chunkData.HeadChunkBytes,
HeadChunkLines: chunkData.HeadChunkLines,
DecompressedBytes: chunkData.DecompressedBytes,
DecompressedLines: chunkData.DecompressedLines,
CompressedBytes: chunkData.CompressedBytes,
TotalDuplicates: chunkData.TotalDuplicates,
},
Store: Store{
TotalChunksRef: storeData.TotalChunksRef,
TotalChunksDownloaded: storeData.TotalChunksDownloaded,
ChunksDownloadTime: storeData.ChunksDownloadTime.Seconds(),
},
}
}

@ -82,16 +82,16 @@ func TestCollectTrailer(t *testing.T) {
t.Fatal(err)
}
res := decodeTrailers(ctx)
require.Equal(t, int32(2), res.TotalReached)
require.Equal(t, int64(2), res.TotalChunksMatched)
require.Equal(t, int64(4), res.TotalBatches)
require.Equal(t, int64(6), res.TotalLinesSent)
require.Equal(t, int64(2), res.HeadChunkBytes)
require.Equal(t, int64(2), res.HeadChunkLines)
require.Equal(t, int64(2), res.DecompressedBytes)
require.Equal(t, int64(2), res.DecompressedLines)
require.Equal(t, int64(2), res.CompressedBytes)
require.Equal(t, int64(2), res.TotalDuplicates)
require.Equal(t, int32(2), res.Ingester.TotalReached)
require.Equal(t, int64(2), res.Ingester.TotalChunksMatched)
require.Equal(t, int64(4), res.Ingester.TotalBatches)
require.Equal(t, int64(6), res.Ingester.TotalLinesSent)
require.Equal(t, int64(2), res.Ingester.HeadChunkBytes)
require.Equal(t, int64(2), res.Ingester.HeadChunkLines)
require.Equal(t, int64(2), res.Ingester.DecompressedBytes)
require.Equal(t, int64(2), res.Ingester.DecompressedLines)
require.Equal(t, int64(2), res.Ingester.CompressedBytes)
require.Equal(t, int64(2), res.Ingester.TotalDuplicates)
}
type ingesterFn func(*logproto.QueryRequest, logproto.Querier_QueryServer) error

@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"os"
"sort"
"strings"
"time"
@ -20,6 +21,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
@ -31,6 +33,7 @@ import (
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/local"
"github.com/grafana/loki/pkg/util/validation"
)
@ -197,6 +200,14 @@ func (t *Loki) initIngester() (err error) {
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
t.cfg.Ingester.LifecyclerConfig.ListenPort = &t.cfg.Server.GRPCListenPort
// We want ingester to also query the store when using boltdb-shipper
if activeIndexType(t.cfg.SchemaConfig) == local.BoltDBShipperType {
t.cfg.Ingester.QueryStore = true
// When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet
t.cfg.Ingester.QueryStoreMaxLookBackPeriod = t.cfg.Ingester.MaxChunkAge + local.ShipperFileUploadInterval + (15 * time.Minute)
}
t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides)
if err != nil {
return
@ -243,7 +254,7 @@ func (t *Loki) initTableManager() error {
os.Exit(1)
}
tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig.Config)
tableClient, err := loki_storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig)
if err != nil {
return err
}
@ -277,6 +288,20 @@ func (t *Loki) stopTableManager() error {
}
func (t *Loki) initStore() (err error) {
if activeIndexType(t.cfg.SchemaConfig) == local.BoltDBShipperType {
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
switch t.cfg.Target {
case Ingester:
// We do not want ingester to unnecessarily keep downloading files
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeWriteOnly
case Querier:
// We do not want query to do any updates to index
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadOnly
default:
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadWrite
}
}
t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides)
return
}
@ -473,3 +498,16 @@ var modules = map[moduleName]module{
deps: []moduleName{Querier, Ingester, Distributor, TableManager},
},
}
// activeIndexType type returns index type which would be applicable to logs that would be pushed starting now
// Note: Another periodic config can be applicable in future which can change index type
func activeIndexType(cfg chunk.SchemaConfig) string {
now := model.Now()
i := sort.Search(len(cfg.Configs), func(i int) bool {
return cfg.Configs[i].From.Time > now
})
if i > 0 {
i--
}
return cfg.Configs[i].IndexType
}

@ -2,7 +2,10 @@ package loki
import (
"testing"
"time"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)
@ -35,3 +38,30 @@ func TestUniqueDeps(t *testing.T) {
expected := []moduleName{Server, Overrides, Distributor, Ingester}
assert.Equal(t, expected, uniqueDeps(input))
}
func TestActiveIndexType(t *testing.T) {
var cfg chunk.SchemaConfig
// just one PeriodConfig in the past
cfg.Configs = []chunk.PeriodConfig{{
From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
IndexType: "first",
}}
assert.Equal(t, "first", activeIndexType(cfg))
// add a newer PeriodConfig in the past which should be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(-12 * time.Hour)},
IndexType: "second",
})
assert.Equal(t, "second", activeIndexType(cfg))
// add a newer PeriodConfig in the future which should not be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(time.Hour)},
IndexType: "third",
})
assert.Equal(t, "second", activeIndexType(cfg))
}

@ -5,30 +5,33 @@ import (
"flag"
"sort"
"github.com/cortexproject/cortex/pkg/chunk"
cortex_local "github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/storage/stores/local"
"github.com/grafana/loki/pkg/util"
)
// Config is the loki storage configuration
type Config struct {
storage.Config `yaml:",inline"`
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
storage.Config `yaml:",inline"`
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig local.ShipperConfig `yaml:"boltdb_shipper_config"`
}
// RegisterFlags adds the flags required to configure this flag set.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Config.RegisterFlags(f)
cfg.BoltDBShipperConfig.RegisterFlags(f)
f.IntVar(&cfg.MaxChunkBatchSize, "max-chunk-batch-size", 50, "The maximum number of chunks to fetch per batch.")
}
@ -46,6 +49,8 @@ type store struct {
// NewStore creates a new Loki Store using configuration supplied.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits) (Store, error) {
registerCustomIndexClients(cfg, schemaCfg)
s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits)
if err != nil {
return nil, err
@ -56,6 +61,16 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
}, nil
}
// NewTableClient creates a TableClient for managing tables for index/chunk store.
// ToDo: Add support in Cortex for registering custom table client like index client.
func NewTableClient(name string, cfg Config) (chunk.TableClient, error) {
if name == local.BoltDBShipperType {
name = "boltdb"
cfg.FSConfig = cortex_local.FSConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}
}
return storage.NewTableClient(name, cfg.Config)
}
// decodeReq sanitizes an incoming request, rounds bounds, and appends the __name__ matcher
func decodeReq(req logql.SelectParams) ([]*labels.Matcher, logql.LineFilter, model.Time, model.Time, error) {
expr, err := req.LogSelector()
@ -201,3 +216,36 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.
}
return filtered
}
func registerCustomIndexClients(cfg Config, schemaCfg chunk.SchemaConfig) {
boltdbShipperInstances := 0
storage.RegisterIndexStore(local.BoltDBShipperType, func() (chunk.IndexClient, error) {
// since we do not know which object client is being used for the period for which we are creating this index client,
// we need to iterate through all the periodic configs to find the right one.
// We maintain number of instances that we have already created in boltdbShipperInstances and then count the number of
// encounters of BoltDBShipperType until we find the right periodic config for getting the ObjectType.
// This is done assuming we are creating index client in the order of periodic configs.
// Note: We are assuming that user would never store chunks in table based store otherwise NewObjectClient would return an error.
// ToDo: Try passing on ObjectType from Cortex to the callback for creating custom index client.
boltdbShipperEncounter := 0
objectStoreType := ""
for _, config := range schemaCfg.Configs {
if config.IndexType == local.BoltDBShipperType {
boltdbShipperEncounter++
if boltdbShipperEncounter > boltdbShipperInstances {
objectStoreType = config.ObjectType
break
}
}
}
boltdbShipperInstances++
objectClient, err := storage.NewObjectClient(objectStoreType, cfg.Config)
if err != nil {
return nil, err
}
return local.NewBoltDBIndexClient(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig)
}, nil)
}

@ -0,0 +1,61 @@
package local
import (
"context"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"go.etcd.io/bbolt"
)
type BoltdbIndexClientWithShipper struct {
*local.BoltIndexClient
shipper *Shipper
}
// NewBoltDBIndexClient creates a new IndexClient that used BoltDB.
func NewBoltDBIndexClient(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) {
boltDBIndexClient, err := local.NewBoltDBIndexClient(cfg)
if err != nil {
return nil, err
}
shipper, err := NewShipper(archiverCfg, archiveStoreClient, boltDBIndexClient)
if err != nil {
return nil, err
}
indexClient := BoltdbIndexClientWithShipper{
BoltIndexClient: boltDBIndexClient,
shipper: shipper,
}
return &indexClient, nil
}
func (b *BoltdbIndexClientWithShipper) Stop() {
b.shipper.Stop()
b.BoltIndexClient.Stop()
}
func (b *BoltdbIndexClientWithShipper) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
return chunk_util.DoParallelQueries(ctx, b.query, queries, callback)
}
func (b *BoltdbIndexClientWithShipper) query(ctx context.Context, query chunk.IndexQuery, callback func(chunk.ReadBatch) (shouldContinue bool)) error {
db, err := b.GetDB(query.TableName, local.DBOperationRead)
if err != nil && err != local.ErrUnexistentBoltDB {
return err
}
if db != nil {
if err := b.QueryDB(ctx, db, query, callback); err != nil {
return err
}
}
return b.shipper.forEach(ctx, query.TableName, func(db *bbolt.DB) error {
return b.QueryDB(ctx, db, query, callback)
})
}

@ -0,0 +1,216 @@
package local
import (
"context"
"fmt"
"io"
"os"
"path"
"strings"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
)
// checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache
func (s *Shipper) checkStorageForUpdates(ctx context.Context, period string, fc *filesCollection) (toDownload []chunk.StorageObject, toDelete []string, err error) {
if s.cfg.Mode == ShipperModeWriteOnly {
return
}
// listing tables from store
var objects []chunk.StorageObject
objects, err = s.storageClient.List(ctx, period+"/")
if err != nil {
return
}
listedUploaders := make(map[string]struct{}, len(objects))
for _, object := range objects {
uploader := strings.Split(object.Key, "/")[1]
// don't include the file which was uploaded by same ingester
if uploader == s.uploader {
continue
}
listedUploaders[uploader] = struct{}{}
// Checking whether file was updated in the store after we downloaded it, if not, no need to include it in updates
downloadedFileDetails, ok := fc.files[uploader]
if !ok || downloadedFileDetails.mtime != object.ModifiedAt {
toDownload = append(toDownload, object)
}
}
for uploader := range fc.files {
if _, isOK := listedUploaders[uploader]; !isOK {
toDelete = append(toDelete, uploader)
}
}
return
}
// syncFilesForPeriod downloads updated and new files from for given period from all the uploaders and removes deleted ones
func (s *Shipper) syncFilesForPeriod(ctx context.Context, period string, fc *filesCollection) error {
level.Debug(util.Logger).Log("msg", fmt.Sprintf("syncing files for period %s", period))
fc.RLock()
toDownload, toDelete, err := s.checkStorageForUpdates(ctx, period, fc)
fc.RUnlock()
if err != nil {
return err
}
for _, storageObject := range toDownload {
err = s.downloadFile(ctx, period, storageObject, fc)
if err != nil {
return err
}
}
for _, uploader := range toDelete {
err := s.deleteFileFromCache(period, uploader, fc)
if err != nil {
return err
}
}
return nil
}
// It first downloads file to a temp location so that we close the existing file(if already exists), replace it with new one and then reopen it.
func (s *Shipper) downloadFile(ctx context.Context, period string, storageObject chunk.StorageObject, fc *filesCollection) error {
uploader := strings.Split(storageObject.Key, "/")[1]
folderPath, _ := s.getFolderPathForPeriod(period, false)
filePath := path.Join(folderPath, uploader)
// download the file temporarily with some other name to allow boltdb client to close the existing file first if it exists
tempFilePath := path.Join(folderPath, fmt.Sprintf("%s.%s", uploader, "temp"))
err := s.getFileFromStorage(ctx, storageObject.Key, tempFilePath)
if err != nil {
return err
}
fc.Lock()
defer fc.Unlock()
df, ok := fc.files[uploader]
if ok {
if err := df.boltdb.Close(); err != nil {
return err
}
} else {
df = downloadedFiles{}
}
// move the file from temp location to actual location
err = os.Rename(tempFilePath, filePath)
if err != nil {
return err
}
df.mtime = storageObject.ModifiedAt
df.boltdb, err = local.OpenBoltdbFile(filePath)
if err != nil {
return err
}
fc.files[uploader] = df
return nil
}
// getFileFromStorage downloads a file from storage to given location.
func (s *Shipper) getFileFromStorage(ctx context.Context, objectKey, destination string) error {
readCloser, err := s.storageClient.GetObject(ctx, objectKey)
if err != nil {
return err
}
defer func() {
if err := readCloser.Close(); err != nil {
level.Error(util.Logger)
}
}()
f, err := os.Create(destination)
if err != nil {
return err
}
_, err = io.Copy(f, readCloser)
if err != nil {
return err
}
level.Info(util.Logger).Log("msg", fmt.Sprintf("downloaded file %s", objectKey))
return f.Sync()
}
// downloadFilesForPeriod should be called when files for a period does not exist i.e they were never downloaded or got cleaned up later on by TTL
// While files are being downloaded it will block all reads/writes on filesCollection by taking an exclusive lock
func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc *filesCollection) error {
fc.Lock()
defer fc.Unlock()
objects, err := s.storageClient.List(ctx, period+"/")
if err != nil {
return err
}
level.Debug(util.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", period, objects))
folderPath, err := s.getFolderPathForPeriod(period, true)
if err != nil {
return err
}
for _, object := range objects {
uploader := getUploaderFromObjectKey(object.Key)
if uploader == s.uploader {
continue
}
filePath := path.Join(folderPath, uploader)
df := downloadedFiles{}
err := s.getFileFromStorage(ctx, object.Key, filePath)
if err != nil {
return err
}
df.mtime = object.ModifiedAt
df.boltdb, err = local.OpenBoltdbFile(filePath)
if err != nil {
return err
}
fc.files[uploader] = df
}
return nil
}
func (s *Shipper) getFolderPathForPeriod(period string, ensureExists bool) (string, error) {
folderPath := path.Join(s.cfg.CacheLocation, period)
if ensureExists {
err := chunk_util.EnsureDirectory(folderPath)
if err != nil {
return "", err
}
}
return folderPath, nil
}
func getUploaderFromObjectKey(objectKey string) string {
return strings.Split(objectKey, "/")[1]
}

@ -0,0 +1,121 @@
package local
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/stretchr/testify/require"
)
func queryTestBoltdb(t *testing.T, boltdbIndexClient *BoltdbIndexClientWithShipper, query chunk.IndexQuery) map[string]string {
resp := map[string]string{}
require.NoError(t, boltdbIndexClient.query(context.Background(), query, func(batch chunk.ReadBatch) (shouldContinue bool) {
itr := batch.Iterator()
for itr.Next() {
resp[string(itr.RangeValue())] = string(itr.Value())
}
return true
}))
return resp
}
func writeTestData(t *testing.T, indexClient *BoltdbIndexClientWithShipper, tableName string, numRecords, startValue int) {
time.Sleep(time.Second / 2)
batch := indexClient.NewWriteBatch()
for i := 0; i < numRecords; i++ {
value := []byte(strconv.Itoa(startValue + i))
batch.Add(tableName, "", value, value)
}
require.NoError(t, indexClient.BatchWrite(context.Background(), batch))
boltdb, err := indexClient.GetDB(tableName, local.DBOperationWrite)
require.NoError(t, err)
require.NoError(t, boltdb.Sync())
}
func TestShipper_Downloads(t *testing.T) {
tempDirForTests, err := ioutil.TempDir("", "test-dir")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(tempDirForTests))
}()
localStoreLocation, err := ioutil.TempDir(tempDirForTests, "local-store")
require.NoError(t, err)
boltDBWithShipper1 := createTestBoltDBWithShipper(t, tempDirForTests, "ingester1", localStoreLocation)
boltDBWithShipper2 := createTestBoltDBWithShipper(t, tempDirForTests, "ingester2", localStoreLocation)
// add a file to boltDBWithShipper1
writeTestData(t, boltDBWithShipper1, "1", 10, 0)
// upload files from boltDBWithShipper1
require.NoError(t, boltDBWithShipper1.shipper.uploadFiles(context.Background()))
// query data for same table from boltDBWithShipper2
resp := queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{
TableName: "1",
})
// make sure we got same data that was added from boltDBWithShipper1
checkExpectedKVsInBoltdbResp(t, resp, 10, 0)
// add more data to the previous file added to boltDBWithShipper1 and the upload it
writeTestData(t, boltDBWithShipper1, "1", 10, 10)
require.NoError(t, boltDBWithShipper1.shipper.uploadFiles(context.Background()))
// sync files in boltDBWithShipper2
require.NoError(t, boltDBWithShipper2.shipper.syncLocalWithStorage(context.Background()))
// query data for same table from boltDBWithShipper2
resp = queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{
TableName: "1",
})
// make sure we also got new data that was added from boltDBWithShipper1
checkExpectedKVsInBoltdbResp(t, resp, 20, 0)
// add some data for same table in boltDBWithShipper2
writeTestData(t, boltDBWithShipper2, "1", 10, 20)
// query data for same table from boltDBWithShipper2
resp = queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{
TableName: "1",
})
// make sure we data from boltDBWithShipper1 and boltDBWithShipper2
checkExpectedKVsInBoltdbResp(t, resp, 30, 0)
// stop boltDBWithShipper1
boltDBWithShipper1.Stop()
// delete the file from the store that was uploaded by boltDBWithShipper1
require.NoError(t, os.Remove(filepath.Join(localStoreLocation, storageKeyPrefix, "1", boltDBWithShipper1.shipper.uploader)))
// sync files in boltDBWithShipper2
require.NoError(t, boltDBWithShipper2.shipper.syncLocalWithStorage(context.Background()))
// query data for same table from boltDBWithShipper2
resp = queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{
TableName: "1",
})
// make sure we got only data that was added to boltDBWithShipper2
checkExpectedKVsInBoltdbResp(t, resp, 10, 20)
boltDBWithShipper2.Stop()
}

@ -0,0 +1,294 @@
package local
import (
"context"
"flag"
"fmt"
"io/ioutil"
"os"
"path"
"sync"
"time"
"github.com/cortexproject/cortex/pkg/chunk"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
pkg_util "github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"go.etcd.io/bbolt"
"github.com/grafana/loki/pkg/storage/stores/util"
)
const (
// ShipperModeReadWrite is to allow both read and write
ShipperModeReadWrite = iota
// ShipperModeReadOnly is to allow only read operations
ShipperModeReadOnly
// ShipperModeWriteOnly is to allow only write operations
ShipperModeWriteOnly
// ShipperFileUploadInterval defines interval for uploading active boltdb files from local which are being written to by ingesters.
ShipperFileUploadInterval = 15 * time.Minute
// BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage
BoltDBShipperType = "boltdb-shipper"
cacheCleanupInterval = 24 * time.Hour
storageKeyPrefix = "index/"
)
type BoltDBGetter interface {
GetDB(name string, operation int) (*bbolt.DB, error)
}
type ShipperConfig struct {
ActiveIndexDirectory string `yaml:"active_index_directory"`
CacheLocation string `yaml:"cache_location"`
CacheTTL time.Duration `yaml:"cache_ttl"`
ResyncInterval time.Duration `yaml:"resync_interval"`
IngesterName string `yaml:"-"`
Mode int `yaml:"-"`
}
// RegisterFlags registers flags.
func (cfg *ShipperConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage")
f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries")
f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries")
f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage")
}
type downloadedFiles struct {
mtime time.Time
boltdb *bbolt.DB
}
// filesCollection holds info about shipped boltdb index files by other uploaders(ingesters).
// It is generally used to hold boltdb files created by all the ingesters for same period i.e with same name.
// In the object store files are uploaded as <boltdb-filename>/<uploader-id> to manage files with same name from different ingesters
type filesCollection struct {
sync.RWMutex
lastUsedAt time.Time
files map[string]downloadedFiles
}
type Shipper struct {
cfg ShipperConfig
boltDBGetter BoltDBGetter
// downloadedPeriods holds mapping for period -> filesCollection.
// Here period is name of the file created by ingesters for a specific period.
downloadedPeriods map[string]*filesCollection
downloadedPeriodsMtx sync.RWMutex
storageClient chunk.ObjectClient
uploader string
uploadedFilesMtime map[string]time.Time
uploadedFilesMtimeMtx sync.RWMutex
done chan struct{}
wait sync.WaitGroup
}
// NewShipper creates a shipper for syncing local objects with a store
func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGetter BoltDBGetter) (*Shipper, error) {
err := chunk_util.EnsureDirectory(cfg.CacheLocation)
if err != nil {
return nil, err
}
shipper := Shipper{
cfg: cfg,
boltDBGetter: boltDBGetter,
downloadedPeriods: map[string]*filesCollection{},
storageClient: util.NewPrefixedObjectClient(storageClient, storageKeyPrefix),
done: make(chan struct{}),
uploadedFilesMtime: map[string]time.Time{},
}
shipper.uploader, err = shipper.getUploaderName()
if err != nil {
return nil, err
}
shipper.wait.Add(1)
go shipper.loop()
return &shipper, nil
}
// we would persist uploader name in <active-index-directory>/uploader/name file so that we use same name on subsequent restarts to
// avoid uploading same files again with different name. If the filed does not exist we would create one with uploader name set to
// ingester name and startup timestamp so that we randomise the name and do not override files from other ingesters.
func (s *Shipper) getUploaderName() (string, error) {
uploader := fmt.Sprintf("%s-%d", s.cfg.IngesterName, time.Now().Unix())
uploaderFilePath := path.Join(s.cfg.ActiveIndexDirectory, "uploader", "name")
if err := chunk_util.EnsureDirectory(path.Dir(uploaderFilePath)); err != nil {
return "", err
}
_, err := os.Stat(uploaderFilePath)
if err != nil {
if !os.IsNotExist(err) {
return "", err
}
if err := ioutil.WriteFile(uploaderFilePath, []byte(uploader), 0666); err != nil {
return "", err
}
} else {
ub, err := ioutil.ReadFile(uploaderFilePath)
if err != nil {
return "", err
}
uploader = string(ub)
}
return uploader, nil
}
func (s *Shipper) loop() {
defer s.wait.Done()
resyncTicker := time.NewTicker(s.cfg.ResyncInterval)
defer resyncTicker.Stop()
uploadFilesTicker := time.NewTicker(ShipperFileUploadInterval)
defer uploadFilesTicker.Stop()
cacheCleanupTicker := time.NewTicker(cacheCleanupInterval)
defer cacheCleanupTicker.Stop()
for {
select {
case <-resyncTicker.C:
err := s.syncLocalWithStorage(context.Background())
if err != nil {
level.Error(pkg_util.Logger).Log("msg", "error syncing local boltdb files with storage", "err", err)
}
case <-uploadFilesTicker.C:
err := s.uploadFiles(context.Background())
if err != nil {
level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err)
}
case <-cacheCleanupTicker.C:
err := s.cleanupCache()
if err != nil {
level.Error(pkg_util.Logger).Log("msg", "error cleaning up expired tables", "err", err)
}
case <-s.done:
return
}
}
}
// Stop the shipper and push all the local files to the store
func (s *Shipper) Stop() {
close(s.done)
s.wait.Wait()
// Push all boltdb files to storage before returning
err := s.uploadFiles(context.Background())
if err != nil {
level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err)
}
s.downloadedPeriodsMtx.Lock()
defer s.downloadedPeriodsMtx.Unlock()
for _, fc := range s.downloadedPeriods {
fc.Lock()
for _, fl := range fc.files {
_ = fl.boltdb.Close()
}
fc.Unlock()
}
}
// cleanupCache removes all the files for a period which has not be queried for using the configured TTL
func (s *Shipper) cleanupCache() error {
s.downloadedPeriodsMtx.Lock()
defer s.downloadedPeriodsMtx.Unlock()
for period, fc := range s.downloadedPeriods {
if fc.lastUsedAt.Add(s.cfg.CacheTTL).Before(time.Now()) {
for uploader := range fc.files {
if err := s.deleteFileFromCache(period, uploader, fc); err != nil {
return err
}
}
delete(s.downloadedPeriods, period)
}
}
return nil
}
// syncLocalWithStorage syncs all the periods that we have in the cache with the storage
// i.e download new and updated files and remove files which were delete from the storage.
func (s *Shipper) syncLocalWithStorage(ctx context.Context) error {
s.downloadedPeriodsMtx.RLock()
defer s.downloadedPeriodsMtx.RUnlock()
for period := range s.downloadedPeriods {
if err := s.syncFilesForPeriod(ctx, period, s.downloadedPeriods[period]); err != nil {
return err
}
}
return nil
}
// deleteFileFromCache removes a file from cache.
// It takes care of locking the filesCollection, closing the boltdb file and removing the file from cache
func (s *Shipper) deleteFileFromCache(period, uploader string, fc *filesCollection) error {
fc.Lock()
defer fc.Unlock()
if err := fc.files[uploader].boltdb.Close(); err != nil {
return err
}
delete(fc.files, uploader)
return os.Remove(path.Join(s.cfg.CacheLocation, period, uploader))
}
func (s *Shipper) forEach(ctx context.Context, period string, callback func(db *bbolt.DB) error) error {
s.downloadedPeriodsMtx.RLock()
fc, ok := s.downloadedPeriods[period]
s.downloadedPeriodsMtx.RUnlock()
if !ok {
s.downloadedPeriodsMtx.Lock()
fc, ok = s.downloadedPeriods[period]
if ok {
s.downloadedPeriodsMtx.Unlock()
} else {
level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("downloading all files for period %s", period))
fc = &filesCollection{files: map[string]downloadedFiles{}}
s.downloadedPeriods[period] = fc
s.downloadedPeriodsMtx.Unlock()
if err := s.downloadFilesForPeriod(ctx, period, fc); err != nil {
return err
}
}
}
fc.RLock()
defer fc.RUnlock()
fc.lastUsedAt = time.Now()
for uploader := range fc.files {
if err := callback(fc.files[uploader].boltdb); err != nil {
return err
}
}
return nil
}

@ -0,0 +1,112 @@
package local
import (
"context"
"fmt"
"io/ioutil"
"os"
"path"
"github.com/cortexproject/cortex/pkg/chunk/local"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"go.etcd.io/bbolt"
)
// uploadFiles uploads all new and updated files to storage.
// It uploads the files from configured boltdb dir where ingester writes the index.
func (s *Shipper) uploadFiles(ctx context.Context) error {
if s.cfg.Mode == ShipperModeReadOnly {
return nil
}
filesInfo, err := ioutil.ReadDir(s.cfg.ActiveIndexDirectory)
if err != nil {
return err
}
for _, fileInfo := range filesInfo {
if fileInfo.IsDir() {
continue
}
s.uploadedFilesMtimeMtx.RLock()
// Checking whether file is updated after last push, if not skipping it
uploadedFileMtime, ok := s.uploadedFilesMtime[fileInfo.Name()]
s.uploadedFilesMtimeMtx.RUnlock()
if ok && uploadedFileMtime.Equal(fileInfo.ModTime()) {
continue
}
err := s.uploadFile(ctx, fileInfo.Name())
if err != nil {
return err
}
s.uploadedFilesMtimeMtx.Lock()
s.uploadedFilesMtime[fileInfo.Name()] = fileInfo.ModTime()
s.uploadedFilesMtimeMtx.Unlock()
}
return nil
}
// uploadFile uploads one of the files locally written by ingesters to storage.
func (s *Shipper) uploadFile(ctx context.Context, period string) error {
if s.cfg.Mode == ShipperModeReadOnly {
return nil
}
level.Debug(util.Logger).Log("msg", fmt.Sprintf("uploading file for period %s", period))
snapshotPath := path.Join(s.cfg.CacheLocation, period)
err := chunk_util.EnsureDirectory(snapshotPath)
if err != nil {
return err
}
filePath := path.Join(snapshotPath, fmt.Sprintf("%s.%s", s.uploader, "temp"))
f, err := os.Create(filePath)
if err != nil {
return err
}
defer func() {
if err := os.Remove(filePath); err != nil {
level.Error(util.Logger)
}
}()
db, err := s.boltDBGetter.GetDB(period, local.DBOperationRead)
if err != nil {
return err
}
err = db.View(func(tx *bbolt.Tx) error {
_, err := tx.WriteTo(f)
return err
})
if err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}
if _, err := f.Seek(0, 0); err != nil {
return err
}
defer func() {
if err := f.Close(); err != nil {
level.Error(util.Logger)
}
}()
// Files are stored with <filename>/<uploader>
objectKey := fmt.Sprintf("%s/%s", period, s.uploader)
return s.storageClient.PutObject(ctx, objectKey, f)
}

@ -0,0 +1,165 @@
package local
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
const testBucketName = "testBucket"
func createTestBoltDBWithShipper(t *testing.T, parentTempDir, ingesterName, localStoreLocation string) *BoltdbIndexClientWithShipper {
cacheLocation := filepath.Join(parentTempDir, ingesterName, "cache")
boltdbFilesLocation := filepath.Join(parentTempDir, ingesterName, "boltdb")
require.NoError(t, util.EnsureDirectory(cacheLocation))
require.NoError(t, util.EnsureDirectory(boltdbFilesLocation))
shipperConfig := ShipperConfig{
ActiveIndexDirectory: boltdbFilesLocation,
CacheLocation: cacheLocation,
CacheTTL: 1 * time.Hour,
ResyncInterval: 1 * time.Hour,
IngesterName: ingesterName,
Mode: ShipperModeReadWrite,
}
archiveStoreClient, err := local.NewFSObjectClient(local.FSConfig{
Directory: localStoreLocation,
})
require.NoError(t, err)
boltdbIndexClientWithShipper, err := NewBoltDBIndexClient(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig)
require.NoError(t, err)
return boltdbIndexClientWithShipper.(*BoltdbIndexClientWithShipper)
}
func addTestRecordsToBoltDBFile(t *testing.T, boltdb *bbolt.DB, numRecords int, start int) {
time.Sleep(time.Second / 2)
err := boltdb.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(testBucketName))
if err != nil {
return err
}
for i := 0; i < numRecords; i++ {
kv := []byte(strconv.Itoa(start + i))
err = b.Put(kv, kv)
if err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
require.NoError(t, boltdb.Sync())
}
func readAllKVsFromBoltdbFile(t *testing.T, boltdb *bbolt.DB) map[string]string {
resp := map[string]string{}
err := boltdb.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(testBucketName))
require.NotNil(t, b)
return b.ForEach(func(k, v []byte) error {
resp[string(k)] = string(v)
return nil
})
})
require.NoError(t, err)
return resp
}
func readAllKVsFromBoltdbFileAtPath(t *testing.T, path string) map[string]string {
boltDBFile, err := local.OpenBoltdbFile(path)
require.NoError(t, err)
defer func() {
require.NoError(t, boltDBFile.Close())
}()
return readAllKVsFromBoltdbFile(t, boltDBFile)
}
func checkExpectedKVsInBoltdbResp(t *testing.T, resp map[string]string, expectedNumRecords, start int) {
require.Equal(t, expectedNumRecords, len(resp), "responses", resp)
for i := 0; i < expectedNumRecords; i++ {
expectedKV := strconv.Itoa(start + i)
val, ok := resp[expectedKV]
require.Equal(t, true, ok)
require.Equal(t, expectedKV, val)
}
}
func TestShipper_Uploads(t *testing.T) {
tempDirForTests, err := ioutil.TempDir("", "test-dir")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(tempDirForTests))
}()
localStoreLocation, err := ioutil.TempDir(tempDirForTests, "local-store")
require.NoError(t, err)
boltDBWithShipper := createTestBoltDBWithShipper(t, tempDirForTests, "ingester", localStoreLocation)
// create a boltdb file for boltDBWithShipper to test upload.
boltdbFile1, err := boltDBWithShipper.GetDB("file1", local.DBOperationWrite)
require.NoError(t, err)
file1PathInStorage := filepath.Join(localStoreLocation, storageKeyPrefix, filepath.Base(boltdbFile1.Path()), boltDBWithShipper.shipper.uploader)
// add some test records to boltdbFile1
addTestRecordsToBoltDBFile(t, boltdbFile1, 10, 1)
// Upload files from boltDBWithShipper
err = boltDBWithShipper.shipper.uploadFiles(context.Background())
require.NoError(t, err)
// open boltdbFile1 and verify it has expected records
checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file1PathInStorage), 10, 1)
// create another boltdb file for boltDBWithShipper to test upload.
boltdbFile2, err := boltDBWithShipper.GetDB("file2", local.DBOperationWrite)
require.NoError(t, err)
file2PathInStorage := filepath.Join(localStoreLocation, storageKeyPrefix, filepath.Base(boltdbFile2.Path()), boltDBWithShipper.shipper.uploader)
// add some test records to boltdbFile2 and some more records to boltdbFile1
addTestRecordsToBoltDBFile(t, boltdbFile2, 10, 1)
addTestRecordsToBoltDBFile(t, boltdbFile1, 5, 11)
// Upload files from boltDBWithShipper
err = boltDBWithShipper.shipper.uploadFiles(context.Background())
require.NoError(t, err)
// open boltdbFile1 and boltdbFile2 and verify it has expected records
checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file2PathInStorage), 10, 1)
checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file1PathInStorage), 15, 1)
// modify boltdbFile2 again
addTestRecordsToBoltDBFile(t, boltdbFile2, 10, 11)
// stop boltDBWithShipper to make it upload all the new and changed to store
boltDBWithShipper.Stop()
checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file2PathInStorage), 20, 1)
}

@ -0,0 +1,47 @@
package util
import (
"context"
"io"
"strings"
"github.com/cortexproject/cortex/pkg/chunk"
)
type PrefixedObjectClient struct {
downstreamClient chunk.ObjectClient
prefix string
}
func (p PrefixedObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
return p.downstreamClient.PutObject(ctx, p.prefix+objectKey, object)
}
func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) {
return p.downstreamClient.GetObject(ctx, p.prefix+objectKey)
}
func (p PrefixedObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, error) {
objects, err := p.downstreamClient.List(ctx, p.prefix+prefix)
if err != nil {
return nil, err
}
for i := range objects {
objects[i].Key = strings.TrimPrefix(objects[i].Key, p.prefix)
}
return objects, nil
}
func (p PrefixedObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
return p.downstreamClient.DeleteObject(ctx, p.prefix+objectKey)
}
func (p PrefixedObjectClient) Stop() {
p.downstreamClient.Stop()
}
func NewPrefixedObjectClient(downstreamClient chunk.ObjectClient, prefix string) chunk.ObjectClient {
return PrefixedObjectClient{downstreamClient: downstreamClient, prefix: prefix}
}
Loading…
Cancel
Save