|
|
|
|
@ -430,6 +430,14 @@ func (s *shards) runShard(i int) { |
|
|
|
|
pendingSamples := model.Samples{} |
|
|
|
|
|
|
|
|
|
timer := time.NewTimer(s.qm.cfg.BatchSendDeadline) |
|
|
|
|
defer func() { |
|
|
|
|
if !timer.Stop() { |
|
|
|
|
select { |
|
|
|
|
case <-timer.C: |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
@ -449,11 +457,16 @@ func (s *shards) runShard(i int) { |
|
|
|
|
for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { |
|
|
|
|
s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend]) |
|
|
|
|
pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:] |
|
|
|
|
|
|
|
|
|
if !timer.Stop() { |
|
|
|
|
select { |
|
|
|
|
case <-timer.C: |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
timer.Reset(s.qm.cfg.BatchSendDeadline) |
|
|
|
|
} |
|
|
|
|
if !timer.Stop() { |
|
|
|
|
<-timer.C |
|
|
|
|
} |
|
|
|
|
timer.Reset(s.qm.cfg.BatchSendDeadline) |
|
|
|
|
|
|
|
|
|
case <-timer.C: |
|
|
|
|
if len(pendingSamples) > 0 { |
|
|
|
|
s.sendSamples(pendingSamples) |
|
|
|
|
|