diff --git a/pkg/storage/stores/tsdb/chunkwriter.go b/pkg/storage/stores/tsdb/chunkwriter.go
index 5b2da8f0db..6241ec47d5 100644
--- a/pkg/storage/stores/tsdb/chunkwriter.go
+++ b/pkg/storage/stores/tsdb/chunkwriter.go
@@ -50,7 +50,7 @@ func (w *ChunkWriter) Put(ctx context.Context, chunks []chunk.Chunk) error {
 }
 
 func (w *ChunkWriter) PutOne(ctx context.Context, from, through model.Time, chk chunk.Chunk) error {
-	log, ctx := spanlogger.New(ctx, "SeriesStore.PutOne")
+	log, ctx := spanlogger.New(ctx, "TSDBStore.PutOne")
 	defer log.Finish()
 
 	// with local TSDB indices, we _always_ write the index entry
diff --git a/pkg/storage/stores/tsdb/head.go b/pkg/storage/stores/tsdb/head.go
index 3e074ac372..dac903aa65 100644
--- a/pkg/storage/stores/tsdb/head.go
+++ b/pkg/storage/stores/tsdb/head.go
@@ -103,20 +103,15 @@ type Head struct {
 	series *stripeSeries
 
 	postings *index.MemPostings // Postings lists for terms.
-
-	closedMtx sync.Mutex
-	closed    bool
 }
 
 func NewHead(tenant string, metrics *Metrics, logger log.Logger) *Head {
 	return &Head{
-		tenant:    tenant,
-		metrics:   metrics,
-		logger:    logger,
-		series:    newStripeSeries(),
-		postings:  index.NewMemPostings(),
-		closedMtx: sync.Mutex{},
-		closed:    false,
+		tenant:   tenant,
+		metrics:  metrics,
+		logger:   logger,
+		series:   newStripeSeries(),
+		postings: index.NewMemPostings(),
 	}
 }
 
diff --git a/pkg/storage/stores/tsdb/head_manager.go b/pkg/storage/stores/tsdb/head_manager.go
index 93bfe01508..7ab5003cdb 100644
--- a/pkg/storage/stores/tsdb/head_manager.go
+++ b/pkg/storage/stores/tsdb/head_manager.go
@@ -27,10 +27,28 @@ import (
 	"github.com/grafana/loki/pkg/util/wal"
 )
 
+/*
+period is a duration which the ingesters use to group index writes into a (WAL,TenantHeads) pair.
+After each period elapses, zero or more multitenant TSDB indices are built (one per
+index bucket, generally 24h).
+
+It's important to note that this cycle occurs in real time as opposed to the timestamps of
+chunk entries. Index writes during the `period` may span multiple index buckets. Periods
+also expose some helper functions to get the remainder-less offset integer for that period,
+which we use in file creation/etc.
+*/
 type period time.Duration
 
 const defaultRotationPeriod = period(15 * time.Minute)
 
+func (p period) PeriodFor(t time.Time) int {
+	return int(t.UnixNano() / int64(p))
+}
+
+func (p period) TimeForPeriod(n int) time.Time {
+	return time.Unix(0, int64(p)*int64(n))
+}
+
 // Do not specify without bit shifting. This allows us to
 // do shard index calcuations via bitwise & rather than modulos.
 const defaultHeadManagerStripeSize = 1 << 7
@@ -125,10 +143,13 @@ func (m *HeadManager) Stop() error {
 }
 
 func (m *HeadManager) Append(userID string, ls labels.Labels, chks index.ChunkMetas) error {
-	labelsBuilder := labels.NewBuilder(ls)
 	// TSDB doesnt need the __name__="log" convention the old chunk store index used.
-	labelsBuilder.Del("__name__")
-	metric := labelsBuilder.Labels()
+	for i, l := range ls {
+		if l.Name == labels.MetricName {
+			ls = append(ls[:i], ls[i+1:]...)
+			break
+		}
+	}
 
 	m.mtx.RLock()
 	now := time.Now()
@@ -140,18 +161,10 @@ func (m *HeadManager) Append(userID string, ls labels.Labels, chks index.ChunkMetas) error {
 		m.mtx.RLock()
 	}
 	defer m.mtx.RUnlock()
-	rec := m.activeHeads.Append(userID, metric, chks)
+	rec := m.activeHeads.Append(userID, ls, chks)
 	return m.active.Log(rec)
 }
 
-func (p period) PeriodFor(t time.Time) int {
-	return int(t.UnixNano() / int64(p))
-}
-
-func (p period) TimeForPeriod(n int) time.Time {
-	return time.Unix(0, int64(p)*int64(n))
-}
-
 func (m *HeadManager) Start() error {
 	if err := os.RemoveAll(filepath.Join(m.dir, "scratch")); err != nil {
 		return errors.Wrap(err, "removing tsdb scratch dir")
 	}
@@ -166,17 +179,6 @@
 	now := time.Now()
 	curPeriod := m.period.PeriodFor(now)
-	toRemove, err := m.shippedTSDBsBeforePeriod(curPeriod)
-	if err != nil {
-		return err
-	}
-
-	for _, x := range toRemove {
-		if err := os.RemoveAll(x); err != nil {
-			return errors.Wrapf(err, "removing tsdb: %s", x)
-		}
-	}
-
 	walsByPeriod, err := walsByPeriod(m.dir, m.period)
 	if err != nil {
 		return err
 	}
@@ -185,7 +187,7 @@
 	m.activeHeads = newTenantHeads(now, m.shards, m.metrics, m.log)
 
 	for _, group := range walsByPeriod {
-		if group.period < (curPeriod) {
+		if group.period < curPeriod {
 			if err := m.tsdbManager.BuildFromWALs(
 				m.period.TimeForPeriod(group.period),
 				group.wals,
@@ -198,7 +200,7 @@
 			}
 		}
-		if group.period == curPeriod {
+		if group.period >= curPeriod {
 			if err := recoverHead(m.dir, m.activeHeads, group.wals); err != nil {
 				return errors.Wrap(err, "recovering tsdb head from wal")
 			}
 		}
@@ -308,21 +310,6 @@ func (m *HeadManager) Rotate(t time.Time) error {
 	return nil
 }
 
-func (m *HeadManager) shippedTSDBsBeforePeriod(period int) (res []string, err error) {
-	files, err := ioutil.ReadDir(managerPerTenantDir(m.dir))
-	if err != nil {
-		return nil, err
-	}
-	for _, f := range files {
-		if id, ok := parseMultitenantTSDBPath(f.Name()); ok {
-			if found := m.period.PeriodFor(id.ts); found < period {
-				res = append(res, f.Name())
-			}
-		}
-	}
-	return
-}
-
 type WalGroup struct {
 	period int
 	wals   []WALIdentifier
@@ -429,6 +416,7 @@ func recoverHead(dir string, heads *tenantHeads, wals []WALIdentifier) error {
 				return err
 			}
 
+			// labels are always written to the WAL before corresponding chunks
 			if len(rec.Series.Labels) > 0 {
 				tenant, ok := seriesMap[rec.UserID]
 				if !ok {
@@ -571,12 +559,12 @@ func (t *tenantHeads) Bounds() (model.Time, model.Time) {
 	return model.Time(t.mint.Load()), model.Time(t.maxt.Load())
 }
 
-func (t *tenantHeads) tenantIndex(userID string, from, through model.Time) (idx Index, unlock func(), ok bool) {
+func (t *tenantHeads) tenantIndex(userID string, from, through model.Time) (idx Index, ok bool) {
 	i := t.shardForTenant(userID)
 	t.locks[i].RLock()
+	defer t.locks[i].RUnlock()
 	tenant, ok := t.tenants[i][userID]
 	if !ok {
-		t.locks[i].RUnlock()
 		return
 	}
 
@@ -584,47 +572,43 @@
 	if t.chunkFilter != nil {
 		idx.SetChunkFilterer(t.chunkFilter)
 	}
-	return idx, t.locks[i].RUnlock, true
+	return idx, true
 }
 
 func (t *tenantHeads) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
-	idx, unlock, ok := t.tenantIndex(userID, from, through)
+	idx, ok := t.tenantIndex(userID, from, through)
 	if !ok {
 		return nil, nil
 	}
-	defer unlock()
 
 	return idx.GetChunkRefs(ctx, userID,
 		from, through, nil, shard, matchers...)
 }
 
 // Series follows the same semantics regarding the passed slice and shard as GetChunkRefs.
 func (t *tenantHeads) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
-	idx, unlock, ok := t.tenantIndex(userID, from, through)
+	idx, ok := t.tenantIndex(userID, from, through)
 	if !ok {
 		return nil, nil
 	}
-	defer unlock()
 
 	return idx.Series(ctx, userID, from, through, nil, shard, matchers...)
 }
 
 func (t *tenantHeads) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
-	idx, unlock, ok := t.tenantIndex(userID, from, through)
+	idx, ok := t.tenantIndex(userID, from, through)
 	if !ok {
 		return nil, nil
 	}
-	defer unlock()
 
 	return idx.LabelNames(ctx, userID, from, through, matchers...)
 }
 
 func (t *tenantHeads) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) {
-	idx, unlock, ok := t.tenantIndex(userID, from, through)
+	idx, ok := t.tenantIndex(userID, from, through)
 	if !ok {
 		return nil, nil
 	}
-	defer unlock()
 
 	return idx.LabelValues(ctx, userID, from, through, name, matchers...)
 }
diff --git a/pkg/storage/stores/tsdb/head_manager_test.go b/pkg/storage/stores/tsdb/head_manager_test.go
index 0b032e9e4f..a0d41c2a35 100644
--- a/pkg/storage/stores/tsdb/head_manager_test.go
+++ b/pkg/storage/stores/tsdb/head_manager_test.go
@@ -2,7 +2,9 @@ package tsdb
 
 import (
 	"context"
+	"fmt"
 	"math"
+	"sync"
 	"testing"
 	"time"
 
@@ -308,3 +310,57 @@ func Test_HeadManager_Lifecycle(t *testing.T) {
 		require.Equal(t, chunkMetasToChunkRefs(c.User, c.Labels.Hash(), c.Chunks), refs)
 	}
 }
+
+func BenchmarkTenantHeads(b *testing.B) {
+	for _, tc := range []struct {
+		readers, writers int
+	}{
+		{
+			readers: 10,
+		},
+		{
+			readers: 100,
+		},
+		{
+			readers: 1000,
+		},
+	} {
+		b.Run(fmt.Sprintf("%d", tc.readers), func(b *testing.B) {
+			heads := newTenantHeads(time.Now(), defaultHeadManagerStripeSize, NewMetrics(nil), log.NewNopLogger())
+			// 1000 series across 10 tenants
+			nTenants := 10
+			for i := 0; i < 1000; i++ {
+				tenant := i % nTenants
+				ls := mustParseLabels(fmt.Sprintf(`{foo="bar", i="%d"}`, i))
+				heads.Append(fmt.Sprint(tenant), ls, index.ChunkMetas{
+					{},
+				})
+			}
+
+			for n := 0; n < b.N; n++ {
+				var wg sync.WaitGroup
+				for r := 0; r < tc.readers; r++ {
+					wg.Add(1)
+					go func(r int) {
+						defer wg.Done()
+						var res []ChunkRef
+						tenant := r % nTenants
+
+						// nolint:ineffassign,staticcheck
+						res, _ = heads.GetChunkRefs(
+							context.Background(),
+							fmt.Sprint(tenant),
+							0, math.MaxInt64,
+							res,
+							nil,
+							labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
+						)
+					}(r)
+				}
+
+				wg.Wait()
+			}
+
+		})
+	}
+}
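
A minimal standalone sketch (not part of the patch) of how the period helpers added in head_manager.go bucket wall-clock time: PeriodFor truncates a timestamp down to an integer period offset, and TimeForPeriod maps that integer back to the period's start. The type, constant, and two methods are copied from the diff above; the main function is illustrative only.

package main

import (
	"fmt"
	"time"
)

type period time.Duration

const defaultRotationPeriod = period(15 * time.Minute)

func (p period) PeriodFor(t time.Time) int {
	return int(t.UnixNano() / int64(p))
}

func (p period) TimeForPeriod(n int) time.Time {
	return time.Unix(0, int64(p)*int64(n))
}

func main() {
	now := time.Now()
	n := defaultRotationPeriod.PeriodFor(now)
	start := defaultRotationPeriod.TimeForPeriod(n)

	// Every timestamp inside the same 15 minute window maps to the same integer,
	// which the HeadManager uses when naming WAL/TSDB artifacts and deciding when to rotate.
	fmt.Println(n, start, defaultRotationPeriod.PeriodFor(start) == n)
}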
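A small sketch (assumed example, not code from the patch) of why defaultHeadManagerStripeSize must stay a power of two, per the "Do not specify without bit shifting" comment in the context above: when the size is 2^k, h % size equals h & (size - 1), so shard selection needs only a bitwise AND. The shardFor helper here is hypothetical.

package main

import "fmt"

const stripeSize = 1 << 7 // 128, mirrors defaultHeadManagerStripeSize

// shardFor picks a shard via a mask; for power-of-two sizes it matches the modulo.
func shardFor(fingerprint uint64) uint64 {
	return fingerprint & (stripeSize - 1)
}

func main() {
	for _, fp := range []uint64{1, 127, 128, 1 << 20, 123456789} {
		fmt.Println(fp, shardFor(fp), fp%stripeSize == shardFor(fp))
	}
}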
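A dependency-free sketch of the __name__ removal that HeadManager.Append now performs: the metric-name label is spliced out of the slice in place instead of going through labels.NewBuilder, reusing (and mutating) the caller's backing array. The Label type and dropMetricName helper are illustrative stand-ins for the Prometheus labels types used in the patch, not real API.

package main

import "fmt"

// Label is a stand-in for prometheus' labels.Label; labels.MetricName is "__name__".
type Label struct{ Name, Value string }

const metricName = "__name__"

func dropMetricName(ls []Label) []Label {
	for i, l := range ls {
		if l.Name == metricName {
			// Shift the remaining labels left within the same backing array;
			// this avoids an allocation but modifies the caller's slice contents.
			return append(ls[:i], ls[i+1:]...)
		}
	}
	return ls
}

func main() {
	ls := []Label{{metricName, "logs"}, {"foo", "bar"}}
	fmt.Println(dropMetricName(ls)) // [{foo bar}]
}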