diff --git a/tsdb/head.go b/tsdb/head.go index 4e77314b02..2c71977b1a 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1738,32 +1738,31 @@ func (*Head) String() string { } func (h *Head) getOrCreate(hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) { - // Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create - // a new series on every sample inserted via Add(), which causes allocations - // and makes our series IDs rather random and harder to compress in postings. s := h.series.getByHash(hash, lset) if s != nil { return s, false, nil } - // Optimistically assume that we are the first one to create the series. - id := chunks.HeadSeriesRef(h.lastSeriesID.Inc()) - - return h.getOrCreateWithID(id, hash, lset, pendingCommit) + return h.getOrCreateWithOptionalID(0, hash, lset, pendingCommit) } -func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) { - s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { - shardHash := uint64(0) - if h.opts.EnableSharding { - shardHash = labels.StableHash(lset) - } +// If id is zero, one will be allocated. +func (h *Head) getOrCreateWithOptionalID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) { + if preCreationErr := h.series.seriesLifecycleCallback.PreCreation(lset); preCreationErr != nil { + return nil, false, preCreationErr + } + if id == 0 { + // Note this id is wasted in the case where a concurrent operation creates the same series first. + id = chunks.HeadSeriesRef(h.lastSeriesID.Inc()) + } - return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled, pendingCommit) - }) - if err != nil { - return nil, false, err + shardHash := uint64(0) + if h.opts.EnableSharding { + shardHash = labels.StableHash(lset) } + optimisticallyCreatedSeries := newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled, pendingCommit) + + s, created := h.series.setUnlessAlreadySet(hash, lset, optimisticallyCreatedSeries) if !created { return s, false, nil } @@ -2061,43 +2060,23 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries { return series } -func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries func() *memSeries) (*memSeries, bool, error) { - // PreCreation is called here to avoid calling it inside the lock. - // It is not necessary to call it just before creating a series, - // rather it gives a 'hint' whether to create a series or not. - preCreationErr := s.seriesLifecycleCallback.PreCreation(lset) - - // Create the series, unless the PreCreation() callback as failed. - // If failed, we'll not allow to create a new series anyway. - var series *memSeries - if preCreationErr == nil { - series = createSeries() - } - +func (s *stripeSeries) setUnlessAlreadySet(hash uint64, lset labels.Labels, series *memSeries) (*memSeries, bool) { i := hash & uint64(s.size-1) s.locks[i].Lock() - if prev := s.hashes[i].get(hash, lset); prev != nil { s.locks[i].Unlock() - return prev, false, nil - } - if preCreationErr == nil { - s.hashes[i].set(hash, series) + return prev, false } + s.hashes[i].set(hash, series) s.locks[i].Unlock() - if preCreationErr != nil { - // The callback prevented creation of series. - return nil, false, preCreationErr - } - i = uint64(series.ref) & uint64(s.size-1) s.locks[i].Lock() s.series[i][series.ref] = series s.locks[i].Unlock() - return series, true, nil + return series, true } func (s *stripeSeries) postCreation(lset labels.Labels) { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 29d9ad8cbd..63ec9739c4 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1152,7 +1152,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { { name: "keep series still in the head", prepare: func(t *testing.T, h *Head) { - _, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false) + _, _, err := h.getOrCreateWithOptionalID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false) require.NoError(t, err) }, expected: true, @@ -6627,18 +6627,12 @@ func stripeSeriesWithCollidingSeries(t *testing.T) (*stripeSeries, *memSeries, * hash := lbls1.Hash() s := newStripeSeries(1, noopSeriesLifecycleCallback{}) - got, created, err := s.getOrSet(hash, lbls1, func() *memSeries { - return &ms1 - }) - require.NoError(t, err) + got, created := s.setUnlessAlreadySet(hash, lbls1, &ms1) require.True(t, created) require.Same(t, &ms1, got) // Add a conflicting series - got, created, err = s.getOrSet(hash, lbls2, func() *memSeries { - return &ms2 - }) - require.NoError(t, err) + got, created = s.setUnlessAlreadySet(hash, lbls2, &ms2) require.True(t, created) require.Same(t, &ms2, got) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index eed68125d4..3c9aa7980e 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -255,7 +255,7 @@ Outer: switch v := d.(type) { case []record.RefSeries: for _, walSeries := range v { - mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels, false) + mSeries, created, err := h.getOrCreateWithOptionalID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels, false) if err != nil { seriesCreationErr = err break Outer @@ -1590,7 +1590,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie localRefSeries := shardedRefSeries[idx] for csr := range rc { - series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset, false) + series, _, err := h.getOrCreateWithOptionalID(csr.ref, csr.lset.Hash(), csr.lset, false) if err != nil { errChan <- err return