make boltdb shipper singleton and some other minor refactoring (#1995)

* make boltdb shipper singleton and some other minor refactoring
Sandeep Sukhani committed by GitHub
parent 47b79f3ea5
commit 21d4ca41e2
  1. docs/operations/storage/boltdb-shipper.md (3 changed lines)
  2. pkg/logcli/query/query.go (3 changed lines)
  3. pkg/loki/modules.go (2 changed lines)
  4. pkg/storage/hack/main.go (2 changed lines)
  5. pkg/storage/store.go (42 changed lines)
  6. pkg/storage/store_test.go (143 changed lines)
  7. pkg/storage/stores/local/boltdb_index_client.go (4 changed lines)
  8. pkg/storage/stores/local/shipper.go (2 changed lines)
  9. pkg/storage/stores/local/uploads_test.go (2 changed lines)

@@ -26,8 +26,9 @@ storage_config:
gcs:
bucket_name: GCS_BUCKET_NAME
boltdb_shipper_config:
boltdb_shipper:
active_index_directory: /loki/index
shared_store: gcs
cache_location: /loki/boltdb-cache
```
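The only user-facing change in this commit is the rename of the config key from `boltdb_shipper_config` to `boltdb_shipper`. A minimal sketch (not part of the commit; the wrapper struct and the use of yaml.v2 are assumptions for illustration) showing the renamed key unmarshalling into `local.ShipperConfig` via the struct tags changed further down:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"

	"github.com/grafana/loki/pkg/storage/stores/local"
)

// wrapper is a hypothetical stand-in for the relevant slice of Loki's storage
// Config; the real Config embeds this field alongside the other storage settings.
type wrapper struct {
	BoltDBShipperConfig local.ShipperConfig `yaml:"boltdb_shipper"`
}

func main() {
	raw := `
boltdb_shipper:
  active_index_directory: /loki/index
  shared_store: gcs
  cache_location: /loki/boltdb-cache
`
	var cfg wrapper
	if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.BoltDBShipperConfig.SharedStoreType) // gcs
}
```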

@@ -12,6 +12,7 @@ import (
"github.com/fatih/color"
json "github.com/json-iterator/go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/user"
@@ -148,7 +149,7 @@ func localStore(conf loki.Config) (logql.Querier, error) {
if err != nil {
return nil, err
}
s, err := storage.NewStore(conf.StorageConfig, conf.ChunkStoreConfig, conf.SchemaConfig, limits)
s, err := storage.NewStore(conf.StorageConfig, conf.ChunkStoreConfig, conf.SchemaConfig, limits, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
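Here and in the next two files (`pkg/loki/modules.go`, `pkg/storage/hack/main.go`) the only change is threading a `prometheus.Registerer` through to `NewStore`. A minimal sketch of the new call shape, assuming the signature introduced in this commit; production callers pass `prometheus.DefaultRegisterer`, while tests pass `nil` or a scratch registry to keep collectors isolated:

```go
package example

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/pkg/loki"
	"github.com/grafana/loki/pkg/storage"
	"github.com/grafana/loki/pkg/util/validation"
)

// newStoreFor is a hypothetical helper illustrating the extra registerer
// argument: whatever registerer the caller supplies is where the store's
// metrics end up being registered.
func newStoreFor(conf loki.Config, limits *validation.Overrides, reg prometheus.Registerer) (storage.Store, error) {
	return storage.NewStore(conf.StorageConfig, conf.ChunkStoreConfig, conf.SchemaConfig, limits, reg)
}
```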

@@ -266,7 +266,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
}
}
t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides)
t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return
}

@@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
@@ -64,6 +65,7 @@ func getStore() (lstore.Store, error) {
},
},
&validation.Overrides{},
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err

@@ -26,7 +26,7 @@ import (
type Config struct {
storage.Config `yaml:",inline"`
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig local.ShipperConfig `yaml:"boltdb_shipper_config"`
BoltDBShipperConfig local.ShipperConfig `yaml:"boltdb_shipper"`
}
// RegisterFlags adds the flags required to configure this flag set.
@@ -49,10 +49,10 @@ type store struct {
}
// NewStore creates a new Loki Store using configuration supplied.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits) (Store, error) {
registerCustomIndexClients(cfg, schemaCfg)
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits, registerer prometheus.Registerer) (Store, error) {
registerCustomIndexClients(cfg)
s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, prometheus.DefaultRegisterer)
s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, registerer)
if err != nil {
return nil, err
}
@@ -218,35 +218,23 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.
return filtered
}
func registerCustomIndexClients(cfg Config, schemaCfg chunk.SchemaConfig) {
boltdbShipperInstances := 0
func registerCustomIndexClients(cfg Config) {
// BoltDB Shipper is supposed to be run as a singleton.
// This could also be done in NewBoltDBIndexClientWithShipper factory method but we are doing it here because that method is used
// in tests for creating multiple instances of it at a time.
var boltDBIndexClientWithShipper chunk.IndexClient
storage.RegisterIndexStore(local.BoltDBShipperType, func() (chunk.IndexClient, error) {
// since we do not know which object client is being used for the period for which we are creating this index client,
// we need to iterate through all the periodic configs to find the right one.
// We maintain number of instances that we have already created in boltdbShipperInstances and then count the number of
// encounters of BoltDBShipperType until we find the right periodic config for getting the ObjectType.
// This is done assuming we are creating index client in the order of periodic configs.
// Note: We are assuming that user would never store chunks in table based store otherwise NewObjectClient would return an error.
// ToDo: Try passing on ObjectType from Cortex to the callback for creating custom index client.
boltdbShipperEncounter := 0
objectStoreType := ""
for _, config := range schemaCfg.Configs {
if config.IndexType == local.BoltDBShipperType {
boltdbShipperEncounter++
if boltdbShipperEncounter > boltdbShipperInstances {
objectStoreType = config.ObjectType
break
}
}
if boltDBIndexClientWithShipper != nil {
return boltDBIndexClientWithShipper, nil
}
boltdbShipperInstances++
objectClient, err := storage.NewObjectClient(objectStoreType, cfg.Config)
objectClient, err := storage.NewObjectClient(cfg.BoltDBShipperConfig.SharedStoreType, cfg.Config)
if err != nil {
return nil, err
}
return local.NewBoltDBIndexClient(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig)
boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig)
return boltDBIndexClientWithShipper, err
}, nil)
}
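The core of the refactor: rather than walking the schema periods to figure out which object store each boltdb-shipper period maps to, the registered factory now builds one index client from `cfg.BoltDBShipperConfig.SharedStoreType` and returns that same instance on every subsequent call. A self-contained sketch of that caching-factory pattern (illustrative only; `dummyClient` stands in for `chunk.IndexClient`):

```go
package main

import "fmt"

// dummyClient stands in for chunk.IndexClient; the point is only the
// closure-captured singleton used by registerCustomIndexClients above.
type dummyClient struct{ objectStore string }

func main() {
	// The factory closes over `client`: the first call creates it, every
	// later call (one per schema period using boltdb-shipper) returns the
	// same instance, mirroring the nil-check in the commit.
	var client *dummyClient
	factory := func() (*dummyClient, error) {
		if client != nil {
			return client, nil
		}
		fmt.Println("creating boltdb-shipper index client")
		client = &dummyClient{objectStore: "filesystem"}
		return client, nil
	}

	a, _ := factory()
	b, _ := factory()
	fmt.Println(a == b) // true: one shipper instance shared across periods
}
```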

@@ -2,9 +2,12 @@ package storage
import (
"context"
"io/ioutil"
"log"
"net/http"
_ "net/http/pprof"
"os"
"path"
"runtime"
"testing"
"time"
@@ -14,13 +14,15 @@ import (
"github.com/weaveworks/common/user"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
cortex_local "github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
"github.com/grafana/loki/pkg/storage/stores/local"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -149,8 +154,8 @@ func getLocalStore() Store {
}
store, err := NewStore(Config{
Config: storage.Config{
BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
FSConfig: local.FSConfig{Directory: "/tmp/benchmark/chunks"},
BoltDBConfig: cortex_local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
FSConfig: cortex_local.FSConfig{Directory: "/tmp/benchmark/chunks"},
},
MaxChunkBatchSize: 10,
}, chunk.StoreConfig{}, chunk.SchemaConfig{
@@ -166,7 +171,7 @@ func getLocalStore() Store {
},
},
},
}, limits)
}, limits, nil)
if err != nil {
panic(err)
}
@@ -422,6 +427,108 @@ func Test_store_GetSeries(t *testing.T) {
}
}
type timeRange struct {
from, to time.Time
}
func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
tempDir, err := ioutil.TempDir("", "multiple-boltdb-shippers")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(tempDir))
}()
limits, err := validation.NewOverrides(validation.Limits{}, nil)
require.NoError(t, err)
// config for BoltDB Shipper
boltdbShipperConfig := local.ShipperConfig{}
flagext.DefaultValues(&boltdbShipperConfig)
boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index")
boltdbShipperConfig.SharedStoreType = "filesystem"
boltdbShipperConfig.CacheLocation = path.Join(tempDir, "boltdb-shipper-cache")
// dates for activation of boltdb shippers
firstStoreDate := parseDate("2019-01-01")
secondStoreDate := parseDate("2019-01-02")
store, err := NewStore(Config{
Config: storage.Config{
FSConfig: cortex_local.FSConfig{Directory: path.Join(tempDir, "chunks")},
},
BoltDBShipperConfig: boltdbShipperConfig,
}, chunk.StoreConfig{}, chunk.SchemaConfig{
Configs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: timeToModelTime(firstStoreDate)},
IndexType: "boltdb-shipper",
ObjectType: "filesystem",
Schema: "v9",
IndexTables: chunk.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
},
},
{
From: chunk.DayTime{Time: timeToModelTime(secondStoreDate)},
IndexType: "boltdb-shipper",
ObjectType: "filesystem",
Schema: "v11",
IndexTables: chunk.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
},
RowShards: 2,
},
},
}, limits, nil)
require.NoError(t, err)
// time ranges adding a chunk for each store and a chunk which overlaps both the stores
chunksToBuildForTimeRanges := []timeRange{
{
// chunk just for first store
secondStoreDate.Add(-3 * time.Hour),
secondStoreDate.Add(-2 * time.Hour),
},
{
// chunk overlapping both the stores
secondStoreDate.Add(-time.Hour),
secondStoreDate.Add(time.Hour),
},
{
// chunk just for second store
secondStoreDate.Add(2 * time.Hour),
secondStoreDate.Add(3 * time.Hour),
},
}
// build and add chunks to the store
addedChunkIDs := map[string]struct{}{}
for _, tr := range chunksToBuildForTimeRanges {
chk := newChunk(buildTestStreams(fooLabelsWithName, tr))
err := store.PutOne(ctx, chk.From, chk.Through, chk)
require.NoError(t, err)
addedChunkIDs[chk.ExternalKey()] = struct{}{}
}
// get all the chunks from both the stores
chunks, err := store.Get(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName)...)
require.NoError(t, err)
// we get common chunk twice because it is indexed in both the stores
require.Len(t, chunks, len(addedChunkIDs)+1)
// check whether we got back all the chunks which were added
for i := range chunks {
_, ok := addedChunkIDs[chunks[i].ExternalKey()]
require.True(t, ok)
}
}
func mustParseLabels(s string) map[string]string {
l, err := marshal.NewLabelSet(s)
@@ -431,3 +538,31 @@ func mustParseLabels(s string) map[string]string {
return l
}
func parseDate(in string) time.Time {
t, err := time.Parse("2006-01-02", in)
if err != nil {
panic(err)
}
return t
}
func buildTestStreams(labels string, tr timeRange) logproto.Stream {
stream := logproto.Stream{
Labels: labels,
Entries: []logproto.Entry{},
}
for from := tr.from; from.Before(tr.to); from = from.Add(time.Second) {
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: from,
Line: from.String(),
})
}
return stream
}
func timeToModelTime(t time.Time) model.Time {
return model.TimeFromUnixNano(t.UnixNano())
}

@@ -14,8 +14,8 @@ type BoltdbIndexClientWithShipper struct {
shipper *Shipper
}
// NewBoltDBIndexClient creates a new IndexClient that uses BoltDB.
func NewBoltDBIndexClient(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) {
// NewBoltDBIndexClientWithShipper creates a new IndexClient that uses BoltDB.
func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) {
boltDBIndexClient, err := local.NewBoltDBIndexClient(cfg)
if err != nil {
return nil, err

@@ -43,6 +43,7 @@ type BoltDBGetter interface {
type ShipperConfig struct {
ActiveIndexDirectory string `yaml:"active_index_directory"`
SharedStoreType string `yaml:"shared_store"`
CacheLocation string `yaml:"cache_location"`
CacheTTL time.Duration `yaml:"cache_ttl"`
ResyncInterval time.Duration `yaml:"resync_interval"`
@@ -53,6 +54,7 @@ type ShipperConfig struct {
// RegisterFlags registers flags.
func (cfg *ShipperConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage")
f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem")
f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries")
f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries")
f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage")
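`ShipperConfig` also gains the `shared_store` setting and a matching `-boltdb.shipper.shared-store` flag, which is what the singleton factory above reads in place of the schema's `ObjectType`. A small sketch (not from the commit) of registering and parsing these flags on a standalone `flag.FlagSet`:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/loki/pkg/storage/stores/local"
)

func main() {
	var cfg local.ShipperConfig
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	cfg.RegisterFlags(fs)

	// Values mirror the GCS example in the docs; gcs, s3, azure and
	// filesystem are the supported shared store types.
	_ = fs.Parse([]string{
		"-boltdb.shipper.active-index-directory=/loki/index",
		"-boltdb.shipper.shared-store=gcs",
		"-boltdb.shipper.cache-location=/loki/boltdb-cache",
	})

	fmt.Println(cfg.SharedStoreType, cfg.CacheTTL) // gcs 24h0m0s
}
```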

@@ -38,7 +38,7 @@ func createTestBoltDBWithShipper(t *testing.T, parentTempDir, ingesterName, loca
})
require.NoError(t, err)
boltdbIndexClientWithShipper, err := NewBoltDBIndexClient(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig)
boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig)
require.NoError(t, err)
return boltdbIndexClientWithShipper.(*BoltdbIndexClientWithShipper)
