From 53e01a795a4f8fc293e2dbe2a69bee89f46b81ac Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Tue, 6 Sep 2022 16:51:55 +0530 Subject: [PATCH] tsdb: build tsdb from tenantheads (#7036) **What this PR does / why we need it**: refactors tsdb manager code to expose a new method `BuildFromHead` which comes in handy for building tsdb from tenant heads after rotation. Currently we build it by recovering head from WAL even when we have access to the in-mem tenant heads. **Special notes for your reviewer**: using `loki_tsdb_head_rotations_*` metric to track loop failures which could be caused either by rotation failures or if rotation is blocked by prev head failing to build **Checklist** - [ ] Documentation added - [x] Tests updated - [ ] Is this an important fix or new feature? Add an entry in the `CHANGELOG.md`. - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` Signed-off-by: Ashwanth Goli --- pkg/storage/stores/tsdb/head.go | 36 ++-- pkg/storage/stores/tsdb/head_manager.go | 78 +++++--- pkg/storage/stores/tsdb/head_manager_test.go | 4 + pkg/storage/stores/tsdb/manager.go | 184 ++++++++++--------- 4 files changed, 181 insertions(+), 121 deletions(-) diff --git a/pkg/storage/stores/tsdb/head.go b/pkg/storage/stores/tsdb/head.go index 5a92904845..8b8e1609c8 100644 --- a/pkg/storage/stores/tsdb/head.go +++ b/pkg/storage/stores/tsdb/head.go @@ -39,6 +39,8 @@ const ( // Do not specify without bit shifting. This allows us to // do shard index calcuations via bitwise & rather than modulos. defaultStripeSize = 64 + + tsdbBuildSourceLabel = "source" ) /* @@ -57,12 +59,14 @@ guaranteeing we maintain querying consistency for the entire data lifecycle. // TODO(owen-d) type Metrics struct { seriesNotFound prometheus.Counter - tsdbCreationsTotal prometheus.Counter - tsdbCreationFailures prometheus.Counter tsdbManagerUpdatesTotal prometheus.Counter tsdbManagerUpdatesFailedTotal prometheus.Counter tsdbHeadRotationsTotal prometheus.Counter tsdbHeadRotationsFailedTotal prometheus.Counter + tsdbWALTruncationsTotal prometheus.Counter + tsdbWALTruncationsFailedTotal prometheus.Counter + tsdbCreationsTotal *prometheus.CounterVec + tsdbCreationsFailedTotal *prometheus.CounterVec } func NewMetrics(r prometheus.Registerer) *Metrics { @@ -71,14 +75,6 @@ func NewMetrics(r prometheus.Registerer) *Metrics { Name: "loki_tsdb_head_series_not_found_total", Help: "Total number of requests for series that were not found.", }), - tsdbCreationsTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ - Name: "loki_tsdb_creations_total", - Help: "Total number of tsdb creations attempted", - }), - tsdbCreationFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{ - Name: "loki_tsdb_creations_failed_total", - Help: "Total number of tsdb creations failed", - }), tsdbManagerUpdatesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "loki_tsdb_manager_updates_total", Help: "Total number of tsdb manager updates (loading/rotating tsdbs in mem)", @@ -89,12 +85,28 @@ func NewMetrics(r prometheus.Registerer) *Metrics { }), tsdbHeadRotationsTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "loki_tsdb_head_rotations_total", - Help: "Total number of tsdb head rotations", + Help: "Total number of tsdb head rotations attempted", }), tsdbHeadRotationsFailedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "loki_tsdb_head_rotations_failed_total", - Help: "Total number of tsdb head rotations failed", + Help: "Total number of tsdb head rotations that failed", }), + tsdbWALTruncationsTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_tsdb_wal_truncations_total", + Help: "Total number of WAL truncations attempted", + }), + tsdbWALTruncationsFailedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_tsdb_wal_truncations_failed_total", + Help: "Total number of WAL truncations that failed", + }), + tsdbCreationsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "loki_tsdb_creations_total", + Help: "Total number of tsdb creations attempted", + }, []string{tsdbBuildSourceLabel}), + tsdbCreationsFailedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "loki_tsdb_creations_failed_total", + Help: "Total number of tsdb creations that failed", + }, []string{tsdbBuildSourceLabel}), } } diff --git a/pkg/storage/stores/tsdb/head_manager.go b/pkg/storage/stores/tsdb/head_manager.go index 22ab092f2d..a13630a5ef 100644 --- a/pkg/storage/stores/tsdb/head_manager.go +++ b/pkg/storage/stores/tsdb/head_manager.go @@ -151,8 +151,8 @@ func (m *HeadManager) loop() { return nil } - if err := m.buildTSDBFromWAL(m.prev.initialized); err != nil { - return errors.Wrap(err, "building tsdb head") + if err := m.buildTSDBFromHead(m.prevHeads); err != nil { + return err } // Now that the tsdbManager has the updated TSDBs, we can remove our references @@ -170,8 +170,11 @@ func (m *HeadManager) loop() { for { select { case <-ticker.C: + m.metrics.tsdbHeadRotationsTotal.Inc() + // retry tsdb build failures from previous run if err := buildPrev(); err != nil { + m.metrics.tsdbHeadRotationsFailedTotal.Inc() level.Error(m.log).Log( "msg", "failed building tsdb head", "period", m.period.PeriodFor(m.prev.initialized), @@ -182,11 +185,12 @@ func (m *HeadManager) loop() { } now := time.Now() - if m.period.PeriodFor(now) > m.period.PeriodFor(m.activeHeads.start) { + if curPeriod := m.period.PeriodFor(m.activeHeads.start); m.period.PeriodFor(now) > curPeriod { if err := m.Rotate(now); err != nil { + m.metrics.tsdbHeadRotationsFailedTotal.Inc() level.Error(m.log).Log( "msg", "failed rotating tsdb head", - "period", m.period.PeriodFor(m.prev.initialized), + "period", curPeriod, "err", err, ) continue @@ -197,6 +201,7 @@ func (m *HeadManager) loop() { if err := buildPrev(); err != nil { level.Error(m.log).Log( "msg", "failed building tsdb head", + "period", m.period.PeriodFor(m.prev.initialized), "err", err, ) } @@ -216,7 +221,18 @@ func (m *HeadManager) Stop() error { return err } - return m.buildTSDBFromWAL(m.active.initialized) + if m.prev != nil { + if err := m.buildTSDBFromHead(m.prevHeads); err != nil { + // log the error and start building active head + level.Error(m.log).Log( + "msg", "failed building tsdb from prev head", + "period", m.period.PeriodFor(m.prev.initialized), + "err", err, + ) + } + } + + return m.buildTSDBFromHead(m.activeHeads) } func (m *HeadManager) Append(userID string, ls labels.Labels, chks index.ChunkMetas) error { @@ -270,7 +286,9 @@ func (m *HeadManager) Start() error { return errors.Wrap(err, "building tsdb") } + m.metrics.tsdbWALTruncationsTotal.Inc() if err := os.RemoveAll(managerWalDir(m.dir)); err != nil { + m.metrics.tsdbWALTruncationsFailedTotal.Inc() return errors.New("cleaning (removing) wal dir") } @@ -310,13 +328,6 @@ func managerPerTenantDir(parent string) string { } func (m *HeadManager) Rotate(t time.Time) (err error) { - defer func() { - m.metrics.tsdbHeadRotationsTotal.Inc() - if err != nil { - m.metrics.tsdbHeadRotationsFailedTotal.Inc() - } - }() - // create new wal nextWALPath := walPath(m.dir, t) nextWAL, err := newHeadWAL(m.log, nextWALPath, t) @@ -349,27 +360,40 @@ func (m *HeadManager) Rotate(t time.Time) (err error) { return nil } -func (m *HeadManager) buildTSDBFromWAL(t time.Time) error { - level.Debug(m.log).Log("msg", "combining tsdb WALs") - grp, _, err := walsForPeriod(m.dir, m.period, m.period.PeriodFor(t)) +func (m *HeadManager) buildTSDBFromHead(head *tenantHeads) error { + period := m.period.PeriodFor(head.start) + if err := m.tsdbManager.BuildFromHead(head); err != nil { + return errors.Wrap(err, "building tsdb from head") + } + + // Now that a TSDB has been created from this group, it's safe to remove them + if err := m.truncateWALForPeriod(period); err != nil { + level.Error(m.log).Log( + "msg", "failed truncating wal files", + "period", period, + "err", err, + ) + } + + return nil +} + +func (m *HeadManager) truncateWALForPeriod(period int) (err error) { + m.metrics.tsdbWALTruncationsTotal.Inc() + defer func() { + if err != nil { + m.metrics.tsdbWALTruncationsFailedTotal.Inc() + } + }() + + grp, _, err := walsForPeriod(m.dir, m.period, period) if err != nil { return errors.Wrap(err, "listing wals") } level.Debug(m.log).Log("msg", "listed WALs", "pd", grp.period, "n", len(grp.wals)) - // TODO(owen-d): It's probably faster to build this from the *tenantHeads instead, - // but we already need to impl BuildFromWALs to ensure we can correctly build/ship - // TSDBs from orphaned WALs of previous periods during startup. - // we use the same timestamp as wal here for the filename to ensure it can't clobber - // an existing file from a previous cycle. I don't think this is possible, but - // perhaps in some unusual crashlooping it could be, so let's be safe and protect ourselves. - if err := m.tsdbManager.BuildFromWALs(t, grp.wals); err != nil { - return errors.Wrapf(err, "building TSDB from prevHeads WALs for period %d", grp.period) - } - - // Now that a TSDB has been created from this group, it's safe to remove them if err := m.removeWALGroup(grp); err != nil { - return errors.Wrapf(err, "removing prev TSDB WALs for period %d", grp.period) + return errors.Wrapf(err, "removing TSDB WALs for period %d", grp.period) } level.Debug(m.log).Log("msg", "removing wals", "pd", grp.period, "n", len(grp.wals)) diff --git a/pkg/storage/stores/tsdb/head_manager_test.go b/pkg/storage/stores/tsdb/head_manager_test.go index 4739abd46c..ae9dd71754 100644 --- a/pkg/storage/stores/tsdb/head_manager_test.go +++ b/pkg/storage/stores/tsdb/head_manager_test.go @@ -31,6 +31,10 @@ func newNoopTSDBManager(dir string) noopTSDBManager { } } +func (m noopTSDBManager) BuildFromHead(_ *tenantHeads) error { + panic("BuildFromHead not implemented") +} + func (m noopTSDBManager) BuildFromWALs(_ time.Time, wals []WALIdentifier) error { return recoverHead(m.dir, m.tenantHeads, wals) } diff --git a/pkg/storage/stores/tsdb/manager.go b/pkg/storage/stores/tsdb/manager.go index a600b840ec..bf26f67392 100644 --- a/pkg/storage/stores/tsdb/manager.go +++ b/pkg/storage/stores/tsdb/manager.go @@ -28,6 +28,8 @@ type TSDBManager interface { Start() error // Builds a new TSDB file from a set of WALs BuildFromWALs(time.Time, []WALIdentifier) error + // Builds a new TSDB file from tenantHeads + BuildFromHead(*tenantHeads) error } /* @@ -156,105 +158,123 @@ func (m *tsdbManager) Start() (err error) { return nil } -func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error) { - level.Debug(m.log).Log("msg", "building WALs", "n", len(ids), "ts", t) - // get relevant wals - // iterate them, build tsdb in scratch dir - defer func() { - m.metrics.tsdbCreationsTotal.Inc() - if err != nil { - m.metrics.tsdbCreationFailures.Inc() +func (m *tsdbManager) buildFromHead(heads *tenantHeads) (err error) { + periods := make(map[string]*Builder) + + if err := heads.forAll(func(user string, ls labels.Labels, chks index.ChunkMetas) error { + + // chunks may overlap index period bounds, in which case they're written to multiple + pds := make(map[string]index.ChunkMetas) + for _, chk := range chks { + idxBuckets, err := indexBuckets(chk.From(), chk.Through(), m.tableRanges) + if err != nil { + return err + } + + for _, bucket := range idxBuckets { + pds[bucket] = append(pds[bucket], chk) + } } - }() - level.Debug(m.log).Log("msg", "recovering tenant heads") - for _, id := range ids { - tmp := newTenantHeads(t, defaultHeadManagerStripeSize, m.metrics, m.log) - if err = recoverHead(m.dir, tmp, []WALIdentifier{id}); err != nil { - return errors.Wrap(err, "building TSDB from WALs") + // Embed the tenant label into TSDB + lb := labels.NewBuilder(ls) + lb.Set(TenantLabel, user) + withTenant := lb.Labels() + + // Add the chunks to all relevant builders + for pd, matchingChks := range pds { + b, ok := periods[pd] + if !ok { + b = NewBuilder() + periods[pd] = b + } + + b.AddSeries( + withTenant, + // use the fingerprint without the added tenant label + // so queries route to the chunks which actually exist. + model.Fingerprint(ls.Hash()), + matchingChks, + ) } - periods := make(map[string]*Builder) + return nil + }); err != nil { + level.Error(m.log).Log("err", err.Error(), "msg", "building TSDB") + return err + } - if err := tmp.forAll(func(user string, ls labels.Labels, chks index.ChunkMetas) error { + for p, b := range periods { + dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p)) + dst := newPrefixedIdentifier( + MultitenantTSDBIdentifier{ + nodeName: m.nodeName, + ts: heads.start, + }, + dstDir, + "", + ) - // chunks may overlap index period bounds, in which case they're written to multiple - pds := make(map[string]index.ChunkMetas) - for _, chk := range chks { - idxBuckets, err := indexBuckets(chk.From(), chk.Through(), m.tableRanges) - if err != nil { - return err - } + level.Debug(m.log).Log("msg", "building tsdb for period", "pd", p, "dst", dst.Path()) + // build+move tsdb to multitenant dir + start := time.Now() + _, err = b.Build( + context.Background(), + managerScratchDir(m.dir), + func(from, through model.Time, checksum uint32) Identifier { + return dst + }, + ) + if err != nil { + return err + } - for _, bucket := range idxBuckets { - pds[bucket] = append(pds[bucket], chk) - } - } + level.Debug(m.log).Log("msg", "finished building tsdb for period", "pd", p, "dst", dst.Path(), "duration", time.Since(start)) - // Embed the tenant label into TSDB - lb := labels.NewBuilder(ls) - lb.Set(TenantLabel, user) - withTenant := lb.Labels() - - // Add the chunks to all relevant builders - for pd, matchingChks := range pds { - b, ok := periods[pd] - if !ok { - b = NewBuilder() - periods[pd] = b - } - - b.AddSeries( - withTenant, - // use the fingerprint without the added tenant label - // so queries route to the chunks which actually exist. - model.Fingerprint(ls.Hash()), - matchingChks, - ) - } + loaded, err := NewShippableTSDBFile(dst, false) + if err != nil { + return err + } - return nil - }); err != nil { - level.Error(m.log).Log("err", err.Error(), "msg", "building TSDB from WALs") + if err := m.shipper.AddIndex(p, "", loaded); err != nil { return err } + } - for p, b := range periods { + return nil +} - dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p)) - dst := newPrefixedIdentifier( - MultitenantTSDBIdentifier{ - nodeName: m.nodeName, - ts: id.ts, - }, - dstDir, - "", - ) +func (m *tsdbManager) BuildFromHead(heads *tenantHeads) (err error) { + level.Debug(m.log).Log("msg", "building heads") + defer func() { + m.metrics.tsdbCreationsTotal.WithLabelValues("head").Inc() + if err != nil { + m.metrics.tsdbCreationsFailedTotal.WithLabelValues("head").Inc() + } + }() - level.Debug(m.log).Log("msg", "building tsdb for period", "pd", p, "dst", dst.Path()) - // build+move tsdb to multitenant dir - start := time.Now() - _, err = b.Build( - context.Background(), - managerScratchDir(m.dir), - func(from, through model.Time, checksum uint32) Identifier { - return dst - }, - ) - if err != nil { - return err - } + return m.buildFromHead(heads) +} - level.Debug(m.log).Log("msg", "finished building tsdb for period", "pd", p, "dst", dst.Path(), "duration", time.Since(start)) +func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error) { + level.Debug(m.log).Log("msg", "building WALs", "n", len(ids), "ts", t) + defer func() { + m.metrics.tsdbCreationsTotal.WithLabelValues("wal").Inc() + if err != nil { + m.metrics.tsdbCreationsFailedTotal.WithLabelValues("wal").Inc() + } + }() - loaded, err := NewShippableTSDBFile(dst, false) - if err != nil { - return err - } + level.Debug(m.log).Log("msg", "recovering tenant heads") + for _, id := range ids { + tmp := newTenantHeads(id.ts, defaultHeadManagerStripeSize, m.metrics, m.log) + if err = recoverHead(m.dir, tmp, []WALIdentifier{id}); err != nil { + return errors.Wrap(err, "building TSDB from WALs") + } - if err := m.shipper.AddIndex(p, "", loaded); err != nil { - return err - } + err := m.buildFromHead(tmp) + if err != nil { + return err } }