TSDB explicit fingerprint (#7362)

This PR does two things:
* Includes the series fingerprint in TSDB WALs. Doing this ensures we
recover fingerprint used in chunk addresses after restart even if loki
is upgraded and the label hashing algorithm changes.
* Removes much of the extra hashing in the TSDB write path + manager.
Occasionally, the fingerprint may be different than the hash of labels.
Historically this has happened in a few places:
* When the `__name__="logs"` label was injected for non-tsdb indices
(not applicable to TSDB)
* When building a multitenant tsdb index and adding a synthetic
`__loki_tenant__` label
* In a previous bug(https://github.com/grafana/loki/pull/7355) when
empty label values altered calculated fingerprints.

Explicitly storing/retrieving the desired fingerprint throughout helps
avoid calculating it in error.
pull/7371/head
Owen Diehl 3 years ago committed by GitHub
parent bc7f2f5adb
commit f4e2cf1993
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      pkg/storage/stores/tsdb/head.go
  2. 32
      pkg/storage/stores/tsdb/head_manager.go
  3. 33
      pkg/storage/stores/tsdb/head_manager_test.go
  4. 41
      pkg/storage/stores/tsdb/head_wal.go
  5. 17
      pkg/storage/stores/tsdb/head_wal_test.go
  6. 4
      pkg/storage/stores/tsdb/manager.go
  7. 2
      pkg/storage/stores/tsdb/single_file_index_test.go
  8. 6
      pkg/storage/stores/tsdb/store.go

@ -159,12 +159,12 @@ func updateMintMaxt(mint, maxt int64, mintSrc, maxtSrc *atomic.Int64) {
}
// Note: chks must not be nil or zero-length
func (h *Head) Append(ls labels.Labels, chks index.ChunkMetas) (created bool, refID uint64) {
func (h *Head) Append(ls labels.Labels, fprint uint64, chks index.ChunkMetas) (created bool, refID uint64) {
from, through := chks.Bounds()
var id uint64
created, refID = h.series.Append(ls, chks, func() *memSeries {
id = h.lastSeriesID.Inc()
return newMemSeries(id, ls)
return newMemSeries(id, ls, fprint)
})
updateMintMaxt(int64(from), int64(through), &h.minTime, &h.maxTime)
@ -292,10 +292,10 @@ type memSeries struct {
chks index.ChunkMetas
}
func newMemSeries(ref uint64, ls labels.Labels) *memSeries {
func newMemSeries(ref uint64, ls labels.Labels, fp uint64) *memSeries {
return &memSeries{
ref: ref,
ls: ls,
fp: ls.Hash(),
fp: fp,
}
}

@ -232,7 +232,7 @@ func (m *HeadManager) Stop() error {
return m.buildTSDBFromHead(m.activeHeads)
}
func (m *HeadManager) Append(userID string, ls labels.Labels, chks index.ChunkMetas) error {
func (m *HeadManager) Append(userID string, ls labels.Labels, fprint uint64, chks index.ChunkMetas) error {
// TSDB doesnt need the __name__="log" convention the old chunk store index used.
// We must create a copy of the labels here to avoid mutating the existing
// labels when writing across index buckets.
@ -243,7 +243,7 @@ func (m *HeadManager) Append(userID string, ls labels.Labels, chks index.ChunkMe
m.mtx.RLock()
defer m.mtx.RUnlock()
rec := m.activeHeads.Append(userID, ls, chks)
rec := m.activeHeads.Append(userID, ls, fprint, chks)
return m.active.Log(rec)
}
@ -497,7 +497,11 @@ func recoverHead(dir string, heads *tenantHeads, wals []WALIdentifier) error {
// map of users -> ref -> series.
// Keep track of which ref corresponds to which series
// for each WAL so we replay into the correct series
seriesMap := make(map[string]map[uint64]labels.Labels)
type labelsWithFp struct {
ls labels.Labels
fp uint64
}
seriesMap := make(map[string]map[uint64]*labelsWithFp)
for reader.Next() {
rec := &WALRecord{}
@ -509,10 +513,13 @@ func recoverHead(dir string, heads *tenantHeads, wals []WALIdentifier) error {
if len(rec.Series.Labels) > 0 {
tenant, ok := seriesMap[rec.UserID]
if !ok {
tenant = make(map[uint64]labels.Labels)
tenant = make(map[uint64]*labelsWithFp)
seriesMap[rec.UserID] = tenant
}
tenant[uint64(rec.Series.Ref)] = rec.Series.Labels
tenant[uint64(rec.Series.Ref)] = &labelsWithFp{
ls: rec.Series.Labels,
fp: rec.Fingerprint,
}
}
if len(rec.Chks.Chks) > 0 {
@ -520,11 +527,11 @@ func recoverHead(dir string, heads *tenantHeads, wals []WALIdentifier) error {
if !ok {
return errors.New("found tsdb chunk metas without user in WAL replay")
}
ls, ok := tenant[rec.Chks.Ref]
x, ok := tenant[rec.Chks.Ref]
if !ok {
return errors.New("found tsdb chunk metas without series in WAL replay")
}
_ = heads.Append(rec.UserID, ls, rec.Chks.Chks)
_ = heads.Append(rec.UserID, x.ls, x.fp, rec.Chks.Chks)
}
}
return reader.Err()
@ -581,7 +588,7 @@ func newTenantHeads(start time.Time, shards int, metrics *Metrics, logger log.Lo
return res
}
func (t *tenantHeads) Append(userID string, ls labels.Labels, chks index.ChunkMetas) *WALRecord {
func (t *tenantHeads) Append(userID string, ls labels.Labels, fprint uint64, chks index.ChunkMetas) *WALRecord {
var mint, maxt int64
for _, chk := range chks {
if chk.MinTime < mint || mint == 0 {
@ -595,7 +602,7 @@ func (t *tenantHeads) Append(userID string, ls labels.Labels, chks index.ChunkMe
updateMintMaxt(mint, maxt, &t.mint, &t.maxt)
head := t.getOrCreateTenantHead(userID)
newStream, refID := head.Append(ls, chks)
newStream, refID := head.Append(ls, fprint, chks)
rec := &WALRecord{
UserID: userID,
@ -606,6 +613,7 @@ func (t *tenantHeads) Append(userID string, ls labels.Labels, chks index.ChunkMe
}
if newStream {
rec.Fingerprint = fprint
rec.Series = record.RefSeries{
Ref: chunks.HeadSeriesRef(refID),
Labels: ls,
@ -718,7 +726,7 @@ func (t *tenantHeads) Stats(ctx context.Context, userID string, from, through mo
}
// helper only used in building TSDBs
func (t *tenantHeads) forAll(fn func(user string, ls labels.Labels, chks index.ChunkMetas) error) error {
func (t *tenantHeads) forAll(fn func(user string, ls labels.Labels, fp uint64, chks index.ChunkMetas) error) error {
for i, shard := range t.tenants {
t.locks[i].RLock()
defer t.locks[i].RUnlock()
@ -736,13 +744,13 @@ func (t *tenantHeads) forAll(fn func(user string, ls labels.Labels, chks index.C
chks []index.ChunkMeta
)
_, err := idx.Series(ps.At(), &ls, &chks)
fp, err := idx.Series(ps.At(), &ls, &chks)
if err != nil {
return errors.Wrapf(err, "iterating postings for tenant: %s", user)
}
if err := fn(user, ls, chks); err != nil {
if err := fn(user, ls, fp, chks); err != nil {
return err
}
}

@ -66,7 +66,7 @@ func Test_TenantHeads_Append(t *testing.T) {
Entries: 30,
},
}
_ = h.Append("fake", ls, chks)
_ = h.Append("fake", ls, ls.Hash(), chks)
found, err := h.GetChunkRefs(
context.Background(),
@ -117,7 +117,7 @@ func Test_TenantHeads_MultiRead(t *testing.T) {
// add data for both tenants
for _, tenant := range tenants {
_ = h.Append(tenant.user, tenant.ls, chks)
_ = h.Append(tenant.user, tenant.ls, tenant.ls.Hash(), chks)
}
@ -142,13 +142,15 @@ func Test_HeadManager_RecoverHead(t *testing.T) {
now := time.Now()
dir := t.TempDir()
cases := []struct {
Labels labels.Labels
Chunks []index.ChunkMeta
User string
Labels labels.Labels
Fingerprint uint64
Chunks []index.ChunkMeta
User string
}{
{
User: "tenant1",
Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`),
User: "tenant1",
Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`),
Fingerprint: mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash(),
Chunks: []index.ChunkMeta{
{
MinTime: 1,
@ -158,8 +160,9 @@ func Test_HeadManager_RecoverHead(t *testing.T) {
},
},
{
User: "tenant2",
Labels: mustParseLabels(`{foo="bard", bazz="bozz", bonk="borb"}`),
User: "tenant2",
Labels: mustParseLabels(`{foo="bard", bazz="bozz", bonk="borb"}`),
Fingerprint: 1, // Different fingerprint should be preserved
Chunks: []index.ChunkMeta{
{
MinTime: 1,
@ -186,7 +189,8 @@ func Test_HeadManager_RecoverHead(t *testing.T) {
for i, c := range cases {
require.Nil(t, w.Log(&WALRecord{
UserID: c.User,
UserID: c.User,
Fingerprint: c.Fingerprint,
Series: record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: c.Labels,
@ -215,7 +219,7 @@ func Test_HeadManager_RecoverHead(t *testing.T) {
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
)
require.Nil(t, err)
require.Equal(t, chunkMetasToChunkRefs(c.User, c.Labels.Hash(), c.Chunks), refs)
require.Equal(t, chunkMetasToChunkRefs(c.User, c.Fingerprint, c.Chunks), refs)
}
}
@ -260,7 +264,8 @@ func Test_HeadManager_Lifecycle(t *testing.T) {
// Write old WALs
for i, c := range cases {
require.Nil(t, w.Log(&WALRecord{
UserID: c.User,
UserID: c.User,
Fingerprint: c.Labels.Hash(),
Series: record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: c.Labels,
@ -313,7 +318,7 @@ func Test_HeadManager_Lifecycle(t *testing.T) {
},
}
require.Nil(t, mgr.Append(newCase.User, newCase.Labels, newCase.Chunks))
require.Nil(t, mgr.Append(newCase.User, newCase.Labels, newCase.Labels.Hash(), newCase.Chunks))
// Ensure old + new data is queryable
for _, c := range append(cases, newCase) {
@ -353,7 +358,7 @@ func BenchmarkTenantHeads(b *testing.B) {
for i := 0; i < 1000; i++ {
tenant := i % nTenants
ls := mustParseLabels(fmt.Sprintf(`{foo="bar", i="%d"}`, i))
heads.Append(fmt.Sprint(tenant), ls, index.ChunkMetas{
heads.Append(fmt.Sprint(tenant), ls, ls.Hash(), index.ChunkMetas{
{},
})
}

@ -38,12 +38,14 @@ const (
// WALs and persists across restarts.
WalRecordSeries RecordType = iota
WalRecordChunks
WalRecordSeriesWithFingerprint
)
type WALRecord struct {
UserID string
Series record.RefSeries
Chks ChunkMetasRecord
UserID string
Series record.RefSeries
Fingerprint uint64
Chks ChunkMetasRecord
}
type ChunkMetasRecord struct {
@ -51,6 +53,8 @@ type ChunkMetasRecord struct {
Ref uint64
}
// NB(owen-d): unused since we started recording the fingerprint
// with the series, but left here for future understanding
func (r *WALRecord) encodeSeries(b []byte) []byte {
buf := encoding.EncWith(b)
buf.PutByte(byte(WalRecordSeries))
@ -65,6 +69,21 @@ func (r *WALRecord) encodeSeries(b []byte) []byte {
return encoded
}
func (r *WALRecord) encodeSeriesWithFingerprint(b []byte) []byte {
buf := encoding.EncWith(b)
buf.PutByte(byte(WalRecordSeriesWithFingerprint))
buf.PutUvarintStr(r.UserID)
buf.PutBE64(r.Fingerprint)
var enc record.Encoder
// The 'encoded' already has the type header and userID here, hence re-using
// the remaining part of the slice (i.e. encoded[len(encoded):])) to encode the series.
encoded := buf.Get()
encoded = append(encoded, enc.Series([]record.RefSeries{r.Series}, encoded[len(encoded):])...)
return encoded
}
func (r *WALRecord) encodeChunks(b []byte) []byte {
buf := encoding.EncWith(b)
buf.PutByte(byte(WalRecordChunks))
@ -142,6 +161,20 @@ func decodeWALRecord(b []byte, walRec *WALRecord) error {
if len(rSeries) == 1 {
walRec.Series = rSeries[0]
}
case WalRecordSeriesWithFingerprint:
userID = decbuf.UvarintStr()
walRec.Fingerprint = decbuf.Be64()
rSeries, err := dec.Series(decbuf.B, nil)
if err != nil {
return errors.Wrap(err, "decoding head series")
}
// unlike tsdb, we only add one series per record.
if len(rSeries) > 1 {
return errors.New("more than one series detected in tsdb head wal record")
}
if len(rSeries) == 1 {
walRec.Series = rSeries[0]
}
case WalRecordChunks:
userID = decbuf.UvarintStr()
if err := decodeChunks(decbuf.B, walRec); err != nil {
@ -196,7 +229,7 @@ func (w *headWAL) Log(record *WALRecord) error {
// Always write series before chunks
if len(record.Series.Labels) > 0 {
buf = record.encodeSeries(buf[:0])
buf = record.encodeSeriesWithFingerprint(buf[:0])
if err := w.wal.Log(buf); err != nil {
return err
}

@ -28,6 +28,23 @@ func Test_Encoding_Series(t *testing.T) {
require.Equal(t, record, decoded)
}
func Test_Encoding_SeriesWithFingerprint(t *testing.T) {
record := &WALRecord{
UserID: "foo",
Fingerprint: mustParseLabels(`{foo="bar"}`).Hash(),
Series: record.RefSeries{
Ref: chunks.HeadSeriesRef(1),
Labels: mustParseLabels(`{foo="bar"}`),
},
}
buf := record.encodeSeriesWithFingerprint(nil)
decoded := &WALRecord{}
err := decodeWALRecord(buf, decoded)
require.Nil(t, err)
require.Equal(t, record, decoded)
}
func Test_Encoding_Chunks(t *testing.T) {
record := &WALRecord{
UserID: "foo",

@ -159,7 +159,7 @@ func (m *tsdbManager) Start() (err error) {
func (m *tsdbManager) buildFromHead(heads *tenantHeads) (err error) {
periods := make(map[string]*Builder)
if err := heads.forAll(func(user string, ls labels.Labels, chks index.ChunkMetas) error {
if err := heads.forAll(func(user string, ls labels.Labels, fp uint64, chks index.ChunkMetas) error {
// chunks may overlap index period bounds, in which case they're written to multiple
pds := make(map[string]index.ChunkMetas)
@ -188,7 +188,7 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads) (err error) {
withTenant,
// use the fingerprint without the added tenant label
// so queries route to the chunks which actually exist.
model.Fingerprint(ls.Hash()),
model.Fingerprint(fp),
matchingChks,
)
}

@ -72,7 +72,7 @@ func TestSingleIdx(t *testing.T) {
fn: func() Index {
head := NewHead("fake", NewMetrics(nil), log.NewNopLogger())
for _, x := range cases {
_, _ = head.Append(x.Labels, x.Chunks)
_, _ = head.Append(x.Labels, x.Labels.Hash(), x.Chunks)
}
reader := head.Index()
return NewTSDBIndex(reader)

@ -23,7 +23,7 @@ import (
)
type IndexWriter interface {
Append(userID string, ls labels.Labels, chks tsdb_index.ChunkMetas) error
Append(userID string, ls labels.Labels, fprint uint64, chks tsdb_index.ChunkMetas) error
}
type store struct {
@ -198,7 +198,7 @@ func (s *store) IndexChunk(ctx context.Context, chk chunk.Chunk) error {
Entries: uint32(chk.Data.Entries()),
},
}
if err := s.indexWriter.Append(chk.UserID, chk.Metric, metas); err != nil {
if err := s.indexWriter.Append(chk.UserID, chk.Metric, chk.ChunkRef.Fingerprint, metas); err != nil {
return errors.Wrap(err, "writing index entry")
}
@ -207,7 +207,7 @@ func (s *store) IndexChunk(ctx context.Context, chk chunk.Chunk) error {
type failingIndexWriter struct{}
func (f failingIndexWriter) Append(_ string, _ labels.Labels, _ tsdb_index.ChunkMetas) error {
func (f failingIndexWriter) Append(_ string, _ labels.Labels, _ uint64, _ tsdb_index.ChunkMetas) error {
return fmt.Errorf("index writer is not initialized due to tsdb store being initialized in read-only mode")
}

Loading…
Cancel
Save