|
|
|
|
@ -178,7 +178,6 @@ type HeadOptions struct { |
|
|
|
|
WALReplayConcurrency int |
|
|
|
|
|
|
|
|
|
// EnableSharding enables ShardedPostings() support in the Head.
|
|
|
|
|
// EnableSharding is temporarily disabled during Init().
|
|
|
|
|
EnableSharding bool |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -610,7 +609,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second |
|
|
|
|
// Init loads data from the write ahead log and prepares the head for writes.
|
|
|
|
|
// It should be called before using an appender so that it
|
|
|
|
|
// limits the ingested samples to the head min valid time.
|
|
|
|
|
func (h *Head) Init(minValidTime int64) (err error) { |
|
|
|
|
func (h *Head) Init(minValidTime int64) error { |
|
|
|
|
h.minValidTime.Store(minValidTime) |
|
|
|
|
defer func() { |
|
|
|
|
h.postings.EnsureOrder(h.opts.WALReplayConcurrency) |
|
|
|
|
@ -624,24 +623,6 @@ func (h *Head) Init(minValidTime int64) (err error) { |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
// If sharding is enabled, disable it while initializing, and calculate the shards later.
|
|
|
|
|
// We're going to use that field for other purposes during WAL replay,
|
|
|
|
|
// so we don't want to waste time on calculating the shard that we're going to lose anyway.
|
|
|
|
|
if h.opts.EnableSharding { |
|
|
|
|
h.opts.EnableSharding = false |
|
|
|
|
defer func() { |
|
|
|
|
h.opts.EnableSharding = true |
|
|
|
|
if err == nil { |
|
|
|
|
// No locking is needed here as nobody should be writing while we're in Init.
|
|
|
|
|
for _, stripe := range h.series.series { |
|
|
|
|
for _, s := range stripe { |
|
|
|
|
s.shardHashOrMemoryMappedMaxTime = labels.StableHash(s.lset) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any") |
|
|
|
|
start := time.Now() |
|
|
|
|
|
|
|
|
|
@ -702,6 +683,7 @@ func (h *Head) Init(minValidTime int64) (err error) { |
|
|
|
|
mmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk |
|
|
|
|
oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk |
|
|
|
|
lastMmapRef chunks.ChunkDiskMapperRef |
|
|
|
|
err error |
|
|
|
|
|
|
|
|
|
mmapChunkReplayDuration time.Duration |
|
|
|
|
) |
|
|
|
|
@ -2086,11 +2068,9 @@ type memSeries struct { |
|
|
|
|
ref chunks.HeadSeriesRef |
|
|
|
|
meta *metadata.Metadata |
|
|
|
|
|
|
|
|
|
// Series labels hash to use for sharding purposes.
|
|
|
|
|
// The value is always 0 when sharding has not been explicitly enabled in TSDB.
|
|
|
|
|
// While the WAL replay the value stored here is the max time of any mmapped chunk,
|
|
|
|
|
// and the shard hash is re-calculated after WAL replay is complete.
|
|
|
|
|
shardHashOrMemoryMappedMaxTime uint64 |
|
|
|
|
// Series labels hash to use for sharding purposes. The value is always 0 when sharding has not
|
|
|
|
|
// been explicitly enabled in TSDB.
|
|
|
|
|
shardHash uint64 |
|
|
|
|
|
|
|
|
|
// Everything after here should only be accessed with the lock held.
|
|
|
|
|
sync.Mutex |
|
|
|
|
@ -2115,6 +2095,8 @@ type memSeries struct { |
|
|
|
|
|
|
|
|
|
ooo *memSeriesOOOFields |
|
|
|
|
|
|
|
|
|
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
|
|
|
|
|
|
|
|
|
|
nextAt int64 // Timestamp at which to cut the next chunk.
|
|
|
|
|
histogramChunkHasComputedEndTime bool // True if nextAt has been predicted for the current histograms chunk; false otherwise.
|
|
|
|
|
pendingCommit bool // Whether there are samples waiting to be committed to this series.
|
|
|
|
|
@ -2145,10 +2127,10 @@ type memSeriesOOOFields struct { |
|
|
|
|
|
|
|
|
|
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled bool) *memSeries { |
|
|
|
|
s := &memSeries{ |
|
|
|
|
lset: lset, |
|
|
|
|
ref: id, |
|
|
|
|
nextAt: math.MinInt64, |
|
|
|
|
shardHashOrMemoryMappedMaxTime: shardHash, |
|
|
|
|
lset: lset, |
|
|
|
|
ref: id, |
|
|
|
|
nextAt: math.MinInt64, |
|
|
|
|
shardHash: shardHash, |
|
|
|
|
} |
|
|
|
|
if !isolationDisabled { |
|
|
|
|
s.txs = newTxRing(0) |
|
|
|
|
@ -2236,12 +2218,6 @@ func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkD |
|
|
|
|
return removedInOrder + removedOOO |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// shardHash returns the shard hash of the series, only available after WAL replay.
|
|
|
|
|
func (s *memSeries) shardHash() uint64 { return s.shardHashOrMemoryMappedMaxTime } |
|
|
|
|
|
|
|
|
|
// mmMaxTime returns the max time of any mmapped chunk in the series, only available during WAL replay.
|
|
|
|
|
func (s *memSeries) mmMaxTime() int64 { return int64(s.shardHashOrMemoryMappedMaxTime) } |
|
|
|
|
|
|
|
|
|
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
|
|
|
|
|
// acquiring lock.
|
|
|
|
|
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) { |
|
|
|
|
|