@ -142,9 +142,14 @@ type testLoop struct {
stopFunc func ( )
forcedErr error
forcedErrMtx sync . Mutex
runOnce bool
}
func ( l * testLoop ) run ( interval , timeout time . Duration , errc chan <- error ) {
if l . runOnce {
panic ( "loop must be started only once" )
}
l . runOnce = true
l . startFunc ( interval , timeout , errc )
}
@ -316,12 +321,16 @@ func TestScrapePoolReload(t *testing.T) {
}
func TestScrapePoolTargetLimit ( t * testing . T ) {
var wg sync . WaitGroup
// On starting to run, new loops created on reload check whether their preceding
// equivalents have been stopped.
newLoop := func ( opts scrapeLoopOptions ) loop {
wg . Add ( 1 )
l := & testLoop {
startFunc : func ( interval , timeout time . Duration , errc chan <- error ) { } ,
stopFunc : func ( ) { } ,
startFunc : func ( interval , timeout time . Duration , errc chan <- error ) {
wg . Done ( )
} ,
stopFunc : func ( ) { } ,
}
return l
}
@ -361,6 +370,13 @@ func TestScrapePoolTargetLimit(t *testing.T) {
sp . Sync ( tgs [ : n ] )
}
validateIsRunning := func ( ) {
wg . Wait ( )
for _ , l := range sp . loops {
testutil . Assert ( t , l . ( * testLoop ) . runOnce , "loop should be running" )
}
}
validateErrorMessage := func ( shouldErr bool ) {
for _ , l := range sp . loops {
lerr := l . ( * testLoop ) . getForcedError ( )
@ -375,29 +391,54 @@ func TestScrapePoolTargetLimit(t *testing.T) {
reloadWithLimit ( 0 )
loadTargets ( 50 )
validateIsRunning ( )
// Simulate an initial config with a limit.
sp . config . TargetLimit = 30
limit = 30
loadTargets ( 50 )
validateIsRunning ( )
validateErrorMessage ( true )
reloadWithLimit ( 50 )
validateIsRunning ( )
validateErrorMessage ( false )
reloadWithLimit ( 40 )
validateIsRunning ( )
validateErrorMessage ( true )
loadTargets ( 30 )
validateIsRunning ( )
validateErrorMessage ( false )
loadTargets ( 40 )
validateIsRunning ( )
validateErrorMessage ( false )
loadTargets ( 41 )
validateIsRunning ( )
validateErrorMessage ( true )
reloadWithLimit ( 51 )
validateIsRunning ( )
validateErrorMessage ( false )
tgs = append ( tgs ,
& targetgroup . Group {
Targets : [ ] model . LabelSet {
{ model . AddressLabel : model . LabelValue ( "127.0.0.1:1090" ) } ,
} ,
} ,
& targetgroup . Group {
Targets : [ ] model . LabelSet {
{ model . AddressLabel : model . LabelValue ( "127.0.0.1:1090" ) } ,
} ,
} ,
)
sp . Sync ( tgs )
validateIsRunning ( )
validateErrorMessage ( false )
}
@ -476,6 +517,54 @@ func TestScrapePoolRaces(t *testing.T) {
sp . stop ( )
}
func TestScrapePoolScrapeLoopsStarted ( t * testing . T ) {
var wg sync . WaitGroup
newLoop := func ( opts scrapeLoopOptions ) loop {
wg . Add ( 1 )
l := & testLoop {
startFunc : func ( interval , timeout time . Duration , errc chan <- error ) {
wg . Done ( )
} ,
stopFunc : func ( ) { } ,
}
return l
}
sp := & scrapePool {
appendable : & nopAppendable { } ,
activeTargets : map [ uint64 ] * Target { } ,
loops : map [ uint64 ] loop { } ,
newLoop : newLoop ,
logger : nil ,
client : http . DefaultClient ,
}
tgs := [ ] * targetgroup . Group {
{
Targets : [ ] model . LabelSet {
{ model . AddressLabel : model . LabelValue ( "127.0.0.1:9090" ) } ,
} ,
} ,
{
Targets : [ ] model . LabelSet {
{ model . AddressLabel : model . LabelValue ( "127.0.0.1:9090" ) } ,
} ,
} ,
}
testutil . Ok ( t , sp . reload ( & config . ScrapeConfig {
ScrapeInterval : model . Duration ( 3 * time . Second ) ,
ScrapeTimeout : model . Duration ( 2 * time . Second ) ,
} ) )
sp . Sync ( tgs )
testutil . Equals ( t , 1 , len ( sp . loops ) )
wg . Wait ( )
for _ , l := range sp . loops {
testutil . Assert ( t , l . ( * testLoop ) . runOnce , "loop should be running" )
}
}
func TestScrapeLoopStopBeforeRun ( t * testing . T ) {
scraper := & testScraper { }