tsdb: build tsdb from tenantheads (#7036)

**What this PR does / why we need it**:
Refactors the TSDB manager code to expose a new method, `BuildFromHead`, which builds a TSDB directly from the in-memory tenant heads after rotation.

Currently the TSDB is built by recovering the head from the WAL, even though the in-memory tenant heads are still available at rotation time.
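For reviewers, the change in call shape condenses to roughly the sketch below. The `TSDBManager` method signatures match the diff further down; the surrounding types and the `buildPrev` helper are simplified stand-ins for illustration only.

```go
package sketch

import "time"

// Simplified stand-ins for the real types in pkg/storage/stores/tsdb.
type tenantHeads struct{ start time.Time }
type WALIdentifier struct{ ts time.Time }

// TSDBManager as extended by this PR (signatures as in the diff below).
type TSDBManager interface {
	Start() error
	// Pre-existing path: rebuild heads by replaying WAL segments, then build
	// a TSDB. Still needed at startup for orphaned WALs of previous periods.
	BuildFromWALs(time.Time, []WALIdentifier) error
	// New path: build a TSDB directly from the in-memory tenant heads,
	// skipping the WAL replay after a rotation.
	BuildFromHead(*tenantHeads) error
}

// buildPrev sketches what the head manager's loop now does after rotating:
// the previous period's heads are still in memory, so they are handed to the
// TSDB manager directly; the WAL for that period is only truncated once the
// build succeeds.
func buildPrev(m TSDBManager, prevHeads *tenantHeads) error {
	if prevHeads == nil {
		return nil
	}
	if err := m.BuildFromHead(prevHeads); err != nil {
		return err // retried on the next tick of the loop
	}
	// ... truncate the WAL for prevHeads' period and drop the reference ...
	return nil
}
```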

**Special notes for your reviewer**:
The `loki_tsdb_head_rotations_*` metrics are now used to track loop failures, which can be caused either by a rotation failing or by rotation being blocked because the previous head failed to build.
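Condensed from the loop changes below, the accounting looks roughly like this (stand-in types only; the real code lives in `head_manager.go`). The same failure counter covers both failure modes, so a single alert on `loki_tsdb_head_rotations_failed_total` catches either case.

```go
package sketch

import "github.com/prometheus/client_golang/prometheus"

// Minimal stand-in for the head manager's rotation metrics.
type rotationMetrics struct {
	tsdbHeadRotationsTotal       prometheus.Counter
	tsdbHeadRotationsFailedTotal prometheus.Counter
}

// tickRotation condenses what each ticker iteration of the loop now does:
// one attempt increment per tick, and one failure increment whether the tick
// fails because the previous head could not be (re)built or because the
// rotation itself failed.
func tickRotation(m rotationMetrics, buildPrev, rotate func() error) {
	m.tsdbHeadRotationsTotal.Inc()

	// Rotation stays blocked until the previous period's head is built.
	if err := buildPrev(); err != nil {
		m.tsdbHeadRotationsFailedTotal.Inc()
		return
	}
	// The rotation itself may also fail.
	if err := rotate(); err != nil {
		m.tsdbHeadRotationsFailedTotal.Inc()
		return
	}
}
```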

**Checklist**
- [ ] Documentation added
- [x] Tests updated
- [ ] Is this an important fix or new feature? Add an entry in the `CHANGELOG.md`.
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md`

Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com>
Ashwanth committed via GitHub (3 years ago) · commit 53e01a795a · parent 93a5a71e62 · branch pull/7071/head
Changed files:
  1. pkg/storage/stores/tsdb/head.go (36)
  2. pkg/storage/stores/tsdb/head_manager.go (78)
  3. pkg/storage/stores/tsdb/head_manager_test.go (4)
  4. pkg/storage/stores/tsdb/manager.go (184)

pkg/storage/stores/tsdb/head.go
@@ -39,6 +39,8 @@ const (
// Do not specify without bit shifting. This allows us to
// do shard index calcuations via bitwise & rather than modulos.
defaultStripeSize = 64
tsdbBuildSourceLabel = "source"
)
/*
@@ -57,12 +59,14 @@ guaranteeing we maintain querying consistency for the entire data lifecycle.
// TODO(owen-d)
type Metrics struct {
seriesNotFound prometheus.Counter
tsdbCreationsTotal prometheus.Counter
tsdbCreationFailures prometheus.Counter
tsdbManagerUpdatesTotal prometheus.Counter
tsdbManagerUpdatesFailedTotal prometheus.Counter
tsdbHeadRotationsTotal prometheus.Counter
tsdbHeadRotationsFailedTotal prometheus.Counter
tsdbWALTruncationsTotal prometheus.Counter
tsdbWALTruncationsFailedTotal prometheus.Counter
tsdbCreationsTotal *prometheus.CounterVec
tsdbCreationsFailedTotal *prometheus.CounterVec
}
func NewMetrics(r prometheus.Registerer) *Metrics {
@@ -71,14 +75,6 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
Name: "loki_tsdb_head_series_not_found_total",
Help: "Total number of requests for series that were not found.",
}),
tsdbCreationsTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_creations_total",
Help: "Total number of tsdb creations attempted",
}),
tsdbCreationFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_creations_failed_total",
Help: "Total number of tsdb creations failed",
}),
tsdbManagerUpdatesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_manager_updates_total",
Help: "Total number of tsdb manager updates (loading/rotating tsdbs in mem)",
@@ -89,12 +85,28 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
}),
tsdbHeadRotationsTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_head_rotations_total",
Help: "Total number of tsdb head rotations",
Help: "Total number of tsdb head rotations attempted",
}),
tsdbHeadRotationsFailedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_head_rotations_failed_total",
Help: "Total number of tsdb head rotations failed",
Help: "Total number of tsdb head rotations that failed",
}),
tsdbWALTruncationsTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_wal_truncations_total",
Help: "Total number of WAL truncations attempted",
}),
tsdbWALTruncationsFailedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_tsdb_wal_truncations_failed_total",
Help: "Total number of WAL truncations that failed",
}),
tsdbCreationsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "loki_tsdb_creations_total",
Help: "Total number of tsdb creations attempted",
}, []string{tsdbBuildSourceLabel}),
tsdbCreationsFailedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "loki_tsdb_creations_failed_total",
Help: "Total number of tsdb creations that failed",
}, []string{tsdbBuildSourceLabel}),
}
}

pkg/storage/stores/tsdb/head_manager.go
@@ -151,8 +151,8 @@ func (m *HeadManager) loop() {
return nil
}
if err := m.buildTSDBFromWAL(m.prev.initialized); err != nil {
return errors.Wrap(err, "building tsdb head")
if err := m.buildTSDBFromHead(m.prevHeads); err != nil {
return err
}
// Now that the tsdbManager has the updated TSDBs, we can remove our references
@@ -170,8 +170,11 @@ func (m *HeadManager) loop() {
for {
select {
case <-ticker.C:
m.metrics.tsdbHeadRotationsTotal.Inc()
// retry tsdb build failures from previous run
if err := buildPrev(); err != nil {
m.metrics.tsdbHeadRotationsFailedTotal.Inc()
level.Error(m.log).Log(
"msg", "failed building tsdb head",
"period", m.period.PeriodFor(m.prev.initialized),
@@ -182,11 +185,12 @@ func (m *HeadManager) loop() {
}
now := time.Now()
if m.period.PeriodFor(now) > m.period.PeriodFor(m.activeHeads.start) {
if curPeriod := m.period.PeriodFor(m.activeHeads.start); m.period.PeriodFor(now) > curPeriod {
if err := m.Rotate(now); err != nil {
m.metrics.tsdbHeadRotationsFailedTotal.Inc()
level.Error(m.log).Log(
"msg", "failed rotating tsdb head",
"period", m.period.PeriodFor(m.prev.initialized),
"period", curPeriod,
"err", err,
)
continue
@@ -197,6 +201,7 @@ func (m *HeadManager) loop() {
if err := buildPrev(); err != nil {
level.Error(m.log).Log(
"msg", "failed building tsdb head",
"period", m.period.PeriodFor(m.prev.initialized),
"err", err,
)
}
@@ -216,7 +221,18 @@ func (m *HeadManager) Stop() error {
return err
}
return m.buildTSDBFromWAL(m.active.initialized)
if m.prev != nil {
if err := m.buildTSDBFromHead(m.prevHeads); err != nil {
// log the error and start building active head
level.Error(m.log).Log(
"msg", "failed building tsdb from prev head",
"period", m.period.PeriodFor(m.prev.initialized),
"err", err,
)
}
}
return m.buildTSDBFromHead(m.activeHeads)
}
func (m *HeadManager) Append(userID string, ls labels.Labels, chks index.ChunkMetas) error {
@@ -270,7 +286,9 @@ func (m *HeadManager) Start() error {
return errors.Wrap(err, "building tsdb")
}
m.metrics.tsdbWALTruncationsTotal.Inc()
if err := os.RemoveAll(managerWalDir(m.dir)); err != nil {
m.metrics.tsdbWALTruncationsFailedTotal.Inc()
return errors.New("cleaning (removing) wal dir")
}
@@ -310,13 +328,6 @@ func managerPerTenantDir(parent string) string {
}
func (m *HeadManager) Rotate(t time.Time) (err error) {
defer func() {
m.metrics.tsdbHeadRotationsTotal.Inc()
if err != nil {
m.metrics.tsdbHeadRotationsFailedTotal.Inc()
}
}()
// create new wal
nextWALPath := walPath(m.dir, t)
nextWAL, err := newHeadWAL(m.log, nextWALPath, t)
@@ -349,27 +360,40 @@ func (m *HeadManager) Rotate(t time.Time) (err error) {
return nil
}
func (m *HeadManager) buildTSDBFromWAL(t time.Time) error {
level.Debug(m.log).Log("msg", "combining tsdb WALs")
grp, _, err := walsForPeriod(m.dir, m.period, m.period.PeriodFor(t))
func (m *HeadManager) buildTSDBFromHead(head *tenantHeads) error {
period := m.period.PeriodFor(head.start)
if err := m.tsdbManager.BuildFromHead(head); err != nil {
return errors.Wrap(err, "building tsdb from head")
}
// Now that a TSDB has been created from this group, it's safe to remove them
if err := m.truncateWALForPeriod(period); err != nil {
level.Error(m.log).Log(
"msg", "failed truncating wal files",
"period", period,
"err", err,
)
}
return nil
}
func (m *HeadManager) truncateWALForPeriod(period int) (err error) {
m.metrics.tsdbWALTruncationsTotal.Inc()
defer func() {
if err != nil {
m.metrics.tsdbWALTruncationsFailedTotal.Inc()
}
}()
grp, _, err := walsForPeriod(m.dir, m.period, period)
if err != nil {
return errors.Wrap(err, "listing wals")
}
level.Debug(m.log).Log("msg", "listed WALs", "pd", grp.period, "n", len(grp.wals))
// TODO(owen-d): It's probably faster to build this from the *tenantHeads instead,
// but we already need to impl BuildFromWALs to ensure we can correctly build/ship
// TSDBs from orphaned WALs of previous periods during startup.
// we use the same timestamp as wal here for the filename to ensure it can't clobber
// an existing file from a previous cycle. I don't think this is possible, but
// perhaps in some unusual crashlooping it could be, so let's be safe and protect ourselves.
if err := m.tsdbManager.BuildFromWALs(t, grp.wals); err != nil {
return errors.Wrapf(err, "building TSDB from prevHeads WALs for period %d", grp.period)
}
// Now that a TSDB has been created from this group, it's safe to remove them
if err := m.removeWALGroup(grp); err != nil {
return errors.Wrapf(err, "removing prev TSDB WALs for period %d", grp.period)
return errors.Wrapf(err, "removing TSDB WALs for period %d", grp.period)
}
level.Debug(m.log).Log("msg", "removing wals", "pd", grp.period, "n", len(grp.wals))

pkg/storage/stores/tsdb/head_manager_test.go
@@ -31,6 +31,10 @@ func newNoopTSDBManager(dir string) noopTSDBManager {
}
}
func (m noopTSDBManager) BuildFromHead(_ *tenantHeads) error {
panic("BuildFromHead not implemented")
}
func (m noopTSDBManager) BuildFromWALs(_ time.Time, wals []WALIdentifier) error {
return recoverHead(m.dir, m.tenantHeads, wals)
}

pkg/storage/stores/tsdb/manager.go
@@ -28,6 +28,8 @@ type TSDBManager interface {
Start() error
// Builds a new TSDB file from a set of WALs
BuildFromWALs(time.Time, []WALIdentifier) error
// Builds a new TSDB file from tenantHeads
BuildFromHead(*tenantHeads) error
}
/*
@@ -156,105 +158,123 @@ func (m *tsdbManager) Start() (err error) {
return nil
}
func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error) {
level.Debug(m.log).Log("msg", "building WALs", "n", len(ids), "ts", t)
// get relevant wals
// iterate them, build tsdb in scratch dir
defer func() {
m.metrics.tsdbCreationsTotal.Inc()
if err != nil {
m.metrics.tsdbCreationFailures.Inc()
func (m *tsdbManager) buildFromHead(heads *tenantHeads) (err error) {
periods := make(map[string]*Builder)
if err := heads.forAll(func(user string, ls labels.Labels, chks index.ChunkMetas) error {
// chunks may overlap index period bounds, in which case they're written to multiple
pds := make(map[string]index.ChunkMetas)
for _, chk := range chks {
idxBuckets, err := indexBuckets(chk.From(), chk.Through(), m.tableRanges)
if err != nil {
return err
}
for _, bucket := range idxBuckets {
pds[bucket] = append(pds[bucket], chk)
}
}
}()
level.Debug(m.log).Log("msg", "recovering tenant heads")
for _, id := range ids {
tmp := newTenantHeads(t, defaultHeadManagerStripeSize, m.metrics, m.log)
if err = recoverHead(m.dir, tmp, []WALIdentifier{id}); err != nil {
return errors.Wrap(err, "building TSDB from WALs")
// Embed the tenant label into TSDB
lb := labels.NewBuilder(ls)
lb.Set(TenantLabel, user)
withTenant := lb.Labels()
// Add the chunks to all relevant builders
for pd, matchingChks := range pds {
b, ok := periods[pd]
if !ok {
b = NewBuilder()
periods[pd] = b
}
b.AddSeries(
withTenant,
// use the fingerprint without the added tenant label
// so queries route to the chunks which actually exist.
model.Fingerprint(ls.Hash()),
matchingChks,
)
}
periods := make(map[string]*Builder)
return nil
}); err != nil {
level.Error(m.log).Log("err", err.Error(), "msg", "building TSDB")
return err
}
if err := tmp.forAll(func(user string, ls labels.Labels, chks index.ChunkMetas) error {
for p, b := range periods {
dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p))
dst := newPrefixedIdentifier(
MultitenantTSDBIdentifier{
nodeName: m.nodeName,
ts: heads.start,
},
dstDir,
"",
)
// chunks may overlap index period bounds, in which case they're written to multiple
pds := make(map[string]index.ChunkMetas)
for _, chk := range chks {
idxBuckets, err := indexBuckets(chk.From(), chk.Through(), m.tableRanges)
if err != nil {
return err
}
level.Debug(m.log).Log("msg", "building tsdb for period", "pd", p, "dst", dst.Path())
// build+move tsdb to multitenant dir
start := time.Now()
_, err = b.Build(
context.Background(),
managerScratchDir(m.dir),
func(from, through model.Time, checksum uint32) Identifier {
return dst
},
)
if err != nil {
return err
}
for _, bucket := range idxBuckets {
pds[bucket] = append(pds[bucket], chk)
}
}
level.Debug(m.log).Log("msg", "finished building tsdb for period", "pd", p, "dst", dst.Path(), "duration", time.Since(start))
// Embed the tenant label into TSDB
lb := labels.NewBuilder(ls)
lb.Set(TenantLabel, user)
withTenant := lb.Labels()
// Add the chunks to all relevant builders
for pd, matchingChks := range pds {
b, ok := periods[pd]
if !ok {
b = NewBuilder()
periods[pd] = b
}
b.AddSeries(
withTenant,
// use the fingerprint without the added tenant label
// so queries route to the chunks which actually exist.
model.Fingerprint(ls.Hash()),
matchingChks,
)
}
loaded, err := NewShippableTSDBFile(dst, false)
if err != nil {
return err
}
return nil
}); err != nil {
level.Error(m.log).Log("err", err.Error(), "msg", "building TSDB from WALs")
if err := m.shipper.AddIndex(p, "", loaded); err != nil {
return err
}
}
for p, b := range periods {
return nil
}
dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p))
dst := newPrefixedIdentifier(
MultitenantTSDBIdentifier{
nodeName: m.nodeName,
ts: id.ts,
},
dstDir,
"",
)
func (m *tsdbManager) BuildFromHead(heads *tenantHeads) (err error) {
level.Debug(m.log).Log("msg", "building heads")
defer func() {
m.metrics.tsdbCreationsTotal.WithLabelValues("head").Inc()
if err != nil {
m.metrics.tsdbCreationsFailedTotal.WithLabelValues("head").Inc()
}
}()
level.Debug(m.log).Log("msg", "building tsdb for period", "pd", p, "dst", dst.Path())
// build+move tsdb to multitenant dir
start := time.Now()
_, err = b.Build(
context.Background(),
managerScratchDir(m.dir),
func(from, through model.Time, checksum uint32) Identifier {
return dst
},
)
if err != nil {
return err
}
return m.buildFromHead(heads)
}
level.Debug(m.log).Log("msg", "finished building tsdb for period", "pd", p, "dst", dst.Path(), "duration", time.Since(start))
func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error) {
level.Debug(m.log).Log("msg", "building WALs", "n", len(ids), "ts", t)
defer func() {
m.metrics.tsdbCreationsTotal.WithLabelValues("wal").Inc()
if err != nil {
m.metrics.tsdbCreationsFailedTotal.WithLabelValues("wal").Inc()
}
}()
loaded, err := NewShippableTSDBFile(dst, false)
if err != nil {
return err
}
level.Debug(m.log).Log("msg", "recovering tenant heads")
for _, id := range ids {
tmp := newTenantHeads(id.ts, defaultHeadManagerStripeSize, m.metrics, m.log)
if err = recoverHead(m.dir, tmp, []WALIdentifier{id}); err != nil {
return errors.Wrap(err, "building TSDB from WALs")
}
if err := m.shipper.AddIndex(p, "", loaded); err != nil {
return err
}
err := m.buildFromHead(tmp)
if err != nil {
return err
}
}
