From 5db578bfe831ffef400a54060ebf39e1f2fd6adb Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 13 Jul 2022 07:22:05 +0530 Subject: [PATCH] fix the issue of overwriting tsdb files during rollouts (#6668) --- pkg/storage/stores/tsdb/manager.go | 142 +++++++++++++++-------------- 1 file changed, 72 insertions(+), 70 deletions(-) diff --git a/pkg/storage/stores/tsdb/manager.go b/pkg/storage/stores/tsdb/manager.go index 154e1e78cf..a600b840ec 100644 --- a/pkg/storage/stores/tsdb/manager.go +++ b/pkg/storage/stores/tsdb/manager.go @@ -168,91 +168,93 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error }() level.Debug(m.log).Log("msg", "recovering tenant heads") - tmp := newTenantHeads(t, defaultHeadManagerStripeSize, m.metrics, m.log) - if err = recoverHead(m.dir, tmp, ids); err != nil { - return errors.Wrap(err, "building TSDB from WALs") - } + for _, id := range ids { + tmp := newTenantHeads(t, defaultHeadManagerStripeSize, m.metrics, m.log) + if err = recoverHead(m.dir, tmp, []WALIdentifier{id}); err != nil { + return errors.Wrap(err, "building TSDB from WALs") + } - periods := make(map[string]*Builder) + periods := make(map[string]*Builder) - if err := tmp.forAll(func(user string, ls labels.Labels, chks index.ChunkMetas) error { + if err := tmp.forAll(func(user string, ls labels.Labels, chks index.ChunkMetas) error { - // chunks may overlap index period bounds, in which case they're written to multiple - pds := make(map[string]index.ChunkMetas) - for _, chk := range chks { - idxBuckets, err := indexBuckets(chk.From(), chk.Through(), m.tableRanges) - if err != nil { - return err - } + // chunks may overlap index period bounds, in which case they're written to multiple + pds := make(map[string]index.ChunkMetas) + for _, chk := range chks { + idxBuckets, err := indexBuckets(chk.From(), chk.Through(), m.tableRanges) + if err != nil { + return err + } - for _, bucket := range idxBuckets { - pds[bucket] = append(pds[bucket], chk) + for _, bucket := range idxBuckets { + pds[bucket] = append(pds[bucket], chk) + } } - } - - // Embed the tenant label into TSDB - lb := labels.NewBuilder(ls) - lb.Set(TenantLabel, user) - withTenant := lb.Labels() - // Add the chunks to all relevant builders - for pd, matchingChks := range pds { - b, ok := periods[pd] - if !ok { - b = NewBuilder() - periods[pd] = b + // Embed the tenant label into TSDB + lb := labels.NewBuilder(ls) + lb.Set(TenantLabel, user) + withTenant := lb.Labels() + + // Add the chunks to all relevant builders + for pd, matchingChks := range pds { + b, ok := periods[pd] + if !ok { + b = NewBuilder() + periods[pd] = b + } + + b.AddSeries( + withTenant, + // use the fingerprint without the added tenant label + // so queries route to the chunks which actually exist. + model.Fingerprint(ls.Hash()), + matchingChks, + ) } - b.AddSeries( - withTenant, - // use the fingerprint without the added tenant label - // so queries route to the chunks which actually exist. - model.Fingerprint(ls.Hash()), - matchingChks, - ) + return nil + }); err != nil { + level.Error(m.log).Log("err", err.Error(), "msg", "building TSDB from WALs") + return err } - return nil - }); err != nil { - level.Error(m.log).Log("err", err.Error(), "msg", "building TSDB from WALs") - return err - } - - for p, b := range periods { + for p, b := range periods { - dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p)) - dst := newPrefixedIdentifier( - MultitenantTSDBIdentifier{ - nodeName: m.nodeName, - ts: t, - }, - dstDir, - "", - ) + dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p)) + dst := newPrefixedIdentifier( + MultitenantTSDBIdentifier{ + nodeName: m.nodeName, + ts: id.ts, + }, + dstDir, + "", + ) - level.Debug(m.log).Log("msg", "building tsdb for period", "pd", p, "dst", dst.Path()) - // build+move tsdb to multitenant dir - start := time.Now() - _, err = b.Build( - context.Background(), - managerScratchDir(m.dir), - func(from, through model.Time, checksum uint32) Identifier { - return dst - }, - ) - if err != nil { - return err - } + level.Debug(m.log).Log("msg", "building tsdb for period", "pd", p, "dst", dst.Path()) + // build+move tsdb to multitenant dir + start := time.Now() + _, err = b.Build( + context.Background(), + managerScratchDir(m.dir), + func(from, through model.Time, checksum uint32) Identifier { + return dst + }, + ) + if err != nil { + return err + } - level.Debug(m.log).Log("msg", "finished building tsdb for period", "pd", p, "dst", dst.Path(), "duration", time.Since(start)) + level.Debug(m.log).Log("msg", "finished building tsdb for period", "pd", p, "dst", dst.Path(), "duration", time.Since(start)) - loaded, err := NewShippableTSDBFile(dst, false) - if err != nil { - return err - } + loaded, err := NewShippableTSDBFile(dst, false) + if err != nil { + return err + } - if err := m.shipper.AddIndex(p, "", loaded); err != nil { - return err + if err := m.shipper.AddIndex(p, "", loaded); err != nil { + return err + } } }