|
|
|
|
@ -143,6 +143,33 @@ var ( |
|
|
|
|
}, |
|
|
|
|
[]string{queue}, |
|
|
|
|
) |
|
|
|
|
maxNumShards = promauto.NewGaugeVec( |
|
|
|
|
prometheus.GaugeOpts{ |
|
|
|
|
Namespace: namespace, |
|
|
|
|
Subsystem: subsystem, |
|
|
|
|
Name: "shards_max", |
|
|
|
|
Help: "The maximum number of shards that the queue is allowed to run.", |
|
|
|
|
}, |
|
|
|
|
[]string{queue}, |
|
|
|
|
) |
|
|
|
|
minNumShards = promauto.NewGaugeVec( |
|
|
|
|
prometheus.GaugeOpts{ |
|
|
|
|
Namespace: namespace, |
|
|
|
|
Subsystem: subsystem, |
|
|
|
|
Name: "shards_min", |
|
|
|
|
Help: "The minimum number of shards that the queue is allowed to run.", |
|
|
|
|
}, |
|
|
|
|
[]string{queue}, |
|
|
|
|
) |
|
|
|
|
desiredNumShards = promauto.NewGaugeVec( |
|
|
|
|
prometheus.GaugeOpts{ |
|
|
|
|
Namespace: namespace, |
|
|
|
|
Subsystem: subsystem, |
|
|
|
|
Name: "shards_desired", |
|
|
|
|
Help: "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.", |
|
|
|
|
}, |
|
|
|
|
[]string{queue}, |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// StorageClient defines an interface for sending a batch of samples to an
|
|
|
|
|
@ -189,6 +216,9 @@ type QueueManager struct { |
|
|
|
|
succeededSamplesTotal prometheus.Counter |
|
|
|
|
retriedSamplesTotal prometheus.Counter |
|
|
|
|
shardCapacity prometheus.Gauge |
|
|
|
|
maxNumShards prometheus.Gauge |
|
|
|
|
minNumShards prometheus.Gauge |
|
|
|
|
desiredNumShards prometheus.Gauge |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewQueueManager builds a new QueueManager.
|
|
|
|
|
@ -293,10 +323,16 @@ func (t *QueueManager) Start() { |
|
|
|
|
t.succeededSamplesTotal = succeededSamplesTotal.WithLabelValues(name) |
|
|
|
|
t.retriedSamplesTotal = retriedSamplesTotal.WithLabelValues(name) |
|
|
|
|
t.shardCapacity = shardCapacity.WithLabelValues(name) |
|
|
|
|
t.maxNumShards = maxNumShards.WithLabelValues(name) |
|
|
|
|
t.minNumShards = minNumShards.WithLabelValues(name) |
|
|
|
|
t.desiredNumShards = desiredNumShards.WithLabelValues(name) |
|
|
|
|
|
|
|
|
|
// Initialise some metrics.
|
|
|
|
|
t.shardCapacity.Set(float64(t.cfg.Capacity)) |
|
|
|
|
t.pendingSamplesMetric.Set(0) |
|
|
|
|
t.maxNumShards.Set(float64(t.cfg.MaxShards)) |
|
|
|
|
t.minNumShards.Set(float64(t.cfg.MinShards)) |
|
|
|
|
t.desiredNumShards.Set(float64(t.cfg.MinShards)) |
|
|
|
|
|
|
|
|
|
t.shards.start(t.numShards) |
|
|
|
|
t.watcher.Start() |
|
|
|
|
@ -336,6 +372,9 @@ func (t *QueueManager) Stop() { |
|
|
|
|
succeededSamplesTotal.DeleteLabelValues(name) |
|
|
|
|
retriedSamplesTotal.DeleteLabelValues(name) |
|
|
|
|
shardCapacity.DeleteLabelValues(name) |
|
|
|
|
maxNumShards.DeleteLabelValues(name) |
|
|
|
|
minNumShards.DeleteLabelValues(name) |
|
|
|
|
desiredNumShards.DeleteLabelValues(name) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
|
|
|
|
|
@ -483,6 +522,7 @@ func (t *QueueManager) calculateDesiredShards() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
numShards := int(math.Ceil(desiredShards)) |
|
|
|
|
t.desiredNumShards.Set(float64(numShards)) |
|
|
|
|
if numShards > t.cfg.MaxShards { |
|
|
|
|
numShards = t.cfg.MaxShards |
|
|
|
|
} else if numShards < t.cfg.MinShards { |
|
|
|
|
|