@ -32,6 +32,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
dto "github.com/prometheus/prometheus/prompb/io/prometheus/client"
"github.com/prometheus/prometheus/schema"
"github.com/prometheus/prometheus/util/convertnhcb"
)
// floatFormatBufPool is exclusively used in formatOpenMetricsFloat.
@ -78,20 +79,31 @@ type ProtobufParser struct {
// Whether to also parse a classic histogram that is also present as a
// native histogram.
parseClassicHistograms bool
parseClassicHistograms bool
// Whether to add type and unit labels.
enableTypeAndUnitLabels bool
// Whether to convert classic histograms to native histograms with custom buckets.
convertClassicHistogramsToNHCB bool
// Reusable classic to NHCB converter.
tmpNHCB convertnhcb . TempHistogram
// We need to preload NHCB since we cannot do error handling in Histogram().
nhcbH * histogram . Histogram
nhcbFH * histogram . FloatHistogram
}
// NewProtobufParser returns a parser for the payload in the byte slice.
func NewProtobufParser ( b [ ] byte , parseClassicHistograms , enableTypeAndUnitLabels bool , st * labels . SymbolTable ) Parser {
func NewProtobufParser ( b [ ] byte , parseClassicHistograms , convertClassicHistogramsToNHCB , enableTypeAndUnitLabels bool , st * labels . SymbolTable ) Parser {
return & ProtobufParser {
dec : dto . NewMetricStreamingDecoder ( b ) ,
entryBytes : & bytes . Buffer { } ,
builder : labels . NewScratchBuilderWithSymbolTable ( st , 16 ) , // TODO(bwplotka): Try base builder.
state : EntryInvalid ,
parseClassicHistograms : parseClassicHistograms ,
enableTypeAndUnitLabels : enableTypeAndUnitLabels ,
state : EntryInvalid ,
parseClassicHistograms : parseClassicHistograms ,
enableTypeAndUnitLabels : enableTypeAndUnitLabels ,
convertClassicHistogramsToNHCB : convertClassicHistogramsToNHCB ,
tmpNHCB : convertnhcb . NewTempHistogram ( ) ,
}
}
@ -182,6 +194,15 @@ func (p *ProtobufParser) Histogram() ([]byte, *int64, *histogram.Histogram, *his
h = p . dec . GetHistogram ( )
)
if ! isNativeHistogram ( h ) {
// This only happens if we have a classic histogram and
// we converted it to NHCB already in Next.
if * ts != 0 {
return p . entryBytes . Bytes ( ) , ts , p . nhcbH , p . nhcbFH
}
return p . entryBytes . Bytes ( ) , nil , p . nhcbH , p . nhcbFH
}
if p . parseClassicHistograms && len ( h . GetBucket ( ) ) > 0 {
p . redoClassic = true
}
@ -406,6 +427,8 @@ func (p *ProtobufParser) CreatedTimestamp() int64 {
// read.
func ( p * ProtobufParser ) Next ( ) ( Entry , error ) {
p . exemplarReturned = false
p . nhcbH = nil
p . nhcbFH = nil
switch p . state {
// Invalid state occurs on:
// * First Next() call.
@ -468,8 +491,12 @@ func (p *ProtobufParser) Next() (Entry, error) {
p . state = EntryType
case EntryType :
t := p . dec . GetType ( )
if ( t == dto . MetricType_HISTOGRAM || t == dto . MetricType_GAUGE_HISTOGRAM ) &&
isNativeHistogram ( p . dec . GetHistogram ( ) ) {
if t == dto . MetricType_HISTOGRAM || t == dto . MetricType_GAUGE_HISTOGRAM {
if ! isNativeHistogram ( p . dec . GetHistogram ( ) ) {
p . state = EntrySeries
p . fieldPos = - 3 // We have not returned anything, let p.Next() increment it to -2.
return p . Next ( )
}
p . state = EntryHistogram
} else {
p . state = EntrySeries
@ -480,14 +507,18 @@ func (p *ProtobufParser) Next() (Entry, error) {
case EntrySeries :
// Potentially a second series in the metric family.
t := p . dec . GetType ( )
decodeNext := true
if t == dto . MetricType_SUMMARY ||
t == dto . MetricType_HISTOGRAM ||
t == dto . MetricType_GAUGE_HISTOGRAM {
// Non-trivial series (complex metrics, with magic suffixes).
isClassicHistogram := ( t == dto . MetricType_HISTOGRAM || t == dto . MetricType_GAUGE_HISTOGRAM ) && ! isNativeHistogram ( p . dec . GetHistogram ( ) )
skipSeries := p . convertClassicHistogramsToNHCB && isClassicHistogram && ! p . parseClassicHistograms
// Did we iterate over all the classic representations fields?
// NOTE: p.fieldsDone is updated on p.onSeriesOrHistogramUpdate.
if ! p . fieldsDone {
if ! p . fieldsDone && ! skipSeries {
// Still some fields to iterate over.
p . fieldPos ++
if err := p . onSeriesOrHistogramUpdate ( ) ; err != nil {
@ -504,25 +535,39 @@ func (p *ProtobufParser) Next() (Entry, error) {
// If this is a metric family containing native
// histograms, it means we are here thanks to redoClassic state.
// Return to native histograms for the consistent flow.
if ( t == dto . MetricType_HISTOGRAM || t == dto . MetricType_GAUGE_HISTOGRAM ) &&
isNativeHistogram ( p . dec . GetHistogram ( ) ) {
p . state = EntryHistogram
// If this is a metric family containing classic histograms,
// it means we might need to do NHCB conversion.
if t == dto . MetricType_HISTOGRAM || t == dto . MetricType_GAUGE_HISTOGRAM {
if ! isClassicHistogram {
p . state = EntryHistogram
} else if p . convertClassicHistogramsToNHCB {
// We still need to spit out the NHCB.
var err error
p . nhcbH , p . nhcbFH , err = p . convertToNHCB ( t )
if err != nil {
return EntryInvalid , err
}
p . state = EntryHistogram
// We have an NHCB to emit, no need to decode the next series.
decodeNext = false
}
}
}
// Is there another series?
if err := p . dec . NextMetric ( ) ; err != nil {
if errors . Is ( err , io . EOF ) {
p . state = EntryInvalid
return p . Next ( )
if decodeNext {
if err := p . dec . NextMetric ( ) ; err != nil {
if errors . Is ( err , io . EOF ) {
p . state = EntryInvalid
return p . Next ( )
}
return EntryInvalid , err
}
return EntryInvalid , err
}
if err := p . onSeriesOrHistogramUpdate ( ) ; err != nil {
return EntryInvalid , err
}
case EntryHistogram :
// Was Histogram() called and parseClassicHistograms is true?
if p . redoClassic {
switchToClassic := func ( ) ( Entry , error ) {
p . redoClassic = false
p . fieldPos = - 3
p . fieldsDone = false
@ -530,6 +575,11 @@ func (p *ProtobufParser) Next() (Entry, error) {
return p . Next ( ) // Switch to classic histogram.
}
// Was Histogram() called and parseClassicHistograms is true?
if p . redoClassic {
return switchToClassic ( )
}
// Is there another series?
if err := p . dec . NextMetric ( ) ; err != nil {
if errors . Is ( err , io . EOF ) {
@ -538,6 +588,14 @@ func (p *ProtobufParser) Next() (Entry, error) {
}
return EntryInvalid , err
}
// If this is a metric family does not contain native
// histograms, it means we are here thanks to NHCB conversion.
// Return to classic histograms for the consistent flow.
if ! isNativeHistogram ( p . dec . GetHistogram ( ) ) {
return switchToClassic ( )
}
if err := p . onSeriesOrHistogramUpdate ( ) ; err != nil {
return EntryInvalid , err
}
@ -690,3 +748,43 @@ func isNativeHistogram(h *dto.Histogram) bool {
h . GetZeroThreshold ( ) > 0 ||
h . GetZeroCount ( ) > 0
}
func ( p * ProtobufParser ) convertToNHCB ( t dto . MetricType ) ( * histogram . Histogram , * histogram . FloatHistogram , error ) {
h := p . dec . GetHistogram ( )
p . tmpNHCB . Reset ( )
// TODO(krajorama): convertnhcb should support setting integer mode up
// front since we know it here. That would avoid the converter having
// to guess it based on counts.
v := h . GetSampleCountFloat ( )
if v == 0 {
v = float64 ( h . GetSampleCount ( ) )
}
if err := p . tmpNHCB . SetCount ( v ) ; err != nil {
return nil , nil , err
}
if err := p . tmpNHCB . SetSum ( h . GetSampleSum ( ) ) ; err != nil {
return nil , nil , err
}
for _ , b := range h . GetBucket ( ) {
v := b . GetCumulativeCountFloat ( )
if v == 0 {
v = float64 ( b . GetCumulativeCount ( ) )
}
if err := p . tmpNHCB . SetBucketCount ( b . GetUpperBound ( ) , v ) ; err != nil {
return nil , nil , err
}
}
ch , cfh , err := p . tmpNHCB . Convert ( )
if err != nil {
return nil , nil , err
}
if t == dto . MetricType_GAUGE_HISTOGRAM {
if ch != nil {
ch . CounterResetHint = histogram . GaugeType
} else {
cfh . CounterResetHint = histogram . GaugeType
}
}
return ch , cfh , nil
}