disable index dedupe when rf > 1 and current or upcoming index type is boltdb-shipper (#2206)

pull/2425/head
Sandeep Sukhani 5 years ago committed by GitHub
parent 204e61afe3
commit 807193d373
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      docs/sources/operations/storage/boltdb-shipper.md
  2. 31
      pkg/loki/modules.go
  3. 27
      pkg/loki/modules_test.go

@ -86,5 +86,14 @@ Frequency for checking updates can be configured with `resync_interval` config.
To avoid keeping downloaded index files forever, they have a TTL which defaults to 24 hours: if the index files for a period are not used for 24 hours, they are removed from the cache location.
The TTL can be configured using the `cache_ttl` config.
### Write Deduplication disabled
Loki does write deduplication of chunks and index using Chunks and WriteDedupe cache respectively, configured with [ChunkStoreConfig](../../configuration/README.md#chunk_store_config).
The problem with write deduplication when using `boltdb-shipper`, though, is that ingesters only upload boltdb files periodically to make them available to all the other services, which means there is a brief period during which some of the services have not yet received the updated index.
As a result, if the ingester which first wrote the chunks and index goes down, and all the other ingesters which were part of the replication scheme skipped writing those chunks and index due to deduplication, those logs would be missing from query responses, since the only ingester which had the index is down.
This problem would be faced even during rollouts which is quite common.
To avoid this, Loki disables deduplication of index when the replication factor is greater than 1 and `boltdb-shipper` is an active or upcoming index type.
While using `boltdb-shipper`, please avoid configuring the WriteDedupe cache: it is used purely for index deduplication, so it would not be used anyway.

@ -11,6 +11,7 @@ import (
"time"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/cortex"
cortex_querier "github.com/cortexproject/cortex/pkg/querier"
@ -180,7 +181,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort
// We want ingester to also query the store when using boltdb-shipper
pc := activePeriodConfig(t.cfg.SchemaConfig)
pc := t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)]
if pc.IndexType == local.BoltDBShipperType {
t.cfg.Ingester.QueryStore = true
mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
@ -242,7 +243,7 @@ func (t *Loki) initTableManager() (services.Service, error) {
}
func (t *Loki) initStore() (_ services.Service, err error) {
if activePeriodConfig(t.cfg.SchemaConfig).IndexType == local.BoltDBShipperType {
if t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)].IndexType == local.BoltDBShipperType {
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
switch t.cfg.Target {
case Ingester:
@ -256,6 +257,13 @@ func (t *Loki) initStore() (_ services.Service, err error) {
}
}
// If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache.
// This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data.
if t.cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && usingBoltdbShipper(t.cfg.SchemaConfig) {
t.cfg.ChunkStoreConfig.DisableIndexDeduplication = true
t.cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{}
}
t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return
@ -346,9 +354,9 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
return t.memberlistKV, nil
}
// activePeriodConfig type returns index type which would be applicable to logs that would be pushed starting now
// Note: Another periodic config can be applicable in future which can change index type
func activePeriodConfig(cfg chunk.SchemaConfig) chunk.PeriodConfig {
// activePeriodConfig returns the index of the active PeriodConfig, i.e. the one applicable to logs that are pushed starting now.
// Note: Another PeriodConfig might become applicable for future logs, which can change the index type.
func activePeriodConfig(cfg chunk.SchemaConfig) int {
now := model.Now()
i := sort.Search(len(cfg.Configs), func(i int) bool {
return cfg.Configs[i].From.Time > now
@ -356,7 +364,18 @@ func activePeriodConfig(cfg chunk.SchemaConfig) chunk.PeriodConfig {
if i > 0 {
i--
}
return cfg.Configs[i]
return i
}
// usingBoltdbShipper reports whether the currently active PeriodConfig, or the one
// immediately following it, uses the boltdb-shipper index type.
// It returns false when cfg contains no PeriodConfigs.
func usingBoltdbShipper(cfg chunk.SchemaConfig) bool {
	if len(cfg.Configs) == 0 {
		// Without any PeriodConfig no index type is in use; returning early also
		// avoids indexing into an empty slice below.
		return false
	}

	activePCIndex := activePeriodConfig(cfg)
	if cfg.Configs[activePCIndex].IndexType == local.BoltDBShipperType {
		return true
	}

	// Only the PeriodConfig right after the active one (if any) counts as upcoming.
	nextPCIndex := activePCIndex + 1
	return nextPCIndex < len(cfg.Configs) && cfg.Configs[nextPCIndex].IndexType == local.BoltDBShipperType
}
func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) {

@ -18,22 +18,43 @@ func TestActiveIndexType(t *testing.T) {
IndexType: "first",
}}
assert.Equal(t, cfg.Configs[0], activePeriodConfig(cfg))
assert.Equal(t, 0, activePeriodConfig(cfg))
// add a newer PeriodConfig in the past which should be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(-12 * time.Hour)},
IndexType: "second",
})
assert.Equal(t, cfg.Configs[1], activePeriodConfig(cfg))
assert.Equal(t, 1, activePeriodConfig(cfg))
// add a newer PeriodConfig in the future which should not be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(time.Hour)},
IndexType: "third",
})
assert.Equal(t, cfg.Configs[1], activePeriodConfig(cfg))
assert.Equal(t, 1, activePeriodConfig(cfg))
}
// TestUsingBoltdbShipper verifies that usingBoltdbShipper returns true when either the
// active or the upcoming PeriodConfig uses the boltdb-shipper index type, and false otherwise.
func TestUsingBoltdbShipper(t *testing.T) {
	// Start with a single PeriodConfig in the past that uses boltdb-shipper.
	schema := chunk.SchemaConfig{
		Configs: []chunk.PeriodConfig{
			{
				From:      chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
				IndexType: "boltdb-shipper",
			},
		},
	}
	assert.True(t, usingBoltdbShipper(schema))

	// Switch that single PeriodConfig to an index type other than boltdb-shipper.
	schema.Configs[0].IndexType = "boltdb"
	assert.False(t, usingBoltdbShipper(schema))

	// Append a PeriodConfig starting in the future that uses boltdb-shipper.
	upcoming := chunk.PeriodConfig{
		From:      chunk.DayTime{Time: model.Now().Add(time.Hour)},
		IndexType: "boltdb-shipper",
	}
	schema.Configs = append(schema.Configs, upcoming)
	assert.True(t, usingBoltdbShipper(schema))
}
func Test_calculateMaxLookBack(t *testing.T) {

Loading…
Cancel
Save