|
|
|
|
@ -430,14 +430,15 @@ func (s *shards) runShard(i int) { |
|
|
|
|
pendingSamples := model.Samples{} |
|
|
|
|
|
|
|
|
|
timer := time.NewTimer(s.qm.cfg.BatchSendDeadline) |
|
|
|
|
defer func() { |
|
|
|
|
stop := func() { |
|
|
|
|
if !timer.Stop() { |
|
|
|
|
select { |
|
|
|
|
case <-timer.C: |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
} |
|
|
|
|
defer stop() |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
@ -454,16 +455,11 @@ func (s *shards) runShard(i int) { |
|
|
|
|
queueLength.WithLabelValues(s.qm.queueName).Dec() |
|
|
|
|
pendingSamples = append(pendingSamples, sample) |
|
|
|
|
|
|
|
|
|
for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { |
|
|
|
|
if len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { |
|
|
|
|
s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend]) |
|
|
|
|
pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:] |
|
|
|
|
|
|
|
|
|
if !timer.Stop() { |
|
|
|
|
select { |
|
|
|
|
case <-timer.C: |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
stop() |
|
|
|
|
timer.Reset(s.qm.cfg.BatchSendDeadline) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -472,6 +468,7 @@ func (s *shards) runShard(i int) { |
|
|
|
|
s.sendSamples(pendingSamples) |
|
|
|
|
pendingSamples = pendingSamples[:0] |
|
|
|
|
} |
|
|
|
|
timer.Reset(s.qm.cfg.BatchSendDeadline) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|