@ -38,6 +38,8 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/config"
@ -468,10 +470,8 @@ func TestPopulateLabels(t *testing.T) {
func loadConfiguration ( t testing . TB , c string ) * config . Config {
t . Helper ( )
cfg := & config . Config { }
err := yaml . UnmarshalStrict ( [ ] byte ( c ) , cfg )
require . NoError ( t , err , "Unable to load YAML config." )
cfg , err := config . Load ( c , promslog . NewNopLogger ( ) )
require . NoError ( t , err )
return cfg
}
@ -724,33 +724,6 @@ scrape_configs:
require . ElementsMatch ( t , [ ] string { "job1" , "job3" } , scrapeManager . ScrapePools ( ) )
}
func setupScrapeManager ( t * testing . T , honorTimestamps , enableCTZeroIngestion bool ) ( * collectResultAppender , * Manager ) {
app := & collectResultAppender { }
scrapeManager , err := NewManager (
& Options {
EnableCreatedTimestampZeroIngestion : enableCTZeroIngestion ,
skipOffsetting : true ,
} ,
promslog . New ( & promslog . Config { } ) ,
nil ,
& collectResultAppendable { app } ,
prometheus . NewRegistry ( ) ,
)
require . NoError ( t , err )
require . NoError ( t , scrapeManager . ApplyConfig ( & config . Config {
GlobalConfig : config . GlobalConfig {
// Disable regular scrapes.
ScrapeInterval : model . Duration ( 9999 * time . Minute ) ,
ScrapeTimeout : model . Duration ( 5 * time . Second ) ,
ScrapeProtocols : [ ] config . ScrapeProtocol { config . OpenMetricsText1_0_0 , config . PrometheusProto } ,
} ,
ScrapeConfigs : [ ] * config . ScrapeConfig { { JobName : "test" , HonorTimestamps : honorTimestamps } } ,
} ) )
return app , scrapeManager
}
func setupTestServer ( t * testing . T , typ string , toWrite [ ] byte ) * httptest . Server {
once := sync . Once { }
@ -789,6 +762,9 @@ func TestManagerCTZeroIngestion(t *testing.T) {
t . Run ( fmt . Sprintf ( "withCT=%v" , testWithCT ) , func ( t * testing . T ) {
for _ , testCTZeroIngest := range [ ] bool { false , true } {
t . Run ( fmt . Sprintf ( "ctZeroIngest=%v" , testCTZeroIngest ) , func ( t * testing . T ) {
ctx , cancel := context . WithCancel ( context . Background ( ) )
defer cancel ( )
sampleTs := time . Now ( )
ctTs := time . Time { }
if testWithCT {
@ -797,10 +773,45 @@ func TestManagerCTZeroIngestion(t *testing.T) {
// TODO(bwplotka): Add more types than just counter?
encoded := prepareTestEncodedCounter ( t , testFormat , expectedMetricName , expectedSampleValue , sampleTs , ctTs )
app , scrapeManager := setupScrapeManager ( t , true , testCTZeroIngest )
// Perform the test.
doOneScrape ( t , scrapeManager , app , setupTestServer ( t , config . ScrapeProtocolsHeaders [ testFormat ] , encoded ) )
app := & collectResultAppender { }
discoveryManager , scrapeManager := runManagers ( t , ctx , & Options {
EnableCreatedTimestampZeroIngestion : testCTZeroIngest ,
skipOffsetting : true ,
} , & collectResultAppendable { app } )
defer scrapeManager . Stop ( )
server := setupTestServer ( t , config . ScrapeProtocolsHeaders [ testFormat ] , encoded )
serverURL , err := url . Parse ( server . URL )
require . NoError ( t , err )
testConfig := fmt . Sprintf ( `
global :
# Disable regular scrapes .
scrape_interval : 9999 m
scrape_timeout : 5 s
scrape_configs :
- job_name : test
honor_timestamps : true
static_configs :
- targets : [ ' % s ' ]
` , serverURL . Host )
applyConfig ( t , testConfig , scrapeManager , discoveryManager )
// Wait for one scrape.
ctx , cancel = context . WithTimeout ( ctx , 1 * time . Minute )
defer cancel ( )
require . NoError ( t , runutil . Retry ( 100 * time . Millisecond , ctx . Done ( ) , func ( ) error {
app . mtx . Lock ( )
defer app . mtx . Unlock ( )
// Check if scrape happened and grab the relevant samples.
if len ( app . resultFloats ) > 0 {
return nil
}
return errors . New ( "expected some float samples, got none" )
} ) , "after 1 minute" )
// Verify results.
// Verify what we got vs expectations around CT injection.
@ -871,39 +882,6 @@ func prepareTestEncodedCounter(t *testing.T, format config.ScrapeProtocol, mName
}
}
func doOneScrape ( t * testing . T , manager * Manager , appender * collectResultAppender , server * httptest . Server ) {
t . Helper ( )
serverURL , err := url . Parse ( server . URL )
require . NoError ( t , err )
// Add fake target directly into tsets + reload
manager . updateTsets ( map [ string ] [ ] * targetgroup . Group {
"test" : { {
Targets : [ ] model . LabelSet { {
model . SchemeLabel : model . LabelValue ( serverURL . Scheme ) ,
model . AddressLabel : model . LabelValue ( serverURL . Host ) ,
} } ,
} } ,
} )
manager . reload ( )
// Wait for one scrape.
ctx , cancel := context . WithTimeout ( context . Background ( ) , 1 * time . Minute )
defer cancel ( )
require . NoError ( t , runutil . Retry ( 100 * time . Millisecond , ctx . Done ( ) , func ( ) error {
appender . mtx . Lock ( )
defer appender . mtx . Unlock ( )
// Check if scrape happened and grab the relevant samples.
if len ( appender . resultFloats ) > 0 {
return nil
}
return errors . New ( "expected some float samples, got none" )
} ) , "after 1 minute" )
manager . Stop ( )
}
func findSamplesForMetric ( floats [ ] floatSample , metricName string ) ( ret [ ] floatSample ) {
for _ , f := range floats {
if f . metric . Get ( model . MetricNameLabel ) == metricName {
@ -978,37 +956,22 @@ func TestManagerCTZeroIngestionHistogram(t *testing.T) {
} ,
} {
t . Run ( tc . name , func ( t * testing . T ) {
app := & collectResultAppender { }
scrapeManager , err := NewManager (
& Options {
EnableCreatedTimestampZeroIngestion : tc . enableCTZeroIngestion ,
EnableNativeHistogramsIngestion : true ,
skipOffsetting : true ,
} ,
promslog . New ( & promslog . Config { } ) ,
nil ,
& collectResultAppendable { app } ,
prometheus . NewRegistry ( ) ,
)
require . NoError ( t , err )
ctx , cancel := context . WithCancel ( context . Background ( ) )
defer cancel ( )
require . NoError ( t , scrapeManager . ApplyConfig ( & config . Config {
GlobalConfig : config . GlobalConfig {
// Disable regular scrapes.
ScrapeInterval : model . Duration ( 9999 * time . Minute ) ,
ScrapeTimeout : model . Duration ( 5 * time . Second ) ,
// Ensure the proto is chosen. We need proto as it's the only protocol
// with the CT parsing support.
ScrapeProtocols : [ ] config . ScrapeProtocol { config . PrometheusProto } ,
} ,
ScrapeConfigs : [ ] * config . ScrapeConfig { { JobName : "test" } } ,
} ) )
app := & collectResultAppender { }
discoveryManager , scrapeManager := runManagers ( t , ctx , & Options {
EnableCreatedTimestampZeroIngestion : tc . enableCTZeroIngestion ,
EnableNativeHistogramsIngestion : true ,
skipOffsetting : true ,
} , & collectResultAppendable { app } )
defer scrapeManager . Stop ( )
once := sync . Once { }
// Start fake HTTP target to that allow one scrape only.
server := httptest . NewServer (
http . HandlerFunc ( func ( w http . ResponseWriter , r * http . Request ) {
fail := true // TODO(bwplotka): Kill or use?
fail := true
once . Do ( func ( ) {
fail = false
w . Header ( ) . Set ( "Content-Type" , ` application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited ` )
@ -1031,22 +994,23 @@ func TestManagerCTZeroIngestionHistogram(t *testing.T) {
serverURL , err := url . Parse ( server . URL )
require . NoError ( t , err )
// Add fake target directly into tsets + reload. Normally users would use
// Manager.Run and wait for minimum 5s refresh interval.
scrapeManager . updateTsets ( map [ string ] [ ] * targetgroup . Group {
"test" : { {
Targets : [ ] model . LabelSet { {
model . SchemeLabel : model . LabelValue ( serverURL . Scheme ) ,
model . AddressLabel : model . LabelValue ( serverURL . Host ) ,
} } ,
} } ,
} )
scrapeManager . reload ( )
testConfig := fmt . Sprintf ( `
global :
# Disable regular scrapes .
scrape_interval : 9999 m
scrape_timeout : 5 s
scrape_configs :
- job_name : test
static_configs :
- targets : [ ' % s ' ]
` , serverURL . Host )
applyConfig ( t , testConfig , scrapeManager , discoveryManager )
var got [ ] histogramSample
// Wait for one scrape.
ctx , cancel : = context . WithTimeout ( con te xt . Background ( ) , 1 * time . Minute )
ctx , cancel = context . WithTimeout ( ctx , 1 * time . Minute )
defer cancel ( )
require . NoError ( t , runutil . Retry ( 100 * time . Millisecond , ctx . Done ( ) , func ( ) error {
app . mtx . Lock ( )
@ -1064,7 +1028,6 @@ func TestManagerCTZeroIngestionHistogram(t *testing.T) {
}
return errors . New ( "expected some histogram samples, got none" )
} ) , "after 1 minute" )
scrapeManager . Stop ( )
// Check for zero samples, assuming we only injected always one histogram sample.
// Did it contain CT to inject? If yes, was CT zero enabled?
@ -1118,9 +1081,17 @@ func applyConfig(
require . NoError ( t , discoveryManager . ApplyConfig ( c ) )
}
func runManagers ( t * testing . T , ctx context . Context ) ( * discovery . Manager , * Manager ) {
func runManagers ( t * testing . T , ctx context . Context , opts * Options , app storage . Appendable ) ( * discovery . Manager , * Manager ) {
t . Helper ( )
if opts == nil {
opts = & Options { }
}
opts . DiscoveryReloadInterval = model . Duration ( 100 * time . Millisecond )
if app == nil {
app = nopAppendable { }
}
reg := prometheus . NewRegistry ( )
sdMetrics , err := discovery . RegisterSDMetrics ( reg , discovery . NewRefreshMetrics ( reg ) )
require . NoError ( t , err )
@ -1132,10 +1103,10 @@ func runManagers(t *testing.T, ctx context.Context) (*discovery.Manager, *Manage
discovery . Updatert ( 100 * time . Millisecond ) ,
)
scrapeManager , err := NewManager (
& Opti ons { DiscoveryReloadIn terval : model . Duration ( 100 * time . Millisecond ) } ,
op ts ,
nil ,
nil ,
nopAppend able { } ,
app ,
prometheus . NewRegistry ( ) ,
)
require . NoError ( t , err )
@ -1213,7 +1184,7 @@ scrape_configs:
- files : [ ' % s ' ]
`
discoveryManager , scrapeManager := runManagers ( t , ctx )
discoveryManager , scrapeManager := runManagers ( t , ctx , nil , nil )
defer scrapeManager . Stop ( )
applyConfig (
@ -1312,7 +1283,7 @@ scrape_configs:
file_sd_configs :
- files : [ ' % s ' , ' % s ' ]
`
discoveryManager , scrapeManager := runManagers ( t , ctx )
discoveryManager , scrapeManager := runManagers ( t , ctx , nil , nil )
defer scrapeManager . Stop ( )
applyConfig (
@ -1372,7 +1343,7 @@ scrape_configs:
file_sd_configs :
- files : [ ' % s ' ]
`
discoveryManager , scrapeManager := runManagers ( t , ctx )
discoveryManager , scrapeManager := runManagers ( t , ctx , nil , nil )
defer scrapeManager . Stop ( )
applyConfig (
@ -1439,7 +1410,7 @@ scrape_configs:
- targets : [ ' % s ' ]
`
discoveryManager , scrapeManager := runManagers ( t , ctx )
discoveryManager , scrapeManager := runManagers ( t , ctx , nil , nil )
defer scrapeManager . Stop ( )
// Apply the initial config with an existing file