@ -178,6 +178,7 @@ type QueueManager struct {
samplesIn , samplesDropped , samplesOut , samplesOutDuration * ewmaRate
integralAccumulator float64
startedAt time . Time
highestSentTimestampMetric * maxGauge
pendingSamplesMetric prometheus . Gauge
@ -277,6 +278,8 @@ outer:
// Start the queue manager sending samples to the remote storage.
// Does not block.
func ( t * QueueManager ) Start ( ) {
t . startedAt = time . Now ( )
// Setup the QueueManagers metrics. We do this here rather than in the
// constructor because of the ordering of creating Queue Managers's, stopping them,
// and then starting new ones in storage/remote/storage.go ApplyConfig.
@ -440,36 +443,50 @@ func (t *QueueManager) calculateDesiredShards() {
// (received - send) so we can catch up with any backlog. We use the average
// outgoing batch latency to work out how many shards we need.
var (
samplesIn = t . samplesIn . rate ( )
samplesOut = t . samplesOut . rate ( )
samplesKeptRatio = samplesOut / ( t . samplesDropped . rate ( ) + samplesOut )
samplesOutDuration = t . samplesOutDuration . rate ( )
samplesInRate = t . samplesIn . rate ( )
samplesOutRate = t . samplesOut . rate ( )
samplesKeptRatio = samplesOutRate / ( t . samplesDropped . rate ( ) + samplesOutRate )
samplesOutDuration = t . samplesOutDuration . rate ( ) / float64 ( time . Second )
samplesPendingRate = samplesInRate * samplesKeptRatio - samplesOutRate
highestSent = t . highestSentTimestampMetric . Get ( )
highestRecv = highestTimestamp . Get ( )
samplesPending = ( highestRecv - highestSent ) * samplesIn * samplesKeptRatio
samplesPending = ( highestRecv - highestSent ) * samplesInRate * samplesKeptRatio
)
// We use an integral accumulator, like in a PID, to help dampen oscillation.
t . integralAccumulator = t . integralAccumulator + ( samplesPending * 0.1 )
if samplesOut <= 0 {
if samplesOutRate <= 0 {
return
}
// We use an integral accumulator, like in a PID, to help dampen
// oscillation. The accumulator will correct for any errors not accounted
// for in the desired shard calculation by adjusting for pending samples.
const integralGain = 0.2
// Initialise the integral accumulator as the average rate of samples
// pending. This accounts for pending samples that were created while the
// WALWatcher starts up.
if t . integralAccumulator == 0 {
elapsed := time . Since ( t . startedAt ) / time . Second
t . integralAccumulator = integralGain * samplesPending / float64 ( elapsed )
}
t . integralAccumulator += samplesPendingRate * integralGain
var (
timePerSample = samplesOutDuration / samplesOut
desiredShards = ( timePerSample * samplesPending ) / float64 ( time . Second )
timePerSample = samplesOutDuration / samplesOutRate
desiredShards = timePerSample * ( samplesInRate + t . integralAccumulator )
)
level . Debug ( t . logger ) . Log ( "msg" , "QueueManager.calculateDesiredShards" ,
"samplesIn" , samplesIn ,
"samplesOut" , samplesOut ,
"samplesInRate " , samplesInRate ,
"samplesOutRate " , samplesOutRate ,
"samplesKeptRatio" , samplesKeptRatio ,
"samplesPendingRate" , samplesPendingRate ,
"samplesPending" , samplesPending ,
"samplesOutDuration" , samplesOutDuration ,
"timePerSample" , timePerSample ,
"desiredShards" , desiredShards ,
"highestSent" , highestSent ,
"highestRecv" , highestRecv )
"highestRecv" , highestRecv ,
"integralAccumulator" , t . integralAccumulator ,
)
// Changes in the number of shards must be greater than shardToleranceFraction.
var (