@ -968,8 +968,17 @@ remote_write:
// TestRemoteWrite_ReshardingWithoutDeadlock ensures that resharding (scaling up) doesn't block when the shards are full.
// See: https://github.com/prometheus/prometheus/issues/17384.
//
// The following table shows key resharding metrics before and after the fix.
// In v3.7.0, the deadlock prevented the resharding logic from observing the true incoming data rate.
//
// | Metric | v3.7.0 | after the fix |
// |---------------------|---------------|---------------------|
// | dataInRate | 0.6 | 307.2 |
// | dataPendingRate | 0.2 | 306.8 |
// | dataPending | 0 | 1228.8 |
// | desiredShards | 0.6 | 369.2 |.
func TestRemoteWrite_ReshardingWithoutDeadlock ( t * testing . T ) {
t . Skip ( "flaky test, see https://github.com/prometheus/prometheus/issues/17489" )
t . Parallel ( )
tmpDir := t . TempDir ( )
@ -984,7 +993,8 @@ func TestRemoteWrite_ReshardingWithoutDeadlock(t *testing.T) {
config := fmt . Sprintf ( `
global :
scrape_interval : 100 ms
# Using a smaller interval may cause the scrape to time out .
scrape_interval : 1 s
scrape_configs :
- job_name : ' self '
static_configs :
@ -995,6 +1005,8 @@ remote_write:
queue_config :
# Speed up the queue being full .
capacity : 1
# Helps keep the “ time to send one sample ” low so it doesn ’ t influence the resharding logic .
max_samples_per_send : 1
` , port , server . URL )
require . NoError ( t , os . WriteFile ( configFile , [ ] byte ( config ) , 0 o777 ) )
@ -1003,36 +1015,52 @@ remote_write:
configFile ,
port ,
fmt . Sprintf ( "--storage.tsdb.path=%s" , tmpDir ) ,
"--log.level=debug" ,
)
require . NoError ( t , prom . Start ( ) )
var checkInitialDesiredShardsOnce sync . Once
require . Eventually ( t , func ( ) bool {
const desiredShardsMetric = "prometheus_remote_storage_shards_desired"
getMetrics := func ( ) ( [ ] byte , error ) {
r , err := http . Get ( fmt . Sprintf ( "http://127.0.0.1:%d/metrics" , port ) )
if err != nil {
return false
return nil , err
}
defer r . Body . Close ( )
if r . StatusCode != http . StatusOK {
return fals e
return nil , fmt . Errorf ( "un expected status code: %d" , r . StatusCode )
}
metrics , err := io . ReadAll ( r . Body )
if err != nil {
return false
return nil , err
}
return metrics , nil
}
checkInitialDesiredShardsOnce . Do ( func ( ) {
s , err := getMetricValue ( t , bytes . NewReader ( metrics ) , model . MetricTypeGauge , "prometheus_remote_storage_shards_desired" )
require . NoError ( t , err )
require . Equal ( t , 1.0 , s )
} )
// Ensure the initial number of desired shards is 1.
require . Eventually ( t , func ( ) bool {
metrics , err := getMetrics ( )
if err != nil {
return false
}
initialDesiredShards , err := getMetricValue ( t , bytes . NewReader ( metrics ) , model . MetricTypeGauge , desiredShardsMetric )
if err != nil {
return false
}
return initialDesiredShards == 1.0
} , 10 * time . Second , 100 * time . Millisecond )
desiredShards , err := getMetricValue ( t , bytes . NewReader ( metrics ) , model . MetricTypeGauge , "prometheus_remote_storage_shards_desired" )
if err != nil || desiredShards <= 1 {
// Ensure scaling up is triggered after some time.
require . Eventually ( t , func ( ) bool {
metrics , err := getMetrics ( )
if err != nil {
return false
}
desiredShards , err := getMetricValue ( t , bytes . NewReader ( metrics ) , model . MetricTypeGauge , desiredShardsMetric )
if err != nil || desiredShards <= 1.0 {
return false
}
return true
// Wait up to 3*shardUpdateDuration (30s) to allow the resharding logic to run.
} , 30 * time . Second , 1 * time . Second )
} , 30 * time . Second , time . Second )
}