fix the issue of overwriting tsdb files during rollouts (#6668)

chaudum/compactor-list-objects
Sandeep Sukhani 4 years ago committed by GitHub
parent c170364c2f
commit 5db578bfe8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 142
      pkg/storage/stores/tsdb/manager.go

@ -168,91 +168,93 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error
}()
level.Debug(m.log).Log("msg", "recovering tenant heads")
tmp := newTenantHeads(t, defaultHeadManagerStripeSize, m.metrics, m.log)
if err = recoverHead(m.dir, tmp, ids); err != nil {
return errors.Wrap(err, "building TSDB from WALs")
}
for _, id := range ids {
tmp := newTenantHeads(t, defaultHeadManagerStripeSize, m.metrics, m.log)
if err = recoverHead(m.dir, tmp, []WALIdentifier{id}); err != nil {
return errors.Wrap(err, "building TSDB from WALs")
}
periods := make(map[string]*Builder)
periods := make(map[string]*Builder)
if err := tmp.forAll(func(user string, ls labels.Labels, chks index.ChunkMetas) error {
if err := tmp.forAll(func(user string, ls labels.Labels, chks index.ChunkMetas) error {
// chunks may overlap index period bounds, in which case they're written to multiple
pds := make(map[string]index.ChunkMetas)
for _, chk := range chks {
idxBuckets, err := indexBuckets(chk.From(), chk.Through(), m.tableRanges)
if err != nil {
return err
}
// chunks may overlap index period bounds, in which case they're written to multiple
pds := make(map[string]index.ChunkMetas)
for _, chk := range chks {
idxBuckets, err := indexBuckets(chk.From(), chk.Through(), m.tableRanges)
if err != nil {
return err
}
for _, bucket := range idxBuckets {
pds[bucket] = append(pds[bucket], chk)
for _, bucket := range idxBuckets {
pds[bucket] = append(pds[bucket], chk)
}
}
}
// Embed the tenant label into TSDB
lb := labels.NewBuilder(ls)
lb.Set(TenantLabel, user)
withTenant := lb.Labels()
// Add the chunks to all relevant builders
for pd, matchingChks := range pds {
b, ok := periods[pd]
if !ok {
b = NewBuilder()
periods[pd] = b
// Embed the tenant label into TSDB
lb := labels.NewBuilder(ls)
lb.Set(TenantLabel, user)
withTenant := lb.Labels()
// Add the chunks to all relevant builders
for pd, matchingChks := range pds {
b, ok := periods[pd]
if !ok {
b = NewBuilder()
periods[pd] = b
}
b.AddSeries(
withTenant,
// use the fingerprint without the added tenant label
// so queries route to the chunks which actually exist.
model.Fingerprint(ls.Hash()),
matchingChks,
)
}
b.AddSeries(
withTenant,
// use the fingerprint without the added tenant label
// so queries route to the chunks which actually exist.
model.Fingerprint(ls.Hash()),
matchingChks,
)
return nil
}); err != nil {
level.Error(m.log).Log("err", err.Error(), "msg", "building TSDB from WALs")
return err
}
return nil
}); err != nil {
level.Error(m.log).Log("err", err.Error(), "msg", "building TSDB from WALs")
return err
}
for p, b := range periods {
for p, b := range periods {
dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p))
dst := newPrefixedIdentifier(
MultitenantTSDBIdentifier{
nodeName: m.nodeName,
ts: t,
},
dstDir,
"",
)
dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p))
dst := newPrefixedIdentifier(
MultitenantTSDBIdentifier{
nodeName: m.nodeName,
ts: id.ts,
},
dstDir,
"",
)
level.Debug(m.log).Log("msg", "building tsdb for period", "pd", p, "dst", dst.Path())
// build+move tsdb to multitenant dir
start := time.Now()
_, err = b.Build(
context.Background(),
managerScratchDir(m.dir),
func(from, through model.Time, checksum uint32) Identifier {
return dst
},
)
if err != nil {
return err
}
level.Debug(m.log).Log("msg", "building tsdb for period", "pd", p, "dst", dst.Path())
// build+move tsdb to multitenant dir
start := time.Now()
_, err = b.Build(
context.Background(),
managerScratchDir(m.dir),
func(from, through model.Time, checksum uint32) Identifier {
return dst
},
)
if err != nil {
return err
}
level.Debug(m.log).Log("msg", "finished building tsdb for period", "pd", p, "dst", dst.Path(), "duration", time.Since(start))
level.Debug(m.log).Log("msg", "finished building tsdb for period", "pd", p, "dst", dst.Path(), "duration", time.Since(start))
loaded, err := NewShippableTSDBFile(dst, false)
if err != nil {
return err
}
loaded, err := NewShippableTSDBFile(dst, false)
if err != nil {
return err
}
if err := m.shipper.AddIndex(p, "", loaded); err != nil {
return err
if err := m.shipper.AddIndex(p, "", loaded); err != nil {
return err
}
}
}

Loading…
Cancel
Save