Tsdb/inverted index wiring (#6252)

* multi inverted index

* delete in inverted index signature

* multi inverted index

* inverted index testing + skip nil indices

* wires up period config aware multi-inverted-idx through ingesters

* better inverted index validation

* apply shipper defaults to last relevant period config, not current one.

* more verbose error
Owen Diehl · committed by GitHub · 4 years ago
commit 4903df6ae5 · parent ea0a5246cc
Changed files (lines changed):

- pkg/ingester/checkpoint_test.go (5)
- pkg/ingester/flush_test.go (2)
- pkg/ingester/index/bitprefix.go (11)
- pkg/ingester/index/index.go (1)
- pkg/ingester/index/multi.go (124)
- pkg/ingester/index/multi_test.go (179)
- pkg/ingester/ingester.go (44)
- pkg/ingester/ingester_test.go (6)
- pkg/ingester/instance.go (43)
- pkg/ingester/instance_test.go (38)
- pkg/ingester/recovery.go (10)
- pkg/ingester/transfer.go (5)
- pkg/loki/config_compat.go (42)
- pkg/loki/config_wrapper.go (45)
- pkg/loki/loki.go (15)

pkg/ingester/checkpoint_test.go
@@ -449,7 +449,8 @@ func Test_SeriesIterator(t *testing.T) {
 limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 for i := 0; i < 3; i++ {
-inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
+inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
+require.Nil(t, err)
 require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
 require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
 instances = append(instances, inst)
@@ -495,7 +496,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
 limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 for i := range instances {
-inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
+inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
 require.NoError(b,
 inst.Push(context.Background(), &logproto.PushRequest{

pkg/ingester/flush_test.go
@@ -338,7 +338,7 @@ func (s *testStore) GetChunkRefs(ctx context.Context, userID string, from, throu
 }
 func (s *testStore) GetSchemaConfigs() []config.PeriodConfig {
-return nil
+return defaultPeriodConfigs
 }
 func (s *testStore) Stop() {}

pkg/ingester/index/bitprefix.go
@@ -23,9 +23,16 @@ type BitPrefixInvertedIndex struct {
 shards []*indexShard
 }
+func ValidateBitPrefixShardFactor(factor uint32) error {
+if requiredBits := index.NewShard(0, factor).RequiredBits(); 1<<requiredBits != factor {
+return fmt.Errorf("Incompatible inverted index shard factor on ingester: It must be a power of two, got %d", factor)
+}
+return nil
+}
 func NewBitPrefixWithShards(totalShards uint32) (*BitPrefixInvertedIndex, error) {
-if requiredBits := index.NewShard(0, totalShards).RequiredBits(); 1<<requiredBits != totalShards {
-return nil, fmt.Errorf("Shard factor must be a power of two, got %d", totalShards)
+if err := ValidateBitPrefixShardFactor(totalShards); err != nil {
+return nil, err
 }
 shards := make([]*indexShard, totalShards)
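
The power-of-two check moves into its own function, `ValidateBitPrefixShardFactor`, so the same rule can be enforced both here and at config-validation time (see pkg/loki/config_compat.go below). The predicate computes how many bits are needed to address `factor` shards and verifies that `1 << requiredBits` round-trips to the original value. A minimal standalone sketch of the same predicate, independent of Loki's `index.NewShard` helper (function names here are mine):

```go
package main

import "fmt"

// isPowerOfTwoShardFactor mirrors the check in ValidateBitPrefixShardFactor:
// count the bits needed to address `factor` shards, then verify that
// 1 << bits round-trips back to the original factor.
func isPowerOfTwoShardFactor(factor uint32) bool {
	bits := 0
	for 1<<bits < int(factor) {
		bits++
	}
	return 1<<bits == int(factor)
}

func main() {
	for _, f := range []uint32{2, 16, 32, 6, 24} {
		fmt.Printf("factor=%2d power of two: %v\n", f, isPowerOfTwoShardFactor(f))
	}
	// factor= 2 power of two: true
	// factor=16 power of two: true
	// factor=32 power of two: true
	// factor= 6 power of two: false
	// factor=24 power of two: false
}
```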

pkg/ingester/index/index.go
@@ -31,6 +31,7 @@ type Interface interface {
 Lookup(matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error)
 LabelNames(shard *astmapper.ShardAnnotation) ([]string, error)
 LabelValues(name string, shard *astmapper.ShardAnnotation) ([]string, error)
+Delete(labels labels.Labels, fp model.Fingerprint)
 }
 // InvertedIndex implements an in-memory inverted index from label pairs to fingerprints.

pkg/ingester/index/multi.go (new file)
@@ -0,0 +1,124 @@
package index
import (
"time"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/config"
)
type periodIndex struct {
time.Time
idx int // address of the index to use
}
type Multi struct {
periods []periodIndex
indices []Interface
}
func NewMultiInvertedIndex(periods []config.PeriodConfig, indexShards uint32) (*Multi, error) {
var (
err error
ii Interface // always stored in 0th index
bitPrefixed Interface // always stored in 1st index
periodIndices []periodIndex
)
for _, pd := range periods {
switch pd.IndexType {
case config.TSDBType:
if bitPrefixed == nil {
bitPrefixed, err = NewBitPrefixWithShards(indexShards)
if err != nil {
return nil, errors.Wrapf(err, "creating tsdb inverted index for period starting %v", pd.From)
}
}
periodIndices = append(periodIndices, periodIndex{
Time: pd.From.Time.Time(),
idx: 1, // tsdb inverted index is always stored in position one
})
default:
if ii == nil {
ii = NewWithShards(indexShards)
}
periodIndices = append(periodIndices, periodIndex{
Time: pd.From.Time.Time(),
idx: 0, // regular inverted index is always stored in position zero
})
}
}
return &Multi{
periods: periodIndices,
indices: []Interface{ii, bitPrefixed},
}, nil
}
func (m *Multi) Add(labels []logproto.LabelAdapter, fp model.Fingerprint) (result labels.Labels) {
for _, i := range m.indices {
if i != nil {
result = i.Add(labels, fp)
}
}
return
}
func (m *Multi) Delete(labels labels.Labels, fp model.Fingerprint) {
for _, i := range m.indices {
if i != nil {
i.Delete(labels, fp)
}
}
}
func (m *Multi) Lookup(t time.Time, matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error) {
return m.indexFor(t).Lookup(matchers, shard)
}
func (m *Multi) LabelNames(t time.Time, shard *astmapper.ShardAnnotation) ([]string, error) {
return m.indexFor(t).LabelNames(shard)
}
func (m *Multi) LabelValues(t time.Time, name string, shard *astmapper.ShardAnnotation) ([]string, error) {
return m.indexFor(t).LabelValues(name, shard)
}
// Query planning is responsible for ensuring no query spans more than one inverted index.
// Therefore we don't need to account for both `from` and `through`.
func (m *Multi) indexFor(t time.Time) Interface {
for i := range m.periods {
if !m.periods[i].Time.After(t) && (i+1 == len(m.periods) || t.Before(m.periods[i+1].Time)) {
return m.indices[m.periods[i].idx]
}
}
return noopInvertedIndex{}
}
type noopInvertedIndex struct{}
func (noopInvertedIndex) Add(labels []logproto.LabelAdapter, fp model.Fingerprint) labels.Labels {
return nil
}
func (noopInvertedIndex) Delete(labels labels.Labels, fp model.Fingerprint) {}
func (noopInvertedIndex) Lookup(matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error) {
return nil, nil
}
func (noopInvertedIndex) LabelNames(shard *astmapper.ShardAnnotation) ([]string, error) {
return nil, nil
}
func (noopInvertedIndex) LabelValues(name string, shard *astmapper.ShardAnnotation) ([]string, error) {
return nil, nil
}
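
`Multi` keeps at most two concrete indices: slot 0 holds the modulo-sharded `InvertedIndex`, slot 1 the TSDB `BitPrefixInvertedIndex`. An ascending list of period start times maps each schema period onto one of the two slots. Writes (`Add`, `Delete`) fan out to every non-nil index, so a series stays consistent no matter which period a later read targets; reads take a timestamp, and `indexFor` routes them to the index that owns the containing period, falling back to a noop index for timestamps before the first period. A usage sketch under stated assumptions: the dates are illustrative, and `mustDayTime` mirrors the `MustParseDayTime` helper from the tests below.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/pkg/ingester/index"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/storage/config"
)

// mustDayTime mirrors MustParseDayTime from multi_test.go.
func mustDayTime(s string) config.DayTime {
	t, err := time.Parse("2006-01-02", s)
	if err != nil {
		panic(err)
	}
	return config.DayTime{Time: model.TimeFromUnix(t.Unix())}
}

func main() {
	// Illustrative schema: boltdb-shipper until 2022, tsdb afterwards.
	periods := []config.PeriodConfig{
		{From: mustDayTime("2021-01-01"), IndexType: config.BoltDBShipperType},
		{From: mustDayTime("2022-01-01"), IndexType: config.TSDBType},
	}
	// 32 is a power of two, so the TSDB shard-factor validation passes.
	multi, err := index.NewMultiInvertedIndex(periods, 32)
	if err != nil {
		panic(err)
	}

	lbs := []logproto.LabelAdapter{{Name: "app", Value: "api"}}
	fp := model.Fingerprint(logproto.FromLabelAdaptersToLabels(lbs).Hash())
	multi.Add(lbs, fp) // writes fan out to both concrete indices

	// Reads are routed by timestamp: a 2021 lookup hits the modulo index,
	// a 2022 lookup hits the bit-prefix index. Both should return fp.
	ids, _ := multi.Lookup(
		time.Date(2022, 6, 1, 0, 0, 0, 0, time.UTC),
		[]*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "app", "api")},
		nil, // nil shard annotation: search all shards
	)
	fmt.Println(ids)
}
```

The comment above `indexFor` carries the key invariant: query planning guarantees a single query never spans two inverted indices, so routing on one timestamp is enough.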

pkg/ingester/index/multi_test.go (new file)
@@ -0,0 +1,179 @@
package index
import (
"sort"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)
func MustParseDayTime(s string) config.DayTime {
t, err := time.Parse("2006-01-02", s)
if err != nil {
panic(err)
}
return config.DayTime{Time: model.TimeFromUnix(t.Unix())}
}
var testPeriodConfigs = []config.PeriodConfig{
{
From: MustParseDayTime("2020-01-01"),
IndexType: config.StorageTypeBigTable,
},
{
From: MustParseDayTime("2021-01-01"),
IndexType: config.TSDBType,
},
{
From: MustParseDayTime("2022-01-01"),
IndexType: config.BoltDBShipperType,
},
{
From: MustParseDayTime("2023-01-01"),
IndexType: config.TSDBType,
},
}
// Only run the specific shard factor validation logic if a period config using
// tsdb exists
func TestIgnoresInvalidShardFactorWhenTSDBNotPresent(t *testing.T) {
factor := uint32(6)
_, err := NewMultiInvertedIndex(
[]config.PeriodConfig{
{
From: MustParseDayTime("2020-01-01"),
IndexType: config.StorageTypeBigTable,
},
},
factor,
)
require.Nil(t, err)
_, err = NewMultiInvertedIndex(
[]config.PeriodConfig{
{
From: MustParseDayTime("2020-01-01"),
IndexType: config.StorageTypeBigTable,
},
{
From: MustParseDayTime("2021-01-01"),
IndexType: config.TSDBType,
},
},
factor,
)
require.Error(t, err)
}
func TestMultiIndexCreation(t *testing.T) {
multi, err := NewMultiInvertedIndex(testPeriodConfigs, uint32(2))
require.Nil(t, err)
x, _ := NewBitPrefixWithShards(2)
expected := &Multi{
periods: []periodIndex{
{
Time: testPeriodConfigs[0].From.Time.Time(),
idx: 0,
},
{
Time: testPeriodConfigs[1].From.Time.Time(),
idx: 1,
},
{
Time: testPeriodConfigs[2].From.Time.Time(),
idx: 0,
},
{
Time: testPeriodConfigs[3].From.Time.Time(),
idx: 1,
},
},
indices: []Interface{
NewWithShards(2),
x,
},
}
require.Equal(t, expected, multi)
}
func TestMultiIndex(t *testing.T) {
factor := uint32(32)
multi, err := NewMultiInvertedIndex(testPeriodConfigs, factor)
require.Nil(t, err)
lbs := []logproto.LabelAdapter{
{Name: "foo", Value: "foo"},
{Name: "bar", Value: "bar"},
{Name: "buzz", Value: "buzz"},
}
sort.Sort(logproto.FromLabelAdaptersToLabels(lbs))
fp := model.Fingerprint((logproto.FromLabelAdaptersToLabels(lbs).Hash()))
ls := multi.Add(lbs, fp)
// Lookup at a time corresponding to a non-tsdb periodconfig
// and ensure we use modulo hashing
expShard := labelsSeriesIDHash(logproto.FromLabelAdaptersToLabels(lbs)) % factor
ids, err := multi.Lookup(
testPeriodConfigs[0].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
&astmapper.ShardAnnotation{Shard: int(expShard), Of: int(factor)},
)
require.Nil(t, err)
require.Equal(t, []model.Fingerprint{fp}, ids)
// Lookup at a time corresponding to a tsdb periodconfig
// and ensure we use bit prefix hashing
requiredBits := index.NewShard(0, factor).RequiredBits()
expShard = uint32(fp >> (64 - requiredBits))
ids, err = multi.Lookup(
testPeriodConfigs[1].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
&astmapper.ShardAnnotation{Shard: int(expShard), Of: int(factor)},
)
require.Nil(t, err)
require.Equal(t, []model.Fingerprint{fp}, ids)
// Delete the entry
multi.Delete(ls, fp)
// Ensure deleted entry is not in modulo variant
ids, err = multi.Lookup(
testPeriodConfigs[0].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
nil,
)
require.Nil(t, err)
require.Equal(t, 0, len(ids))
// Ensure deleted entry is not in bit prefix variant
ids, err = multi.Lookup(
testPeriodConfigs[1].From.Time.Time(),
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
nil,
)
require.Nil(t, err)
require.Equal(t, 0, len(ids))
}
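
The lookup assertions pin down the difference between the two sharding schemes: the regular index places a series in shard `labelsSeriesIDHash(labels) % factor`, while the bit-prefix index takes the top `log2(factor)` bits of the fingerprint (`fp >> (64 - requiredBits)`), which is what allows a TSDB shard to be read as one contiguous fingerprint range. A side-by-side sketch with an illustrative fingerprint; note the modulo arm is keyed off the raw fingerprint purely for illustration, whereas Loki hashes the label set:

```go
package main

import "fmt"

func main() {
	const factor = 32 // shard factor; a power of two, so both schemes apply

	// Illustrative 64-bit fingerprint.
	fp := uint64(0xDEADBEEFCAFEF00D)

	// Modulo placement (regular inverted index). Loki applies this to
	// labelsSeriesIDHash(labels) rather than the fingerprint itself.
	moduloShard := fp % factor

	// Bit-prefix placement (TSDB index): the shard is the top log2(factor)
	// bits of the fingerprint, so each shard owns a contiguous fp range.
	requiredBits := 0
	for 1<<requiredBits < factor {
		requiredBits++
	}
	prefixShard := fp >> (64 - requiredBits)

	fmt.Println(moduloShard, prefixShard) // 13 27
}
```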

pkg/ingester/ingester.go
@@ -173,7 +173,7 @@ type Interface interface {
 CheckReady(ctx context.Context) error
 FlushHandler(w http.ResponseWriter, _ *http.Request)
 ShutdownHandler(w http.ResponseWriter, r *http.Request)
-GetOrCreateInstance(instanceID string) *instance
+GetOrCreateInstance(instanceID string) (*instance, error)
 }
 // Ingester builds chunks for incoming log streams.
@@ -547,26 +547,33 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
 return nil, ErrReadOnly
 }
-instance := i.GetOrCreateInstance(instanceID)
+instance, err := i.GetOrCreateInstance(instanceID)
+if err != nil {
+return &logproto.PushResponse{}, err
+}
+err = instance.Push(ctx, req)
 return &logproto.PushResponse{}, err
 }
-func (i *Ingester) GetOrCreateInstance(instanceID string) *instance { //nolint:revive
+func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { //nolint:revive
 inst, ok := i.getInstanceByID(instanceID)
 if ok {
-return inst
+return inst, nil
 }
 i.instancesMtx.Lock()
 defer i.instancesMtx.Unlock()
 inst, ok = i.instances[instanceID]
 if !ok {
-inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter)
+var err error
+inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter)
+if err != nil {
+return nil, err
+}
 i.instances[instanceID] = inst
 activeTenantsStats.Set(int64(len(i.instances)))
 }
-return inst
+return inst, nil
 }
 // Query the ingesters for log streams matching a set of matchers.
@@ -579,7 +586,10 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
 return err
 }
-instance := i.GetOrCreateInstance(instanceID)
+instance, err := i.GetOrCreateInstance(instanceID)
+if err != nil {
+return err
+}
 it, err := instance.Query(ctx, logql.SelectLogParams{QueryRequest: req})
 if err != nil {
 return err
@@ -618,7 +628,10 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
 return err
 }
-instance := i.GetOrCreateInstance(instanceID)
+instance, err := i.GetOrCreateInstance(instanceID)
+if err != nil {
+return err
+}
 it, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req})
 if err != nil {
 return err
@@ -718,7 +731,10 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
 return nil, err
 }
-instance := i.GetOrCreateInstance(userID)
+instance, err := i.GetOrCreateInstance(userID)
+if err != nil {
+return nil, err
+}
 resp, err := instance.Label(ctx, req)
 if err != nil {
 return nil, err
@@ -776,7 +792,10 @@ func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo
 return nil, err
 }
-instance := i.GetOrCreateInstance(instanceID)
+instance, err := i.GetOrCreateInstance(instanceID)
+if err != nil {
+return nil, err
+}
 return instance.Series(ctx, req)
 }
@@ -827,7 +846,10 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
 return err
 }
-instance := i.GetOrCreateInstance(instanceID)
+instance, err := i.GetOrCreateInstance(instanceID)
+if err != nil {
+return err
+}
 tailer, err := newTailer(instanceID, req.Query, queryServer, i.cfg.MaxDroppedStreams)
 if err != nil {
 return err
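
Every `GetOrCreateInstance` call site now propagates an error, since building an instance can fail when the period configs and the ingester shard factor disagree. The method keeps its shape: a lock-free fast path, then a re-check under the write lock before constructing. A condensed sketch of the pattern, with Loki's types reduced to a plain map (all names here are mine):

```go
package main

import (
	"fmt"
	"sync"
)

type instance struct{ id string }

// registry condenses the Ingester's instance map: a lock-free read on the
// fast path, then a re-check under the write lock before building.
type registry struct {
	mtx   sync.RWMutex
	items map[string]*instance
	build func(id string) (*instance, error) // may fail, e.g. bad shard factor
}

func (r *registry) getOrCreate(id string) (*instance, error) {
	r.mtx.RLock()
	inst, ok := r.items[id]
	r.mtx.RUnlock()
	if ok {
		return inst, nil // fast path: most calls find an existing tenant
	}

	r.mtx.Lock()
	defer r.mtx.Unlock()
	if inst, ok := r.items[id]; ok { // another goroutine may have won the race
		return inst, nil
	}
	inst, err := r.build(id)
	if err != nil {
		return nil, err // construction failure propagates to every caller
	}
	r.items[id] = inst
	return inst, nil
}

func main() {
	r := &registry{
		items: map[string]*instance{},
		build: func(id string) (*instance, error) { return &instance{id: id}, nil },
	}
	inst, err := r.getOrCreate("tenant-a")
	fmt.Println(inst.id, err) // tenant-a <nil>
}
```

The effect is that an incompatible configuration now surfaces at the first request for a tenant rather than inside index construction.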

pkg/ingester/ingester_test.go
@@ -291,7 +291,7 @@ func (s *mockStore) GetSeries(ctx context.Context, req logql.SelectLogParams) ([
 }
 func (s *mockStore) GetSchemaConfigs() []config.PeriodConfig {
-return nil
+return defaultPeriodConfigs
 }
 func (s *mockStore) SetChunkFilterer(_ chunk.RequestChunkFilterer) {
@@ -588,7 +588,9 @@ func Test_InMemoryLabels(t *testing.T) {
 _, err = i.Push(ctx, &req)
 require.NoError(t, err)
+start := time.Unix(0, 0)
 res, err := i.Label(ctx, &logproto.LabelRequest{
+Start: &start,
 Name: "bar",
 Values: true,
 })
@@ -596,7 +598,7 @@
 require.NoError(t, err)
 require.Equal(t, []string{"baz1", "baz2"}, res.Values)
-res, err = i.Label(ctx, &logproto.LabelRequest{})
+res, err = i.Label(ctx, &logproto.LabelRequest{Start: &start})
 require.NoError(t, err)
 require.Equal(t, []string{"bar", "foo"}, res.Values)
 }

pkg/ingester/instance.go
@@ -6,6 +6,7 @@ import (
 "os"
 "sync"
 "syscall"
+"time"
 "github.com/go-kit/log/level"
 "github.com/pkg/errors"
@@ -27,6 +28,7 @@ import (
 "github.com/grafana/loki/pkg/querier/astmapper"
 "github.com/grafana/loki/pkg/runtime"
 "github.com/grafana/loki/pkg/storage/chunk"
+"github.com/grafana/loki/pkg/storage/config"
 "github.com/grafana/loki/pkg/usagestats"
 "github.com/grafana/loki/pkg/util"
 "github.com/grafana/loki/pkg/util/deletion"
@@ -66,7 +68,7 @@ type instance struct {
 buf []byte // buffer used to compute fps.
 streams *streamsMap
-index *index.InvertedIndex
+index *index.Multi
 mapper *fpMapper // use of the mapper no longer needs a mutex because reading from streams is lock-free
 instanceID string
@@ -91,12 +93,26 @@ type instance struct {
 chunkFilter chunk.RequestChunkFilterer
 }
-func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runtime.TenantConfigs, wal WAL, metrics *ingesterMetrics, flushOnShutdownSwitch *OnceSwitch, chunkFilter chunk.RequestChunkFilterer) *instance {
+func newInstance(
+cfg *Config,
+periodConfigs []config.PeriodConfig,
+instanceID string,
+limiter *Limiter,
+configs *runtime.TenantConfigs,
+wal WAL,
+metrics *ingesterMetrics,
+flushOnShutdownSwitch *OnceSwitch,
+chunkFilter chunk.RequestChunkFilterer,
+) (*instance, error) {
+invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards))
+if err != nil {
+return nil, err
+}
 i := &instance{
 cfg: cfg,
 streams: newStreamsMap(),
 buf: make([]byte, 0, 1024),
-index: index.NewWithShards(uint32(cfg.IndexShards)),
+index: invertedIndex,
 instanceID: instanceID,
 streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
@@ -113,7 +129,7 @@ func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runt
 chunkFilter: chunkFilter,
 }
 i.mapper = newFPMapper(i.getLabelsFromFingerprint)
-return i
+return i, err
 }
 // consumeChunk manually adds a chunk that was received during ingester chunk
@@ -339,6 +355,7 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E
 err = i.forMatchingStreams(
 ctx,
+req.Start,
 expr.Matchers(),
 shard,
 func(stream *stream) error {
@@ -390,6 +407,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
 err = i.forMatchingStreams(
 ctx,
+req.Start,
 expr.Selector().Matchers(),
 shard,
 func(stream *stream) error {
@@ -416,7 +434,7 @@ func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest, matche
 if len(matchers) == 0 {
 var labels []string
 if req.Values {
-values, err := i.index.LabelValues(req.Name, nil)
+values, err := i.index.LabelValues(*req.Start, req.Name, nil)
 if err != nil {
 return nil, err
 }
@@ -428,7 +446,7 @@ func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest, matche
 Values: labels,
 }, nil
 }
-names, err := i.index.LabelNames(nil)
+names, err := i.index.LabelNames(*req.Start, nil)
 if err != nil {
 return nil, err
 }
@@ -442,7 +460,7 @@ func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest, matche
 }
 labels := make([]string, 0)
-err := i.forMatchingStreams(ctx, matchers, nil, func(s *stream) error {
+err := i.forMatchingStreams(ctx, *req.Start, matchers, nil, func(s *stream) error {
 for _, label := range s.labels {
 if req.Values && label.Name == req.Name {
 labels = append(labels, label.Value)
@@ -478,7 +496,7 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo
 // If no matchers were supplied we include all streams.
 if len(groups) == 0 {
 series = make([]logproto.SeriesIdentifier, 0, i.streams.Len())
-err = i.forMatchingStreams(ctx, nil, shard, func(stream *stream) error {
+err = i.forMatchingStreams(ctx, req.Start, nil, shard, func(stream *stream) error {
 // consider the stream only if it overlaps the request time range
 if shouldConsiderStream(stream, req) {
 series = append(series, logproto.SeriesIdentifier{
@@ -493,7 +511,7 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo
 } else {
 dedupedSeries := make(map[uint64]logproto.SeriesIdentifier)
 for _, matchers := range groups {
-err = i.forMatchingStreams(ctx, matchers, shard, func(stream *stream) error {
+err = i.forMatchingStreams(ctx, req.Start, matchers, shard, func(stream *stream) error {
 // consider the stream only if it overlaps the request time range
 if shouldConsiderStream(stream, req) {
 // exit early when this stream was added by an earlier group
@@ -553,12 +571,15 @@ func (i *instance) forAllStreams(ctx context.Context, fn func(*stream) error) er
 // It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex.
 func (i *instance) forMatchingStreams(
 ctx context.Context,
+// ts denotes the beginning of the request
+// and is used to select the correct inverted index
+ts time.Time,
 matchers []*labels.Matcher,
 shards *astmapper.ShardAnnotation,
 fn func(*stream) error,
 ) error {
 filters, matchers := util.SplitFiltersAndMatchers(matchers)
-ids, err := i.index.Lookup(matchers, shards)
+ids, err := i.index.Lookup(ts, matchers, shards)
 if err != nil {
 return err
 }
@@ -591,7 +612,7 @@ outer:
 }
 func (i *instance) addNewTailer(ctx context.Context, t *tailer) error {
-if err := i.forMatchingStreams(ctx, t.matchers, nil, func(s *stream) error {
+if err := i.forMatchingStreams(ctx, time.Now(), t.matchers, nil, func(s *stream) error {
 s.addTailer(t)
 return nil
 }); err != nil {
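
The thread running through this file: every read path now passes a timestamp (`req.Start`, or `time.Now()` for tailers, which only ever read live data) down to `forMatchingStreams`, which hands it to `Multi.Lookup` to pick the right inverted index. Note that `Label` dereferences `*req.Start`, which is why the tests above started populating `Start`. A sketch of the selection rule from `indexFor`, reduced to plain start times; dates are illustrative, and Loki returns a noop index rather than -1 for out-of-range timestamps:

```go
package main

import (
	"fmt"
	"time"
)

// indexFor sketches Multi.indexFor's period selection: pick the period whose
// start is <= t and whose successor (if any) starts after t.
func indexFor(starts []time.Time, t time.Time) int {
	for i := range starts {
		if !starts[i].After(t) && (i+1 == len(starts) || t.Before(starts[i+1])) {
			return i
		}
	}
	return -1 // before the first period: Loki falls back to a noop index
}

func main() {
	starts := []time.Time{
		time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
		time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC),
	}
	fmt.Println(indexFor(starts, time.Date(2021, 6, 1, 0, 0, 0, 0, time.UTC))) // 0
	fmt.Println(indexFor(starts, time.Date(2022, 6, 1, 0, 0, 0, 0, time.UTC))) // 1
	fmt.Println(indexFor(starts, time.Date(2020, 6, 1, 0, 0, 0, 0, time.UTC))) // -1 → noop
}
```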

pkg/ingester/instance_test.go
@@ -13,8 +13,10 @@ import (
 "github.com/grafana/loki/pkg/logql/syntax"
 "github.com/grafana/loki/pkg/querier/astmapper"
 "github.com/grafana/loki/pkg/storage/chunk"
+"github.com/grafana/loki/pkg/storage/config"
 "github.com/pkg/errors"
+"github.com/prometheus/common/model"
 "github.com/prometheus/prometheus/model/labels"
 "github.com/stretchr/testify/require"
@@ -36,6 +38,21 @@ func defaultConfig() *Config {
 return &cfg
 }
+func MustParseDayTime(s string) config.DayTime {
+t, err := time.Parse("2006-01-02", s)
+if err != nil {
+panic(err)
+}
+return config.DayTime{Time: model.TimeFromUnix(t.Unix())}
+}
+var defaultPeriodConfigs = []config.PeriodConfig{
+{
+From: MustParseDayTime("1900-01-01"),
+IndexType: config.StorageTypeBigTable,
+},
+}
 var NilMetrics = newIngesterMetrics(nil)
 func TestLabelsCollisions(t *testing.T) {
@@ -43,7 +60,8 @@ func TestLabelsCollisions(t *testing.T) {
 require.NoError(t, err)
 limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
-i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
+i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
+require.Nil(t, err)
 // avoid entries from the future.
 tt := time.Now().Add(-5 * time.Minute)
@@ -70,7 +88,8 @@ func TestConcurrentPushes(t *testing.T) {
 require.NoError(t, err)
 limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
-inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
+inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
+require.Nil(t, err)
 const (
 concurrent = 10
@@ -128,7 +147,9 @@ func TestSyncPeriod(t *testing.T) {
 minUtil = 0.20
 )
-inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
+inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
+require.Nil(t, err)
 lbls := makeRandomLabels()
 tt := time.Now()
@@ -171,7 +192,8 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
 cfg.SyncMinUtilization = 0.20
 cfg.IndexShards = indexShards
-instance := newInstance(cfg, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
+instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
+require.Nil(t, err)
 currentTime := time.Now()
@@ -376,7 +398,7 @@ func Benchmark_PushInstance(b *testing.B) {
 require.NoError(b, err)
 limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
-i := newInstance(&Config{IndexShards: 1}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
+i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
 ctx := context.Background()
 for n := 0; n < b.N; n++ {
@@ -420,7 +442,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
 ctx := context.Background()
-inst := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
+inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
 t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil, 10)
 require.NoError(b, err)
 for i := 0; i < 10000; i++ {
@@ -629,8 +651,9 @@ func defaultInstance(t *testing.T) *instance {
 defaultLimits := defaultLimitsTestConfig()
 overrides, err := validation.NewOverrides(defaultLimits, nil)
 require.NoError(t, err)
-instance := newInstance(
+instance, err := newInstance(
 &ingesterConfig,
+defaultPeriodConfigs,
 "fake",
 NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1),
 loki_runtime.DefaultTenantConfigs(),
@@ -639,6 +662,7 @@ func defaultInstance(t *testing.T) *instance {
 nil,
 nil,
 )
+require.Nil(t, err)
 insertData(t, instance)
 return instance

pkg/ingester/recovery.go
@@ -78,7 +78,10 @@ func (r *ingesterRecoverer) NumWorkers() int { return runtime.GOMAXPROCS(0) }
 func (r *ingesterRecoverer) Series(series *Series) error {
 return r.ing.replayController.WithBackPressure(func() error {
-inst := r.ing.GetOrCreateInstance(series.UserID)
+inst, err := r.ing.GetOrCreateInstance(series.UserID)
+if err != nil {
+return err
+}
 // TODO(owen-d): create another fn to avoid unnecessary label type conversions.
 stream, err := inst.getOrCreateStream(logproto.Stream{
@@ -126,7 +129,10 @@ func (r *ingesterRecoverer) Series(series *Series) error {
 // the fingerprint reported in the WAL record, not the potentially differing one assigned during
 // stream creation.
 func (r *ingesterRecoverer) SetStream(userID string, series record.RefSeries) error {
-inst := r.ing.GetOrCreateInstance(userID)
+inst, err := r.ing.GetOrCreateInstance(userID)
+if err != nil {
+return err
+}
 stream, err := inst.getOrCreateStream(
 logproto.Stream{

pkg/ingester/transfer.go
@@ -107,7 +107,10 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
 lbls = append(lbls, labels.Label{Name: lbl.Name, Value: lbl.Value})
 }
-instance := i.GetOrCreateInstance(chunkSet.UserId)
+instance, err := i.GetOrCreateInstance(chunkSet.UserId)
+if err != nil {
+return err
+}
 for _, chunk := range chunkSet.Chunks {
 if err := instance.consumeChunk(userCtx, lbls, chunk); err != nil {
 return err

pkg/loki/config_compat.go (new file)
@@ -0,0 +1,42 @@
package loki
import (
"fmt"
"github.com/grafana/loki/pkg/ingester/index"
"github.com/grafana/loki/pkg/storage/config"
)
func ValidateConfigCompatibility(c Config) error {
for _, fn := range []func(Config) error{
ensureInvertedIndexShardingCompatibility,
} {
if err := fn(c); err != nil {
return err
}
}
return nil
}
func ensureInvertedIndexShardingCompatibility(c Config) error {
for i, sc := range c.SchemaConfig.Configs {
switch sc.IndexType {
case config.TSDBType:
if err := index.ValidateBitPrefixShardFactor(uint32(c.Ingester.IndexShards)); err != nil {
return err
}
default:
if sc.RowShards > 0 && c.Ingester.IndexShards%int(sc.RowShards) > 0 {
return fmt.Errorf(
"incompatible ingester index shards (%d) and period config row shard factor (%d) for period config at index (%d). The ingester factor must be evenly divisible by all period config factors",
c.Ingester.IndexShards,
sc.RowShards,
i,
)
}
}
}
return nil
}
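
The compatibility rules split by index type: any TSDB period requires the ingester shard factor to be a power of two (bit-prefix sharding cannot express anything else), while non-TSDB periods only require that the factor be evenly divisible by the period's `RowShards`. A worked sketch of the two predicates; the bit-trick power-of-two test stands in for `ValidateBitPrefixShardFactor`, and the numbers are illustrative:

```go
package main

import "fmt"

// powerOfTwo is an equivalent stand-in for ValidateBitPrefixShardFactor's predicate.
func powerOfTwo(n int) bool { return n > 0 && n&(n-1) == 0 }

func main() {
	// ingester index shards = 32 against a period with row_shards = 16:
	// the divisibility rule for non-TSDB periods passes (32 % 16 == 0),
	// and the TSDB rule passes too (32 is a power of two).
	fmt.Println(32%16 == 0, powerOfTwo(32)) // true true

	// ingester index shards = 24 against row_shards = 12: divisibility
	// holds (24 % 12 == 0), so a boltdb-only schema accepts it, but any
	// TSDB period rejects it since 24 is not a power of two.
	fmt.Println(24%12 == 0, powerOfTwo(24)) // true false
}
```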

pkg/loki/config_wrapper.go
@@ -99,12 +99,12 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
 return err
 }
-if len(r.SchemaConfig.Configs) > 0 && config.UsingBoltdbShipper(r.SchemaConfig.Configs) {
-betterBoltdbShipperDefaults(r, &defaults)
+if i := lastBoltdbShipperConfig(r.SchemaConfig.Configs); i != len(r.SchemaConfig.Configs) {
+betterBoltdbShipperDefaults(r, &defaults, r.SchemaConfig.Configs[i])
 }
-if len(r.SchemaConfig.Configs) > 0 && config.UsingTSDB(r.SchemaConfig.Configs) {
-betterTSDBShipperDefaults(r, &defaults)
+if i := lastTSDBConfig(r.SchemaConfig.Configs); i != len(r.SchemaConfig.Configs) {
+betterTSDBShipperDefaults(r, &defaults, r.SchemaConfig.Configs[i])
 }
 applyFIFOCacheConfig(r)
@@ -116,6 +116,27 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
 }
 }
+func lastConfigFor(configs []config.PeriodConfig, predicate func(config.PeriodConfig) bool) int {
+for i := len(configs) - 1; i >= 0; i-- {
+if predicate(configs[i]) {
+return i
+}
+}
+return len(configs)
+}
+func lastBoltdbShipperConfig(configs []config.PeriodConfig) int {
+return lastConfigFor(configs, func(p config.PeriodConfig) bool {
+return p.IndexType == config.BoltDBShipperType
+})
+}
+func lastTSDBConfig(configs []config.PeriodConfig) int {
+return lastConfigFor(configs, func(p config.PeriodConfig) bool {
+return p.IndexType == config.TSDBType
+})
+}
 // applyInstanceConfigs applies instance-related configurations under the common
 // config section to Loki components.
 //
@@ -486,16 +507,14 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
 return nil
 }
-func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper) {
-currentSchemaIdx := config.ActivePeriodConfig(cfg.SchemaConfig.Configs)
-currentSchema := cfg.SchemaConfig.Configs[currentSchemaIdx]
+func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper, period config.PeriodConfig) {
 if cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType == defaults.StorageConfig.BoltDBShipperConfig.SharedStoreType {
-cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType = currentSchema.ObjectType
+cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType = period.ObjectType
 }
 if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
-cfg.CompactorConfig.SharedStoreType = currentSchema.ObjectType
+cfg.CompactorConfig.SharedStoreType = period.ObjectType
 }
 if cfg.Common.PathPrefix != "" {
@@ -511,16 +530,14 @@
 }
 }
-func betterTSDBShipperDefaults(cfg, defaults *ConfigWrapper) {
-currentSchemaIdx := config.ActivePeriodConfig(cfg.SchemaConfig.Configs)
-currentSchema := cfg.SchemaConfig.Configs[currentSchemaIdx]
+func betterTSDBShipperDefaults(cfg, defaults *ConfigWrapper, period config.PeriodConfig) {
 if cfg.StorageConfig.TSDBShipperConfig.SharedStoreType == defaults.StorageConfig.TSDBShipperConfig.SharedStoreType {
-cfg.StorageConfig.TSDBShipperConfig.SharedStoreType = currentSchema.ObjectType
+cfg.StorageConfig.TSDBShipperConfig.SharedStoreType = period.ObjectType
 }
 if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
-cfg.CompactorConfig.SharedStoreType = currentSchema.ObjectType
+cfg.CompactorConfig.SharedStoreType = period.ObjectType
 }
 if cfg.Common.PathPrefix != "" {
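
This is the commit-message bullet "apply shipper defaults to last relevant period config, not current one" in action: defaults used to be derived from the currently active period, which picks the wrong object store when, say, the active period is TSDB but an older boltdb-shipper period still needs its store configured. `lastConfigFor` scans backwards for the most recent period of the given index type and returns `len(configs)` as its not-found sentinel, which the call sites in `ApplyDynamicConfig` compare against. A generic sketch of the scan (names here are mine):

```go
package main

import "fmt"

// lastIndexWhere sketches lastConfigFor's reverse scan and its sentinel.
func lastIndexWhere(xs []string, pred func(string) bool) int {
	for i := len(xs) - 1; i >= 0; i-- {
		if pred(xs[i]) {
			return i
		}
	}
	return len(xs) // sentinel: no element matched
}

func main() {
	periods := []string{"boltdb-shipper", "tsdb", "boltdb-shipper", "tsdb"}

	isBoltdb := func(s string) bool { return s == "boltdb-shipper" }
	isTSDB := func(s string) bool { return s == "tsdb" }

	fmt.Println(lastIndexWhere(periods, isBoltdb)) // 2: last boltdb-shipper period
	fmt.Println(lastIndexWhere(periods, isTSDB))   // 3: last tsdb period

	// The sentinel is how call sites detect that an index type isn't used at all:
	fmt.Println(lastIndexWhere(periods, func(s string) bool { return s == "bigtable" })) // 4 == len(periods)
}
```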

pkg/loki/loki.go
@@ -195,19 +195,14 @@ func (c *Config) Validate() error {
 c.LimitsConfig.MaxQueryLookback = c.ChunkStoreConfig.MaxLookBackPeriod
 }
-for i, sc := range c.SchemaConfig.Configs {
-if sc.RowShards > 0 && c.Ingester.IndexShards%int(sc.RowShards) > 0 {
-return fmt.Errorf(
-"incompatible ingester index shards (%d) and period config row shard factor (%d) for period config at index (%d). The ingester factor must be evenly divisible by all period config factors",
-c.Ingester.IndexShards,
-sc.RowShards,
-i,
-)
-}
-}
 if err := c.QueryRange.Validate(); err != nil {
 return errors.Wrap(err, "invalid query_range config")
 }
+if err := ValidateConfigCompatibility(*c); err != nil {
+return err
+}
 return nil
 }
