refactor: use the built-in max/min to simplify the code

Signed-off-by: carrychair <linghuchong404@gmail.com>
pull/16617/head
carrychair 1 month ago
parent 6c930e8506
commit e83dc66bdb
  1. tsdb/agent/db.go (5 lines changed)
  2. tsdb/chunks/chunk_write_queue.go (5 lines changed)
  3. tsdb/db.go (5 lines changed)
  4. tsdb/exemplar.go (5 lines changed)
  5. tsdb/head_read.go (6 lines changed)
  6. tsdb/head_wal.go (30 lines changed)
  7. tsdb/tombstones/tombstones.go (5 lines changed)

@@ -613,10 +613,7 @@ Loop:
 	//
 	// Subtracting a duration from ts will add a buffer for when series are
 	// considered inactive and safe for deletion.
-	ts := db.rs.LowestSentTimestamp() - db.opts.MinWALTime
-	if ts < 0 {
-		ts = 0
-	}
+	ts := max(db.rs.LowestSentTimestamp()-db.opts.MinWALTime, 0)
 
 	// Network issues can prevent the result of getRemoteWriteTimestamp from
 	// changing. We don't want data in the WAL to grow forever, so we set a cap

@@ -88,10 +88,7 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu
 		[]string{"operation"},
 	)
 
-	segmentSize := size
-	if segmentSize > maxChunkQueueSegmentSize {
-		segmentSize = maxChunkQueueSegmentSize
-	}
+	segmentSize := min(size, maxChunkQueueSegmentSize)
 
 	q := &chunkWriteQueue{
 		jobs: newWriteJobQueue(size, segmentSize),

@@ -979,10 +979,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
 
 	// Register metrics after assigning the head block.
 	db.metrics = newDBMetrics(db, r)
 
-	maxBytes := opts.MaxBytes
-	if maxBytes < 0 {
-		maxBytes = 0
-	}
+	maxBytes := max(opts.MaxBytes, 0)
 	db.metrics.maxBytes.Set(float64(maxBytes))
 	db.metrics.retentionDuration.Set((time.Duration(opts.RetentionDuration) * time.Millisecond).Seconds())

@@ -296,10 +296,7 @@ func (ce *CircularExemplarStorage) Resize(l int64) int {
 	ce.nextIndex = 0
 
 	// Replay as many entries as needed, starting with oldest first.
-	count := int64(len(oldBuffer))
-	if l < count {
-		count = l
-	}
+	count := min(l, int64(len(oldBuffer)))
 
 	migrated := 0
 

@@ -568,10 +568,8 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *
 					continue
 				}
 			}
-			stopAfter = numSamples - (appendIDsToConsider - index)
-			if stopAfter < 0 {
-				stopAfter = 0 // Stopped in a previous chunk.
-			}
+			// Stopped in a previous chunk.
+			stopAfter = max(numSamples-(appendIDsToConsider-index), 0)
 			break
 		}
 	}

@@ -281,10 +281,7 @@ Outer:
 	// cause thousands of very large in flight buffers occupying large amounts
 	// of unused memory.
 	for len(samples) > 0 {
-		m := 5000
-		if len(samples) < m {
-			m = len(samples)
-		}
+		m := min(len(samples), 5000)
 		for i := 0; i < concurrency; i++ {
 			if shards[i] == nil {
 				shards[i] = processors[i].reuseBuf()
@@ -346,10 +343,7 @@ Outer:
 	// cause thousands of very large in flight buffers occupying large amounts
 	// of unused memory.
 	for len(samples) > 0 {
-		m := 5000
-		if len(samples) < m {
-			m = len(samples)
-		}
+		m := min(len(samples), 5000)
 		for i := 0; i < concurrency; i++ {
 			if histogramShards[i] == nil {
 				histogramShards[i] = processors[i].reuseHistogramBuf()
@@ -382,10 +376,7 @@ Outer:
 	// cause thousands of very large in flight buffers occupying large amounts
 	// of unused memory.
 	for len(samples) > 0 {
-		m := 5000
-		if len(samples) < m {
-			m = len(samples)
-		}
+		m := min(len(samples), 5000)
 		for i := 0; i < concurrency; i++ {
 			if histogramShards[i] == nil {
 				histogramShards[i] = processors[i].reuseHistogramBuf()
@@ -813,10 +804,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
 	// cause thousands of very large in flight buffers occupying large amounts
 	// of unused memory.
 	for len(samples) > 0 {
-		m := 5000
-		if len(samples) < m {
-			m = len(samples)
-		}
+		m := min(len(samples), 5000)
 		for i := 0; i < concurrency; i++ {
 			if shards[i] == nil {
 				shards[i] = processors[i].reuseBuf()
@@ -869,10 +857,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
 	// cause thousands of very large in flight buffers occupying large amounts
 	// of unused memory.
 	for len(samples) > 0 {
-		m := 5000
-		if len(samples) < m {
-			m = len(samples)
-		}
+		m := min(len(samples), 5000)
 		for i := 0; i < concurrency; i++ {
 			if histogramShards[i] == nil {
 				histogramShards[i] = processors[i].reuseHistogramBuf()
@@ -901,10 +886,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
 	// cause thousands of very large in flight buffers occupying large amounts
 	// of unused memory.
 	for len(samples) > 0 {
-		m := 5000
-		if len(samples) < m {
-			m = len(samples)
-		}
+		m := min(len(samples), 5000)
 		for i := 0; i < concurrency; i++ {
 			if histogramShards[i] == nil {
 				histogramShards[i] = processors[i].reuseHistogramBuf()

@@ -377,9 +377,6 @@ func (in Intervals) Add(n Interval) Intervals {
 		if n.Mint < in[mini].Mint {
 			in[mini].Mint = n.Mint
 		}
-		in[mini].Maxt = in[maxi+mini-1].Maxt
-		if n.Maxt > in[mini].Maxt {
-			in[mini].Maxt = n.Maxt
-		}
+		in[mini].Maxt = max(n.Maxt, in[maxi+mini-1].Maxt)
 		return append(in[:mini+1], in[maxi+mini:]...)
 	}

Loading…
Cancel
Save