Tsdb/head wal feedback (#6107)

* pr feedback, improved mutex unlocking

* tenant heads use sync.Map

* tenant heads bench

* Revert "tenant heads use sync.Map"

This reverts commit 64adebc535.

* removes old code that removed TSDBs outside the shipper + minor fixes
Branch: pull/6116/head
Author: Owen Diehl, authored 4 years ago, committed by GitHub
Parent: 8838530976
Commit: 55ea89f9fd
4 changed files:
  1. pkg/storage/stores/tsdb/chunkwriter.go (2 changed lines)
  2. pkg/storage/stores/tsdb/head.go (15 changed lines)
  3. pkg/storage/stores/tsdb/head_manager.go (86 changed lines)
  4. pkg/storage/stores/tsdb/head_manager_test.go (56 changed lines)

@@ -50,7 +50,7 @@ func (w *ChunkWriter) Put(ctx context.Context, chunks []chunk.Chunk) error {
}
func (w *ChunkWriter) PutOne(ctx context.Context, from, through model.Time, chk chunk.Chunk) error {
log, ctx := spanlogger.New(ctx, "SeriesStore.PutOne")
log, ctx := spanlogger.New(ctx, "TSDBStore.PutOne")
defer log.Finish()
// with local TSDB indices, we _always_ write the index entry

@@ -103,20 +103,15 @@ type Head struct {
series *stripeSeries
postings *index.MemPostings // Postings lists for terms.
closedMtx sync.Mutex
closed bool
}
func NewHead(tenant string, metrics *Metrics, logger log.Logger) *Head {
return &Head{
tenant: tenant,
metrics: metrics,
logger: logger,
series: newStripeSeries(),
postings: index.NewMemPostings(),
closedMtx: sync.Mutex{},
closed: false,
tenant: tenant,
metrics: metrics,
logger: logger,
series: newStripeSeries(),
postings: index.NewMemPostings(),
}
}

@@ -27,10 +27,28 @@ import (
"github.com/grafana/loki/pkg/util/wal"
)
/*
period is a duration which the ingesters use to group index writes into a (WAL,TenantHeads) pair.
After each period elapses, a set of zero or more multitenant TSDB indices are built (one per
index bucket, generally 24h).
It's important to note that this cycle occurs in real time as opposed to the timestamps of
chunk entries. Index writes during the `period` may span multiple index buckets. Periods
also expose some helper functions to get the remainder-less offset integer for that period,
which we use in file creation/etc.
*/
type period time.Duration
const defaultRotationPeriod = period(15 * time.Minute)
func (p period) PeriodFor(t time.Time) int {
return int(t.UnixNano() / int64(p))
}
func (p period) TimeForPeriod(n int) time.Time {
return time.Unix(0, int64(p)*int64(n))
}
// Do not specify without bit shifting. This allows us to
// do shard index calculations via bitwise & rather than modulos.
const defaultHeadManagerStripeSize = 1 << 7
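The PeriodFor/TimeForPeriod helpers introduced above are plain integer division on Unix nanoseconds: a timestamp maps to the index of the 15-minute bucket containing it, and the index maps back to that bucket's start time. A small standalone sketch of that arithmetic (the period type and default are copied from the hunk above; the main harness is illustrative only):

package main

import (
	"fmt"
	"time"
)

type period time.Duration

const defaultRotationPeriod = period(15 * time.Minute)

// PeriodFor truncates t to the index of the rotation bucket containing it.
func (p period) PeriodFor(t time.Time) int {
	return int(t.UnixNano() / int64(p))
}

// TimeForPeriod returns the wall-clock start of bucket n.
func (p period) TimeForPeriod(n int) time.Time {
	return time.Unix(0, int64(p)*int64(n))
}

func main() {
	ts := time.Date(2022, 5, 6, 10, 7, 0, 0, time.UTC)
	n := defaultRotationPeriod.PeriodFor(ts)

	// Every write between 10:00 and 10:15 UTC maps to the same bucket index,
	// so it lands in the same (WAL, TenantHeads) pair and the same file names.
	fmt.Println(n, defaultRotationPeriod.TimeForPeriod(n).UTC()) // ... 2022-05-06 10:00:00 +0000 UTC
}

The neighbouring defaultHeadManagerStripeSize comment relies on similarly cheap arithmetic: with a power-of-two stripe count, hash & (stripeSize - 1) picks a shard without a modulo.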
@@ -125,10 +143,13 @@ func (m *HeadManager) Stop() error {
}
func (m *HeadManager) Append(userID string, ls labels.Labels, chks index.ChunkMetas) error {
labelsBuilder := labels.NewBuilder(ls)
// TSDB doesn't need the __name__="log" convention the old chunk store index used.
labelsBuilder.Del("__name__")
metric := labelsBuilder.Labels()
for i, l := range ls {
if l.Name == labels.MetricName {
ls = append(ls[:i], ls[i+1:]...)
break
}
}
m.mtx.RLock()
now := time.Now()
@@ -140,18 +161,10 @@ func (m *HeadManager) Append(userID string, ls labels.Labels, chks index.ChunkMe
m.mtx.RLock()
}
defer m.mtx.RUnlock()
rec := m.activeHeads.Append(userID, metric, chks)
rec := m.activeHeads.Append(userID, ls, chks)
return m.active.Log(rec)
}
func (p period) PeriodFor(t time.Time) int {
return int(t.UnixNano() / int64(p))
}
func (p period) TimeForPeriod(n int) time.Time {
return time.Unix(0, int64(p)*int64(n))
}
func (m *HeadManager) Start() error {
if err := os.RemoveAll(filepath.Join(m.dir, "scratch")); err != nil {
return errors.Wrap(err, "removing tsdb scratch dir")
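The rewritten Append above drops the legacy __name__="log" label by splicing it out of the label slice instead of building a copy with labels.Builder, which is why the call site below now passes ls rather than a separate metric variable. A self-contained sketch of that splice (the sample label set and import path are assumptions for illustration; note the splice reuses the caller's backing array rather than allocating):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// Assumes the default slice-backed labels implementation.
	ls := labels.Labels{
		{Name: labels.MetricName, Value: "logs"},
		{Name: "foo", Value: "bar"},
	}

	// Remove labels.MetricName ("__name__") in place, mirroring the loop in Append.
	for i, l := range ls {
		if l.Name == labels.MetricName {
			ls = append(ls[:i], ls[i+1:]...)
			break
		}
	}

	fmt.Println(ls) // {foo="bar"}
}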
@@ -166,17 +179,6 @@ func (m *HeadManager) Start() error {
now := time.Now()
curPeriod := m.period.PeriodFor(now)
toRemove, err := m.shippedTSDBsBeforePeriod(curPeriod)
if err != nil {
return err
}
for _, x := range toRemove {
if err := os.RemoveAll(x); err != nil {
return errors.Wrapf(err, "removing tsdb: %s", x)
}
}
walsByPeriod, err := walsByPeriod(m.dir, m.period)
if err != nil {
return err
@@ -185,7 +187,7 @@ func (m *HeadManager) Start() error {
m.activeHeads = newTenantHeads(now, m.shards, m.metrics, m.log)
for _, group := range walsByPeriod {
if group.period < (curPeriod) {
if group.period < curPeriod {
if err := m.tsdbManager.BuildFromWALs(
m.period.TimeForPeriod(group.period),
group.wals,
@@ -198,7 +200,7 @@ func (m *HeadManager) Start() error {
}
}
if group.period == curPeriod {
if group.period >= curPeriod {
if err := recoverHead(m.dir, m.activeHeads, group.wals); err != nil {
return errors.Wrap(err, "recovering tsdb head from wal")
}
@@ -308,21 +310,6 @@ func (m *HeadManager) Rotate(t time.Time) error {
return nil
}
func (m *HeadManager) shippedTSDBsBeforePeriod(period int) (res []string, err error) {
files, err := ioutil.ReadDir(managerPerTenantDir(m.dir))
if err != nil {
return nil, err
}
for _, f := range files {
if id, ok := parseMultitenantTSDBPath(f.Name()); ok {
if found := m.period.PeriodFor(id.ts); found < period {
res = append(res, f.Name())
}
}
}
return
}
type WalGroup struct {
period int
wals []WALIdentifier
@@ -429,6 +416,7 @@ func recoverHead(dir string, heads *tenantHeads, wals []WALIdentifier) error {
return err
}
// labels are always written to the WAL before corresponding chunks
if len(rec.Series.Labels) > 0 {
tenant, ok := seriesMap[rec.UserID]
if !ok {
@@ -571,12 +559,12 @@ func (t *tenantHeads) Bounds() (model.Time, model.Time) {
return model.Time(t.mint.Load()), model.Time(t.maxt.Load())
}
func (t *tenantHeads) tenantIndex(userID string, from, through model.Time) (idx Index, unlock func(), ok bool) {
func (t *tenantHeads) tenantIndex(userID string, from, through model.Time) (idx Index, ok bool) {
i := t.shardForTenant(userID)
t.locks[i].RLock()
defer t.locks[i].RUnlock()
tenant, ok := t.tenants[i][userID]
if !ok {
t.locks[i].RUnlock()
return
}
@@ -584,47 +572,43 @@ func (t *tenantHeads) tenantIndex(userID string, from, through model.Time) (idx
if t.chunkFilter != nil {
idx.SetChunkFilterer(t.chunkFilter)
}
return idx, t.locks[i].RUnlock, true
return idx, true
}
func (t *tenantHeads) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
idx, unlock, ok := t.tenantIndex(userID, from, through)
idx, ok := t.tenantIndex(userID, from, through)
if !ok {
return nil, nil
}
defer unlock()
return idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...)
}
// Series follows the same semantics regarding the passed slice and shard as GetChunkRefs.
func (t *tenantHeads) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
idx, unlock, ok := t.tenantIndex(userID, from, through)
idx, ok := t.tenantIndex(userID, from, through)
if !ok {
return nil, nil
}
defer unlock()
return idx.Series(ctx, userID, from, through, nil, shard, matchers...)
}
func (t *tenantHeads) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
idx, unlock, ok := t.tenantIndex(userID, from, through)
idx, ok := t.tenantIndex(userID, from, through)
if !ok {
return nil, nil
}
defer unlock()
return idx.LabelNames(ctx, userID, from, through, matchers...)
}
func (t *tenantHeads) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) {
idx, unlock, ok := t.tenantIndex(userID, from, through)
idx, ok := t.tenantIndex(userID, from, through)
if !ok {
return nil, nil
}
defer unlock()
return idx.LabelValues(ctx, userID, from, through, name, matchers...)
}
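The tenantIndex change above is the "improved mutex unlocking" from the commit message: instead of handing each caller an unlock closure that must be invoked on every path after the query, the shard read lock is scoped to the map lookup and index construction with defer, and callers query the returned index without holding it. A minimal sketch of the two shapes, using an illustrative registry type rather than Loki's tenantHeads:

package main

import "sync"

type registry struct {
	mtx   sync.RWMutex
	items map[string]int
}

// Before: the caller receives an unlock func and must hold the read lock
// for the whole query, remembering to call unlock on every return path.
func (r *registry) getWithUnlock(k string) (int, func(), bool) {
	r.mtx.RLock()
	v, ok := r.items[k]
	if !ok {
		r.mtx.RUnlock()
		return 0, nil, false
	}
	return v, r.mtx.RUnlock, true
}

// After: the read lock is scoped to the lookup itself via defer, so there is
// no unlock closure to thread through callers and no path that can leak the lock.
func (r *registry) get(k string) (int, bool) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	v, ok := r.items[k]
	return v, ok
}

func main() {
	r := &registry{items: map[string]int{"a": 1}}

	if v, unlock, ok := r.getWithUnlock("a"); ok {
		_ = v
		unlock()
	}

	if v, ok := r.get("a"); ok {
		_ = v
	}
}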

@@ -2,7 +2,9 @@ package tsdb
import (
"context"
"fmt"
"math"
"sync"
"testing"
"time"
@@ -308,3 +310,57 @@ func Test_HeadManager_Lifecycle(t *testing.T) {
require.Equal(t, chunkMetasToChunkRefs(c.User, c.Labels.Hash(), c.Chunks), refs)
}
}
func BenchmarkTenantHeads(b *testing.B) {
for _, tc := range []struct {
readers, writers int
}{
{
readers: 10,
},
{
readers: 100,
},
{
readers: 1000,
},
} {
b.Run(fmt.Sprintf("%d", tc.readers), func(b *testing.B) {
heads := newTenantHeads(time.Now(), defaultHeadManagerStripeSize, NewMetrics(nil), log.NewNopLogger())
// 1000 series across 10 tenants
nTenants := 10
for i := 0; i < 1000; i++ {
tenant := i % nTenants
ls := mustParseLabels(fmt.Sprintf(`{foo="bar", i="%d"}`, i))
heads.Append(fmt.Sprint(tenant), ls, index.ChunkMetas{
{},
})
}
for n := 0; n < b.N; n++ {
var wg sync.WaitGroup
for r := 0; r < tc.readers; r++ {
wg.Add(1)
go func(r int) {
defer wg.Done()
var res []ChunkRef
tenant := r % nTenants
// nolint:ineffassign,staticcheck
res, _ = heads.GetChunkRefs(
context.Background(),
fmt.Sprint(tenant),
0, math.MaxInt64,
res,
nil,
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
)
}(r)
}
wg.Wait()
}
})
}
}
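As a usage note (not part of this change): the benchmark can be run on its own with standard Go tooling, e.g. go test -run='^$' -bench=BenchmarkTenantHeads ./pkg/storage/stores/tsdb/, optionally adding -cpu or -benchtime to vary parallelism and duration when comparing the reverted sync.Map variant against the striped-lock implementation.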
