ingester.index-shards config (#4111)

* consistent shard mapping test & better error msg

* ingester.index-shards config

* Loki fails to start if the ingester & schema factors aren't compatible.

* lint

* docs

* update changelog
pull/4122/head
Owen Diehl 4 years ago committed by GitHub
parent 45693b7941
commit 92a4e57448
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 4
      docs/sources/configuration/_index.md
  3. 9
      pkg/ingester/index/index.go
  4. 45
      pkg/ingester/index/index_test.go
  5. 9
      pkg/ingester/ingester.go
  6. 13
      pkg/ingester/ingester_test.go
  7. 2
      pkg/ingester/instance.go
  8. 1
      pkg/ingester/instance_test.go
  9. 90
      pkg/loki/config_test.go
  10. 11
      pkg/loki/loki.go

@ -1,5 +1,6 @@
## Main
* [4111](https://github.com/grafana/loki/pull/4111) **owen-d**: Introduce `ingester.index-shards`
* [3627](https://github.com/grafana/loki/pull/3627) **MichelHollands**: Update vendored Cortex to 2d8477c4a325
* [3532](https://github.com/grafana/loki/pull/3532) **MichelHollands**: Update vendored Cortex to 8a2e2c1eeb65
* [3446](https://github.com/grafana/loki/pull/3446) **pracucci, owen-d**: Remove deprecated config `querier.split-queries-by-day` in favor of `querier.split-queries-by-interval`

@ -982,6 +982,10 @@ wal:
# Maximum memory size the WAL may use during replay. After hitting this it will flush data to storage before continuing.
# A unit suffix (KB, MB, GB) may be applied.
[replay_memory_ceiling: <string> | default = 4GB]
# Shard factor used in the ingesters for the in process reverse index.
# This MUST be evenly divisible by ALL schema shard factors or Loki will not start.
[index_shards: <int> | default = 32]
```
## consul_config

@ -23,7 +23,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
)
const indexShards = 32
const DefaultIndexShards = 32
var ErrInvalidShardQuery = errors.New("incompatible index shard query")
@ -34,11 +34,6 @@ type InvertedIndex struct {
shards []*indexShard
}
// New returns a new InvertedIndex.
func New() *InvertedIndex {
return NewWithShards(indexShards)
}
func NewWithShards(totalShards uint32) *InvertedIndex {
shards := make([]*indexShard, totalShards)
for i := uint32(0); i < totalShards; i++ {
@ -74,7 +69,7 @@ func validateShard(totalShards uint32, shard *astmapper.ShardAnnotation) error {
return nil
}
if int(totalShards)%shard.Of != 0 || uint32(shard.Of) > totalShards {
return fmt.Errorf("%w index_shard:%d query_shard:%d_%d", ErrInvalidShardQuery, totalShards, shard.Of, shard.Shard)
return fmt.Errorf("%w index_shard:%d query_shard:%v", ErrInvalidShardQuery, totalShards, shard)
}
return nil
}

@ -71,7 +71,7 @@ func BenchmarkHash(b *testing.B) {
}
func TestDeleteAddLoopkup(t *testing.T) {
index := New()
index := NewWithShards(DefaultIndexShards)
lbs := []cortexpb.LabelAdapter{
{Name: "foo", Value: "foo"},
{Name: "bar", Value: "bar"},
@ -113,3 +113,46 @@ func Test_hash_mapping(t *testing.T) {
})
}
}
// Test_ConsistentMapping verifies that two inverted indexes built with
// different (but compatible) shard factors return identical fingerprint
// sets for the same shard query. This is the invariant that allows the
// ingester index shard factor to differ from the schema row-shard
// factors, as long as it is evenly divisible by them.
func Test_ConsistentMapping(t *testing.T) {
	a := NewWithShards(16)
	b := NewWithShards(32)

	// Populate both indexes with identical series.
	for i := 0; i < 100; i++ {
		lbs := labels.Labels{
			labels.Label{Name: "foo", Value: "bar"},
			labels.Label{Name: "hi", Value: fmt.Sprint(i)},
		}
		a.Add(cortexpb.FromLabelsToLabelAdapters(lbs), model.Fingerprint(i))
		b.Add(cortexpb.FromLabelsToLabelAdapters(lbs), model.Fingerprint(i))
	}

	// Query with a shard factor (8) that evenly divides both index shard
	// factors; every query shard must resolve to the same fingerprints.
	shardMax := 8
	for i := 0; i < shardMax; i++ {
		shard := &astmapper.ShardAnnotation{
			Shard: i,
			Of:    shardMax,
		}

		aIDs, err := a.Lookup([]*labels.Matcher{
			labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
		}, shard)
		require.NoError(t, err)

		bIDs, err := b.Lookup([]*labels.Matcher{
			labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
		}, shard)
		require.NoError(t, err)

		// Lookup order is not guaranteed; sort before comparing.
		sorter := func(xs []model.Fingerprint) {
			sort.Slice(xs, func(i, j int) bool {
				return xs[i] < xs[j]
			})
		}
		sorter(aIDs)
		sorter(bIDs)

		require.Equal(t, aIDs, bIDs, "incorrect shard mapping for shard %v", shard)
	}
}

@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/ingester/index"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
@ -82,6 +83,8 @@ type Config struct {
ChunkFilterer storage.RequestChunkFilterer `yaml:"-"`
UnorderedWrites bool `yaml:"unordered_writes_enabled"`
IndexShards int `yaml:"index_shards"`
}
// RegisterFlags registers the flags.
@ -105,6 +108,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`")
f.BoolVar(&cfg.UnorderedWrites, "ingester.unordered-writes-enabled", false, "(Experimental) Allow out of order writes.")
f.IntVar(&cfg.IndexShards, "ingester.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.")
}
func (cfg *Config) Validate() error {
@ -121,6 +125,11 @@ func (cfg *Config) Validate() error {
if cfg.MaxTransferRetries > 0 && cfg.WAL.Enabled {
return errors.New("the use of the write ahead log (WAL) is incompatible with chunk transfers. It's suggested to use the WAL. Please try setting ingester.max-transfer-retries to 0 to disable transfers")
}
if cfg.IndexShards <= 0 {
return fmt.Errorf("Invalid ingester index shard factor: %d", cfg.IndexShards)
}
return nil
}

@ -22,6 +22,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/ingester/index"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
@ -462,28 +463,40 @@ func TestValidate(t *testing.T) {
in: Config{
MaxChunkAge: time.Minute,
ChunkEncoding: chunkenc.EncGZIP.String(),
IndexShards: index.DefaultIndexShards,
},
expected: Config{
MaxChunkAge: time.Minute,
ChunkEncoding: chunkenc.EncGZIP.String(),
parsedEncoding: chunkenc.EncGZIP,
IndexShards: index.DefaultIndexShards,
},
},
{
in: Config{
ChunkEncoding: chunkenc.EncSnappy.String(),
IndexShards: index.DefaultIndexShards,
},
expected: Config{
ChunkEncoding: chunkenc.EncSnappy.String(),
parsedEncoding: chunkenc.EncSnappy,
IndexShards: index.DefaultIndexShards,
},
},
{
in: Config{
IndexShards: index.DefaultIndexShards,
ChunkEncoding: "bad-enc",
},
err: true,
},
{
in: Config{
MaxChunkAge: time.Minute,
ChunkEncoding: chunkenc.EncGZIP.String(),
},
err: true,
},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
err := tc.in.Validate()

@ -100,7 +100,7 @@ func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runt
streams: map[string]*stream{},
streamsByFP: map[model.Fingerprint]*stream{},
buf: make([]byte, 0, 1024),
index: index.New(),
index: index.NewWithShards(uint32(cfg.IndexShards)),
instanceID: instanceID,
streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),

@ -26,6 +26,7 @@ func defaultConfig() *Config {
cfg := Config{
BlockSize: 512,
ChunkEncoding: "gzip",
IndexShards: 32,
}
if err := cfg.Validate(); err != nil {
panic(errors.Wrap(err, "error building default test config"))

@ -0,0 +1,90 @@
package loki
import (
"flag"
"testing"
"time"
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
// TestCrossComponentValidation ensures config validation rejects
// combinations where the ingester index shard factor is not evenly
// divisible by every schema period config's row shard factor, and
// accepts compatible combinations (a zero row-shard factor is ignored).
func TestCrossComponentValidation(t *testing.T) {
	for _, tc := range []struct {
		desc string
		base *Config
		err  bool
	}{
		{
			desc: "correct shards",
			base: &Config{
				Ingester: ingester.Config{
					IndexShards: 32,
				},
				SchemaConfig: storage.SchemaConfig{
					SchemaConfig: chunk.SchemaConfig{
						Configs: []chunk.PeriodConfig{
							{
								// zero should not error
								RowShards: 0,
								Schema:    "v6",
								From: chunk.DayTime{
									Time: model.Now().Add(-48 * time.Hour),
								},
							},
							{
								RowShards: 16,
								Schema:    "v11",
								From: chunk.DayTime{
									Time: model.Now(),
								},
							},
						},
					},
				},
			},
			err: false,
		},
		{
			// 17 does not evenly divide 32, so validation must fail.
			desc: "incompatible shards",
			base: &Config{
				Ingester: ingester.Config{
					IndexShards: 32,
				},
				SchemaConfig: storage.SchemaConfig{
					SchemaConfig: chunk.SchemaConfig{
						Configs: []chunk.PeriodConfig{
							{
								RowShards: 16,
								Schema:    "v11",
								From: chunk.DayTime{
									Time: model.Now().Add(-48 * time.Hour),
								},
							},
							{
								RowShards: 17,
								Schema:    "v11",
								From: chunk.DayTime{
									Time: model.Now(),
								},
							},
						},
					},
				},
			},
			err: true,
		},
	} {
		// Run each case as a named subtest so failures are attributable.
		t.Run(tc.desc, func(t *testing.T) {
			// RegisterFlags fills in defaults the validators depend on.
			tc.base.RegisterFlags(flag.NewFlagSet(tc.desc, 0))
			err := tc.base.Validate()
			if tc.err {
				require.Error(t, err)
			} else {
				require.NoError(t, err)
			}
		})
	}
}

@ -153,6 +153,17 @@ func (c *Config) Validate() error {
if c.ChunkStoreConfig.MaxLookBackPeriod > 0 {
c.LimitsConfig.MaxQueryLookback = c.ChunkStoreConfig.MaxLookBackPeriod
}
for i, sc := range c.SchemaConfig.Configs {
if sc.RowShards > 0 && c.Ingester.IndexShards%int(sc.RowShards) > 0 {
return fmt.Errorf(
"incompatible ingester index shards (%d) and period config row shard factor (%d) for period config at index (%d). The ingester factor must be evenly divisible by all period config factors",
c.Ingester.IndexShards,
sc.RowShards,
i,
)
}
}
return nil
}

Loading…
Cancel
Save