Merge pull request #17351 from bboreham/simplify-precreate

TSDB: Allocate series ID after seriesLifecycleCallback; simplify code.
pull/17509/head
Jan Fajerski 1 month ago committed by GitHub
commit 49254f45e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 61
      tsdb/head.go
  2. 12
      tsdb/head_test.go
  3. 4
      tsdb/head_wal.go

@ -1738,32 +1738,31 @@ func (*Head) String() string {
}
func (h *Head) getOrCreate(hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) {
// Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create
// a new series on every sample inserted via Add(), which causes allocations
// and makes our series IDs rather random and harder to compress in postings.
s := h.series.getByHash(hash, lset)
if s != nil {
return s, false, nil
}
// Optimistically assume that we are the first one to create the series.
id := chunks.HeadSeriesRef(h.lastSeriesID.Inc())
return h.getOrCreateWithID(id, hash, lset, pendingCommit)
return h.getOrCreateWithOptionalID(0, hash, lset, pendingCommit)
}
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) {
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
shardHash := uint64(0)
if h.opts.EnableSharding {
shardHash = labels.StableHash(lset)
}
// If id is zero, one will be allocated.
func (h *Head) getOrCreateWithOptionalID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) {
if preCreationErr := h.series.seriesLifecycleCallback.PreCreation(lset); preCreationErr != nil {
return nil, false, preCreationErr
}
if id == 0 {
// Note this id is wasted in the case where a concurrent operation creates the same series first.
id = chunks.HeadSeriesRef(h.lastSeriesID.Inc())
}
return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled, pendingCommit)
})
if err != nil {
return nil, false, err
shardHash := uint64(0)
if h.opts.EnableSharding {
shardHash = labels.StableHash(lset)
}
optimisticallyCreatedSeries := newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled, pendingCommit)
s, created := h.series.setUnlessAlreadySet(hash, lset, optimisticallyCreatedSeries)
if !created {
return s, false, nil
}
@ -2061,43 +2060,23 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
return series
}
func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries func() *memSeries) (*memSeries, bool, error) {
// PreCreation is called here to avoid calling it inside the lock.
// It is not necessary to call it just before creating a series,
// rather it gives a 'hint' whether to create a series or not.
preCreationErr := s.seriesLifecycleCallback.PreCreation(lset)
// Create the series, unless the PreCreation() callback as failed.
// If failed, we'll not allow to create a new series anyway.
var series *memSeries
if preCreationErr == nil {
series = createSeries()
}
func (s *stripeSeries) setUnlessAlreadySet(hash uint64, lset labels.Labels, series *memSeries) (*memSeries, bool) {
i := hash & uint64(s.size-1)
s.locks[i].Lock()
if prev := s.hashes[i].get(hash, lset); prev != nil {
s.locks[i].Unlock()
return prev, false, nil
}
if preCreationErr == nil {
s.hashes[i].set(hash, series)
return prev, false
}
s.hashes[i].set(hash, series)
s.locks[i].Unlock()
if preCreationErr != nil {
// The callback prevented creation of series.
return nil, false, preCreationErr
}
i = uint64(series.ref) & uint64(s.size-1)
s.locks[i].Lock()
s.series[i][series.ref] = series
s.locks[i].Unlock()
return series, true, nil
return series, true
}
func (s *stripeSeries) postCreation(lset labels.Labels) {

@ -1152,7 +1152,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
{
name: "keep series still in the head",
prepare: func(t *testing.T, h *Head) {
_, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false)
_, _, err := h.getOrCreateWithOptionalID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false)
require.NoError(t, err)
},
expected: true,
@ -6627,18 +6627,12 @@ func stripeSeriesWithCollidingSeries(t *testing.T) (*stripeSeries, *memSeries, *
hash := lbls1.Hash()
s := newStripeSeries(1, noopSeriesLifecycleCallback{})
got, created, err := s.getOrSet(hash, lbls1, func() *memSeries {
return &ms1
})
require.NoError(t, err)
got, created := s.setUnlessAlreadySet(hash, lbls1, &ms1)
require.True(t, created)
require.Same(t, &ms1, got)
// Add a conflicting series
got, created, err = s.getOrSet(hash, lbls2, func() *memSeries {
return &ms2
})
require.NoError(t, err)
got, created = s.setUnlessAlreadySet(hash, lbls2, &ms2)
require.True(t, created)
require.Same(t, &ms2, got)

@ -255,7 +255,7 @@ Outer:
switch v := d.(type) {
case []record.RefSeries:
for _, walSeries := range v {
mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels, false)
mSeries, created, err := h.getOrCreateWithOptionalID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels, false)
if err != nil {
seriesCreationErr = err
break Outer
@ -1590,7 +1590,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
localRefSeries := shardedRefSeries[idx]
for csr := range rc {
series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset, false)
series, _, err := h.getOrCreateWithOptionalID(csr.ref, csr.lset.Hash(), csr.lset, false)
if err != nil {
errChan <- err
return

Loading…
Cancel
Save