@ -42,7 +42,6 @@ import (
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/schema"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
@ -133,7 +132,7 @@ func TestBasicContentNegotiation(t *testing.T) {
} {
t . Run ( tc . name , func ( t * testing . T ) {
dir := t . TempDir ( )
s := NewStorage ( nil , nil , nil , dir , defaultFlushDeadline , nil , false )
s := NewStorage ( nil , nil , nil , dir , defaultFlushDeadline , nil )
defer s . Close ( )
var (
@ -242,7 +241,7 @@ func TestSampleDelivery(t *testing.T) {
} {
t . Run ( fmt . Sprintf ( "%s-%s" , tc . protoMsg , tc . name ) , func ( t * testing . T ) {
dir := t . TempDir ( )
s := NewStorage ( nil , nil , nil , dir , defaultFlushDeadline , nil , false )
s := NewStorage ( nil , nil , nil , dir , defaultFlushDeadline , nil )
defer s . Close ( )
var (
@ -323,7 +322,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
func newTestQueueManager ( t testing . TB , cfg config . QueueConfig , mcfg config . MetadataConfig , deadline time . Duration , c WriteClient , protoMsg config . RemoteWriteProtoMsg ) * QueueManager {
dir := t . TempDir ( )
metrics := newQueueManagerMetrics ( nil , "" , "" )
m := NewQueueManager ( metrics , nil , nil , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , mcfg , labels . EmptyLabels ( ) , nil , c , deadline , newPool ( ) , newHighestTimestampMetric ( ) , nil , false , false , false , protoMsg )
m := NewQueueManager ( metrics , nil , nil , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , mcfg , labels . EmptyLabels ( ) , nil , c , deadline , newPool ( ) , newHighestTimestampMetric ( ) , nil , false , false , protoMsg )
return m
}
@ -364,7 +363,7 @@ func TestMetadataDelivery(t *testing.T) {
func TestWALMetadataDelivery ( t * testing . T ) {
dir := t . TempDir ( )
s := NewStorage ( nil , nil , nil , dir , defaultFlushDeadline , nil , false )
s := NewStorage ( nil , nil , nil , dir , defaultFlushDeadline , nil )
defer s . Close ( )
cfg := config . DefaultQueueConfig
@ -783,7 +782,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
}
)
m := NewQueueManager ( metrics , nil , nil , nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , mcfg , labels . EmptyLabels ( ) , nil , client , 0 , newPool ( ) , newHighestTimestampMetric ( ) , nil , false , false , false , config . RemoteWriteProtoMsgV1 )
m := NewQueueManager ( metrics , nil , nil , nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , mcfg , labels . EmptyLabels ( ) , nil , client , 0 , newPool ( ) , newHighestTimestampMetric ( ) , nil , false , false , config . RemoteWriteProtoMsgV1 )
m . StoreSeries ( fakeSeries , 0 )
// Attempt to samples while the manager is running. We immediately stop the
@ -1460,7 +1459,7 @@ func BenchmarkStoreSeries(b *testing.B) {
cfg := config . DefaultQueueConfig
mcfg := config . DefaultMetadataConfig
metrics := newQueueManagerMetrics ( nil , "" , "" )
m := NewQueueManager ( metrics , nil , nil , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , mcfg , labels . EmptyLabels ( ) , nil , c , defaultFlushDeadline , newPool ( ) , newHighestTimestampMetric ( ) , nil , false , false , false , config . RemoteWriteProtoMsgV1 )
m := NewQueueManager ( metrics , nil , nil , nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , cfg , mcfg , labels . EmptyLabels ( ) , nil , c , defaultFlushDeadline , newPool ( ) , newHighestTimestampMetric ( ) , nil , false , false , config . RemoteWriteProtoMsgV1 )
m . externalLabels = tc . externalLabels
m . relabelConfigs = tc . relabelConfigs
@ -1939,7 +1938,7 @@ func BenchmarkBuildV2WriteRequest(b *testing.B) {
totalSize := 0
for i := 0 ; i < b . N ; i ++ {
populateV2TimeSeries ( & symbolTable , batch , seriesBuff , true , true , false )
populateV2TimeSeries ( & symbolTable , batch , seriesBuff , true , true )
req , _ , _ , err := buildV2WriteRequest ( noopLogger , seriesBuff , symbolTable . Symbols ( ) , & pBuf , nil , cEnc , "snappy" )
if err != nil {
b . Fatal ( err )
@ -2345,7 +2344,8 @@ func TestPopulateV2TimeSeries_UnexpectedMetadata(t *testing.T) {
}
nSamples , nExemplars , nHistograms , nMetadata , nUnexpected := populateV2TimeSeries (
& symbolTable , batch , pendingData , false , false , false )
& symbolTable , batch , pendingData , false , false ,
)
require . Equal ( t , 2 , nSamples , "Should count 2 samples" )
require . Equal ( t , 0 , nExemplars , "Should count 0 exemplars" )
@ -2353,131 +2353,3 @@ func TestPopulateV2TimeSeries_UnexpectedMetadata(t *testing.T) {
require . Equal ( t , 0 , nMetadata , "Should count 0 processed metadata" )
require . Equal ( t , 2 , nUnexpected , "Should count 2 unexpected metadata" )
}
func TestPopulateV2TimeSeries_TypeAndUnitLabels ( t * testing . T ) {
symbolTable := writev2 . NewSymbolTable ( )
testCases := [ ] struct {
name string
typeLabel string
unitLabel string
expectedType writev2 . Metadata_MetricType
description string
} {
{
name : "counter_with_unit" ,
typeLabel : "counter" ,
unitLabel : "operations" ,
expectedType : writev2 . Metadata_METRIC_TYPE_COUNTER ,
description : "Counter metric with operations unit" ,
} ,
{
name : "gauge_with_bytes" ,
typeLabel : "gauge" ,
unitLabel : "bytes" ,
expectedType : writev2 . Metadata_METRIC_TYPE_GAUGE ,
description : "Gauge metric with bytes unit" ,
} ,
{
name : "histogram_with_seconds" ,
typeLabel : "histogram" ,
unitLabel : "seconds" ,
expectedType : writev2 . Metadata_METRIC_TYPE_HISTOGRAM ,
description : "Histogram metric with seconds unit" ,
} ,
{
name : "summary_with_ratio" ,
typeLabel : "summary" ,
unitLabel : "ratio" ,
expectedType : writev2 . Metadata_METRIC_TYPE_SUMMARY ,
description : "Summary metric with ratio unit" ,
} ,
{
name : "info_no_unit" ,
typeLabel : "info" ,
unitLabel : "" ,
expectedType : writev2 . Metadata_METRIC_TYPE_INFO ,
description : "Info metric without unit" ,
} ,
{
name : "stateset_no_unit" ,
typeLabel : "stateset" ,
unitLabel : "" ,
expectedType : writev2 . Metadata_METRIC_TYPE_STATESET ,
description : "Stateset metric without unit" ,
} ,
{
name : "unknown_type" ,
typeLabel : "unknown_type" ,
unitLabel : "meters" ,
expectedType : writev2 . Metadata_METRIC_TYPE_UNSPECIFIED ,
description : "Unknown type defaults to unspecified" ,
} ,
{
name : "empty_type_with_unit" ,
typeLabel : "" ,
unitLabel : "watts" ,
expectedType : writev2 . Metadata_METRIC_TYPE_UNSPECIFIED ,
description : "Empty type with unit" ,
} ,
{
name : "type_no_unit" ,
typeLabel : "gauge" ,
unitLabel : "" ,
expectedType : writev2 . Metadata_METRIC_TYPE_GAUGE ,
description : "Type without unit" ,
} ,
{
name : "no_type_no_unit" ,
typeLabel : "" ,
unitLabel : "" ,
expectedType : writev2 . Metadata_METRIC_TYPE_UNSPECIFIED ,
description : "No type and no unit" ,
} ,
}
for _ , tc := range testCases {
t . Run ( tc . name , func ( t * testing . T ) {
batch := make ( [ ] timeSeries , 1 )
builder := labels . NewScratchBuilder ( 2 )
metadata := schema . Metadata {
Name : "test_metric_" + tc . name ,
Type : model . MetricType ( tc . typeLabel ) ,
Unit : tc . unitLabel ,
}
metadata . AddToLabels ( & builder )
batch [ 0 ] = timeSeries {
seriesLabels : builder . Labels ( ) ,
value : 123.45 ,
timestamp : time . Now ( ) . UnixMilli ( ) ,
sType : tSample ,
}
pendingData := make ( [ ] writev2 . TimeSeries , 1 )
symbolTable . Reset ( )
nSamples , nExemplars , nHistograms , _ , _ := populateV2TimeSeries (
& symbolTable ,
batch ,
pendingData ,
false , // sendExemplars
false , // sendNativeHistograms
true , // enableTypeAndUnitLabels
)
require . Equal ( t , 1 , nSamples , "Should have 1 sample" )
require . Equal ( t , 0 , nExemplars , "Should have 0 exemplars" )
require . Equal ( t , 0 , nHistograms , "Should have 0 histograms" )
require . Equal ( t , tc . expectedType , pendingData [ 0 ] . Metadata . Type ,
"Type should match expected for %s" , tc . description )
unitRef := pendingData [ 0 ] . Metadata . UnitRef
symbols := symbolTable . Symbols ( )
require . Equal ( t , tc . unitLabel , symbols [ unitRef ] , "Unit should match" )
} )
}
}