|
|
|
|
@ -16,6 +16,7 @@ package remote |
|
|
|
|
import ( |
|
|
|
|
"math" |
|
|
|
|
"sync" |
|
|
|
|
"sync/atomic" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"golang.org/x/time/rate" |
|
|
|
|
@ -372,10 +373,10 @@ func (t *QueueManager) reshard(n int) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type shards struct { |
|
|
|
|
qm *QueueManager |
|
|
|
|
queues []chan *model.Sample |
|
|
|
|
done chan struct{} |
|
|
|
|
wg sync.WaitGroup |
|
|
|
|
qm *QueueManager |
|
|
|
|
queues []chan *model.Sample |
|
|
|
|
done chan struct{} |
|
|
|
|
running int32 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (t *QueueManager) newShards(numShards int) *shards { |
|
|
|
|
@ -384,11 +385,11 @@ func (t *QueueManager) newShards(numShards int) *shards { |
|
|
|
|
queues[i] = make(chan *model.Sample, t.cfg.Capacity) |
|
|
|
|
} |
|
|
|
|
s := &shards{ |
|
|
|
|
qm: t, |
|
|
|
|
queues: queues, |
|
|
|
|
done: make(chan struct{}), |
|
|
|
|
qm: t, |
|
|
|
|
queues: queues, |
|
|
|
|
done: make(chan struct{}), |
|
|
|
|
running: int32(numShards), |
|
|
|
|
} |
|
|
|
|
s.wg.Add(numShards) |
|
|
|
|
return s |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -407,14 +408,8 @@ func (s *shards) stop() { |
|
|
|
|
close(shard) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
done := make(chan struct{}) |
|
|
|
|
go func() { |
|
|
|
|
s.wg.Wait() |
|
|
|
|
close(done) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
select { |
|
|
|
|
case <-done: |
|
|
|
|
case <-s.done: |
|
|
|
|
case <-time.After(stopFlushDeadline): |
|
|
|
|
level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") |
|
|
|
|
} |
|
|
|
|
@ -435,7 +430,12 @@ func (s *shards) enqueue(sample *model.Sample) bool { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *shards) runShard(i int) { |
|
|
|
|
defer s.wg.Done() |
|
|
|
|
defer func() { |
|
|
|
|
if atomic.AddInt32(&s.running, -1) == 0 { |
|
|
|
|
close(s.done) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
queue := s.queues[i] |
|
|
|
|
|
|
|
|
|
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
|
|
|
|
|
|