Proposed changes to Append CT PR.

Changes:

* Changed textparse Parser interface for consistency and robustness.
* Changed CT interface to be more explicit and handle validation.
* Simplified test, change scrapeManager to allow testability.
* Added TODOs.


Signed-off-by: bwplotka <bwplotka@gmail.com>
arct2
bwplotka 2 years ago
parent 5cefd915ad
commit 7bfed4924e
  1. 6
      cmd/prometheus/main.go
  2. 9
      config/config.go
  3. 10
      model/textparse/interface.go
  4. 9
      model/textparse/openmetricsparse.go
  5. 9
      model/textparse/promparse.go
  6. 22
      model/textparse/protobufparse.go
  7. 34
      model/textparse/protobufparse_test.go
  8. 29
      scrape/helpers_test.go
  9. 8
      scrape/manager.go
  10. 128
      scrape/manager_test.go
  11. 55
      scrape/scrape.go
  12. 3
      scrape/scrape_test.go
  13. 6
      storage/fanout.go
  14. 29
      storage/interface.go
  15. 5
      tsdb/agent/db.go
  16. 4
      tsdb/agent/db_test.go
  17. 68
      tsdb/db_test.go
  18. 4
      tsdb/head.go
  19. 37
      tsdb/head_append.go

@ -209,12 +209,12 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
level.Info(logger).Log("msg", "Experimental native histogram support enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
case "created-timestamp-ingestion":
c.scrape.EnableCreatedTimestampIngestion = true
case "created-timestamp-zero-ingestion":
c.scrape.EnableCreatedTimestampZeroIngestion = true
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
level.Info(logger).Log("msg", "Experimental created timestamp ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
level.Info(logger).Log("msg", "Experimental created timestamp zero ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols))
case "":
continue
case "promql-at-modifier", "promql-negative-offset":

@ -454,15 +454,18 @@ var (
OpenMetricsText1_0_0: "application/openmetrics-text;version=1.0.0",
}
// DefaultScrapeProtocols is the set of scrape protocols that will be proposed
// to scrape target, ordered by priority.
DefaultScrapeProtocols = []ScrapeProtocol{
OpenMetricsText1_0_0,
OpenMetricsText0_0_1,
PrometheusText0_0_4,
}
// DefaultProtoFirstScrapeProtocols is the set of scrape protocols that favors protobuf
// Prometheus exposition format. Used by default for certain feature-flags like
// "native-histograms" and "created-timestamp-ingestion".
// DefaultProtoFirstScrapeProtocols is like DefaultScrapeProtocols, but it
// favors protobuf Prometheus exposition format.
// Used by default for certain feature-flags like
// "native-histograms" and "created-timestamp-zero-ingestion".
DefaultProtoFirstScrapeProtocols = []ScrapeProtocol{
PrometheusProto,
OpenMetricsText1_0_0,

@ -16,8 +16,6 @@ package textparse
import (
"mime"
"github.com/gogo/protobuf/types"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@ -66,10 +64,10 @@ type Parser interface {
// retrieved (including the case where no exemplars exist at all).
Exemplar(l *exemplar.Exemplar) bool
// CreatedTimestamp writes the created timestamp of the current sample
// into the passed timestamp. It returns false if no created timestamp
// exists or if the metric type does not support created timestamps.
CreatedTimestamp(ct *types.Timestamp) bool
// CreatedTimestamp returns the created timestamp (in milliseconds) for the
// current sample. It returns nil if it is unknown e.g. if it wasn't set,
// if the scrape protocol or metric type does not support created timestamps.
CreatedTimestamp() *int64
// Next advances the parser to the next sample. It returns false if no
// more samples were read or an error occurred.

@ -24,8 +24,6 @@ import (
"strings"
"unicode/utf8"
"github.com/gogo/protobuf/types"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@ -213,9 +211,10 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
return true
}
// CreatedTimestamp returns false because OpenMetricsParser does not support created timestamps (yet).
func (p *OpenMetricsParser) CreatedTimestamp(_ *types.Timestamp) bool {
return false
// CreatedTimestamp returns nil as it's not implemented yet.
// TODO(bwplotka): https://github.com/prometheus/prometheus/issues/12980
func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
return nil
}
// nextToken returns the next token from the openMetricsLexer.

@ -26,8 +26,6 @@ import (
"unicode/utf8"
"unsafe"
"github.com/gogo/protobuf/types"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@ -247,9 +245,10 @@ func (p *PromParser) Exemplar(*exemplar.Exemplar) bool {
return false
}
// CreatedTimestamp returns false because PromParser does not support created timestamps.
func (p *PromParser) CreatedTimestamp(_ *types.Timestamp) bool {
return false
// CreatedTimestamp returns nil as it's not implemented yet.
// TODO(bwplotka): https://github.com/prometheus/prometheus/issues/12980
func (p *PromParser) CreatedTimestamp() *int64 {
return nil
}
// nextToken returns the next token from the promlexer. It skips over tabs

@ -360,22 +360,26 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
return true
}
func (p *ProtobufParser) CreatedTimestamp(ct *types.Timestamp) bool {
var foundCT *types.Timestamp
// CreatedTimestamp returns CT or nil if CT is not present or
// invalid (as timestamp e.g. negative value) on counters, summaries or histograms.
func (p *ProtobufParser) CreatedTimestamp() *int64 {
var ct *types.Timestamp
switch p.mf.GetType() {
case dto.MetricType_COUNTER:
foundCT = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp()
ct = p.mf.GetMetric()[p.metricPos].GetCounter().GetCreatedTimestamp()
case dto.MetricType_SUMMARY:
foundCT = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp()
ct = p.mf.GetMetric()[p.metricPos].GetSummary().GetCreatedTimestamp()
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
foundCT = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp()
ct = p.mf.GetMetric()[p.metricPos].GetHistogram().GetCreatedTimestamp()
default:
}
if foundCT == nil {
return false
ctAsTime, err := types.TimestampFromProto(ct)
if err != nil {
// Errors means ct == nil or invalid timestamp, which we silently ignore.
return nil
}
*ct = *foundCT
return true
ctMilis := ctAsTime.UnixMilli()
return &ctMilis
}
// Next advances the parser to the next "sample" (emulating the behavior of a

@ -21,7 +21,6 @@ import (
"testing"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
@ -630,7 +629,7 @@ func TestProtobufParse(t *testing.T) {
shs *histogram.Histogram
fhs *histogram.FloatHistogram
e []exemplar.Exemplar
ct *types.Timestamp
ct int64
}
inputBuf := createTestProtoBuf(t)
@ -1069,7 +1068,7 @@ func TestProtobufParse(t *testing.T) {
{
m: "test_counter_with_createdtimestamp",
v: 42,
ct: &types.Timestamp{Seconds: 1, Nanos: 1},
ct: 1000,
lset: labels.FromStrings(
"__name__", "test_counter_with_createdtimestamp",
),
@ -1085,7 +1084,7 @@ func TestProtobufParse(t *testing.T) {
{
m: "test_summary_with_createdtimestamp_count",
v: 42,
ct: &types.Timestamp{Seconds: 1, Nanos: 1},
ct: 1000,
lset: labels.FromStrings(
"__name__", "test_summary_with_createdtimestamp_count",
),
@ -1093,7 +1092,7 @@ func TestProtobufParse(t *testing.T) {
{
m: "test_summary_with_createdtimestamp_sum",
v: 1.234,
ct: &types.Timestamp{Seconds: 1, Nanos: 1},
ct: 1000,
lset: labels.FromStrings(
"__name__", "test_summary_with_createdtimestamp_sum",
),
@ -1108,7 +1107,7 @@ func TestProtobufParse(t *testing.T) {
},
{
m: "test_histogram_with_createdtimestamp",
ct: &types.Timestamp{Seconds: 1, Nanos: 1},
ct: 1000,
shs: &histogram.Histogram{
CounterResetHint: histogram.UnknownCounterReset,
PositiveSpans: []histogram.Span{},
@ -1128,7 +1127,7 @@ func TestProtobufParse(t *testing.T) {
},
{
m: "test_gaugehistogram_with_createdtimestamp",
ct: &types.Timestamp{Seconds: 1, Nanos: 1},
ct: 1000,
shs: &histogram.Histogram{
CounterResetHint: histogram.GaugeType,
PositiveSpans: []histogram.Span{},
@ -1887,7 +1886,7 @@ func TestProtobufParse(t *testing.T) {
{ // 83
m: "test_counter_with_createdtimestamp",
v: 42,
ct: &types.Timestamp{Seconds: 1, Nanos: 1},
ct: 1000,
lset: labels.FromStrings(
"__name__", "test_counter_with_createdtimestamp",
),
@ -1903,7 +1902,7 @@ func TestProtobufParse(t *testing.T) {
{ // 86
m: "test_summary_with_createdtimestamp_count",
v: 42,
ct: &types.Timestamp{Seconds: 1, Nanos: 1},
ct: 1000,
lset: labels.FromStrings(
"__name__", "test_summary_with_createdtimestamp_count",
),
@ -1911,7 +1910,7 @@ func TestProtobufParse(t *testing.T) {
{ // 87
m: "test_summary_with_createdtimestamp_sum",
v: 1.234,
ct: &types.Timestamp{Seconds: 1, Nanos: 1},
ct: 1000,
lset: labels.FromStrings(
"__name__", "test_summary_with_createdtimestamp_sum",
),
@ -1926,7 +1925,7 @@ func TestProtobufParse(t *testing.T) {
},
{ // 90
m: "test_histogram_with_createdtimestamp",
ct: &types.Timestamp{Seconds: 1, Nanos: 1},
ct: 1000,
shs: &histogram.Histogram{
CounterResetHint: histogram.UnknownCounterReset,
PositiveSpans: []histogram.Span{},
@ -1946,7 +1945,7 @@ func TestProtobufParse(t *testing.T) {
},
{ // 93
m: "test_gaugehistogram_with_createdtimestamp",
ct: &types.Timestamp{Seconds: 1, Nanos: 1},
ct: 1000,
shs: &histogram.Histogram{
CounterResetHint: histogram.GaugeType,
PositiveSpans: []histogram.Span{},
@ -1981,10 +1980,9 @@ func TestProtobufParse(t *testing.T) {
m, ts, v := p.Series()
var e exemplar.Exemplar
var ct types.Timestamp
p.Metric(&res)
eFound := p.Exemplar(&e)
ctFound := p.CreatedTimestamp(&ct)
ct := p.CreatedTimestamp()
require.Equal(t, exp[i].m, string(m), "i: %d", i)
if ts != nil {
require.Equal(t, exp[i].t, *ts, "i: %d", i)
@ -2000,11 +1998,11 @@ func TestProtobufParse(t *testing.T) {
require.Equal(t, exp[i].e[0], e, "i: %d", i)
require.False(t, p.Exemplar(&e), "too many exemplars returned, i: %d", i)
}
if exp[i].ct != nil {
require.Equal(t, true, ctFound, "i: %d", i)
require.Equal(t, exp[i].ct.String(), ct.String(), "i: %d", i)
if exp[i].ct != 0 {
require.NotNilf(t, ct, "i: %d", i)
require.Equal(t, exp[i].ct, *ct, "i: %d", i)
} else {
require.Equal(t, false, ctFound, "i: %d", i)
require.Nilf(t, ct, "i: %d", i)
}
case EntryHistogram:

@ -58,7 +58,7 @@ func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.M
return 0, nil
}
func (a nopAppender) AppendCreatedTimestamp(storage.SeriesRef, labels.Labels, int64) (storage.SeriesRef, error) {
func (a nopAppender) AppendCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) {
return 0, nil
}
@ -162,19 +162,8 @@ func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.L
return a.next.UpdateMetadata(ref, l, m)
}
func (a *collectResultAppender) AppendCreatedTimestamp(ref storage.SeriesRef, l labels.Labels, t int64) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingFloats = append(a.pendingFloats, floatSample{
metric: l,
t: t,
f: 0.0,
})
if ref == 0 {
ref = storage.SeriesRef(rand.Uint64())
}
return ref, nil
func (a *collectResultAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t int64, ct int64) (storage.SeriesRef, error) {
return a.Append(ref, l, ct, 0.0)
}
func (a *collectResultAppender) Commit() error {
@ -221,18 +210,20 @@ func (a *collectResultAppender) String() string {
return sb.String()
}
// serializeMetricFamily serializes a MetricFamily into a byte slice.
// Needed because Prometheus has its own implementation of protobuf
// marshalling and unmarshalling that only supports 'encoding=delimited'.
// protoMarshalDelimited marshals a MetricFamily into a delimited
// Prometheus proto exposition format bytes (known as 'encoding=delimited`)
//
// See also https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers
func serializeMetricFamily(t *testing.T, mf *dto.MetricFamily) []byte {
func protoMarshalDelimited(t *testing.T, mf *dto.MetricFamily) []byte {
t.Helper()
buf := &bytes.Buffer{}
protoBuf, err := proto.Marshal(mf)
require.NoError(t, err)
varintBuf := make([]byte, binary.MaxVarintLen32)
varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf)))
buf := &bytes.Buffer{}
buf.Write(varintBuf[:varintLength])
buf.Write(protoBuf)
return buf.Bytes()

@ -78,11 +78,15 @@ type Options struct {
EnableMetadataStorage bool
// Option to increase the interval used by scrape manager to throttle target groups updates.
DiscoveryReloadInterval model.Duration
// Option to enable the ingestion of the created timestamp of a metric.
EnableCreatedTimestampIngestion bool
// Option to enable the ingestion of the created timestamp as a synthetic zero sample.
// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
EnableCreatedTimestampZeroIngestion bool
// Optional HTTP client options to use when scraping.
HTTPClientOptions []config_util.HTTPClientOption
// private option for testability.
skipOffsetting bool
}
// Manager maintains a set of scrape pools and manages start/stop cycles

@ -15,12 +15,13 @@ package scrape
import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strconv"
"sync"
"testing"
"time"
@ -724,59 +725,53 @@ scrape_configs:
require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
}
func TestManagerScrapeCreatedTimestamp(t *testing.T) {
counterType := dto.MetricType_COUNTER
now := time.Now()
nowMs := now.UnixMilli()
makeMfWithCT := func(ct time.Time) *dto.MetricFamily {
return &dto.MetricFamily{
Name: proto.String("expected_counter"),
Type: &counterType,
Metric: []*dto.Metric{
{
Counter: &dto.Counter{
Value: proto.Float64(1.0),
CreatedTimestamp: timestamppb.New(ct),
},
},
},
}
}
// TestManagerCTZeroIngestion tests scrape manager for CT cases.
func TestManagerCTZeroIngestion(t *testing.T) {
const mName = "expected_counter"
for _, tc := range []struct {
name string
mf *dto.MetricFamily
ingestCT bool
expectedScrapedValues []float64
counterSample *dto.Counter
enableCTZeroIngestion bool
expectedValues []float64
}{
{
name: "valid counter/Ingestion enabled",
mf: makeMfWithCT(now),
ingestCT: true,
expectedScrapedValues: []float64{0.0, 1.0},
name: "disabled with CT on counter",
counterSample: &dto.Counter{
Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(),
},
expectedValues: []float64{1.0},
},
{
name: "valid counter/Ingestion disabled",
mf: makeMfWithCT(now),
ingestCT: false,
expectedScrapedValues: []float64{1.0},
name: "enabled with CT on counter",
counterSample: &dto.Counter{
Value: proto.Float64(1.0),
// Timestamp does not matter as long as it exists in this test.
CreatedTimestamp: timestamppb.Now(),
},
enableCTZeroIngestion: true,
expectedValues: []float64{0.0, 1.0},
},
{
name: "created timestamp older than sample timestamp",
mf: func() *dto.MetricFamily {
mf := makeMfWithCT(now.Add(time.Hour))
mf.Metric[0].TimestampMs = &nowMs
return mf
}(),
ingestCT: true,
expectedScrapedValues: []float64{1.0},
name: "enabled without CT on counter",
counterSample: &dto.Counter{
Value: proto.Float64(1.0),
},
enableCTZeroIngestion: true,
expectedValues: []float64{1.0},
},
} {
t.Run(tc.name, func(t *testing.T) {
app := &collectResultAppender{}
scrapeManager, err := NewManager(
&Options{EnableCreatedTimestampIngestion: tc.ingestCT},
&Options{
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
skipOffsetting: true,
},
log.NewLogfmtLogger(os.Stderr),
&collectResultAppendable{app},
prometheus.NewRegistry(),
@ -785,18 +780,35 @@ func TestManagerScrapeCreatedTimestamp(t *testing.T) {
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: config.GlobalConfig{
ScrapeInterval: model.Duration(5 * time.Second),
ScrapeTimeout: model.Duration(5 * time.Second),
// Disable regular scrapes.
ScrapeInterval: model.Duration(9999 * time.Minute),
ScrapeTimeout: model.Duration(5 * time.Second),
// Ensure proto is chosen.
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}))
// Start fake HTTP target to scrape returning a single metric.
once := sync.Once{}
// Start fake HTTP target to that allow one scrape only.
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
w.Write(serializeMetricFamily(t, tc.mf))
fail := true
once.Do(func() {
fail = false
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
var ctrType = dto.MetricType_COUNTER
w.Write(protoMarshalDelimited(t, &dto.MetricFamily{
Name: proto.String(mName),
Type: &ctrType,
Metric: []*dto.Metric{{Counter: tc.counterSample}},
}))
})
if fail {
w.WriteHeader(http.StatusInternalServerError)
}
}),
)
defer server.Close()
@ -807,27 +819,27 @@ func TestManagerScrapeCreatedTimestamp(t *testing.T) {
// Add fake target directly into tsets + reload. Normally users would use
// Manager.Run and wait for minimum 5s refresh interval.
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {
{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
},
},
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
}},
})
scrapeManager.reload()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
// Wait for one scrape.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
if countFloatSamples(app, *tc.mf.Name) < 1 {
return errors.New("expected at least one sample")
if countFloatSamples(app, mName) != len(tc.expectedValues) {
return fmt.Errorf("expected %v samples", tc.expectedValues)
}
return nil
}), "after 5 seconds")
}), "after 1 minute")
scrapeManager.Stop()
require.Equal(t, tc.expectedScrapedValues, getResultFloats(app, *tc.mf.Name))
require.Equal(t, tc.expectedValues, getResultFloats(app, mName))
})
}
}

@ -31,7 +31,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/types"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
@ -107,9 +106,10 @@ type scrapeLoopOptions struct {
interval time.Duration
timeout time.Duration
scrapeClassicHistograms bool
mrc []*relabel.Config
cache *scrapeCache
enableCompression bool
mrc []*relabel.Config
cache *scrapeCache
enableCompression bool
}
const maxAheadTime = 10 * time.Minute
@ -169,12 +169,13 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
opts.interval,
opts.timeout,
opts.scrapeClassicHistograms,
options.EnableCreatedTimestampIngestion,
options.EnableCreatedTimestampZeroIngestion,
options.ExtraMetrics,
options.EnableMetadataStorage,
opts.target,
options.PassMetadataInContext,
metrics,
options.skipOffsetting,
)
}
sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
@ -789,7 +790,7 @@ type scrapeLoop struct {
interval time.Duration
timeout time.Duration
scrapeClassicHistograms bool
scrapeCreatedTimestamps bool
enableCTZeroIngestion bool
appender func(ctx context.Context) storage.Appender
sampleMutator labelsMutator
@ -807,6 +808,8 @@ type scrapeLoop struct {
appendMetadataToWAL bool
metrics *scrapeMetrics
skipOffsetting bool // For testability.
}
// scrapeCache tracks mappings of exposed metric strings to label sets and
@ -1079,12 +1082,13 @@ func newScrapeLoop(ctx context.Context,
interval time.Duration,
timeout time.Duration,
scrapeClassicHistograms bool,
scrapeCreatedTimestamps bool,
enableCTZeroIngestion bool,
reportExtraMetrics bool,
appendMetadataToWAL bool,
target *Target,
passMetadataInContext bool,
metrics *scrapeMetrics,
skipOffsetting bool,
) *scrapeLoop {
if l == nil {
l = log.NewNopLogger()
@ -1128,10 +1132,11 @@ func newScrapeLoop(ctx context.Context,
interval: interval,
timeout: timeout,
scrapeClassicHistograms: scrapeClassicHistograms,
scrapeCreatedTimestamps: scrapeCreatedTimestamps,
enableCTZeroIngestion: enableCTZeroIngestion,
reportExtraMetrics: reportExtraMetrics,
appendMetadataToWAL: appendMetadataToWAL,
metrics: metrics,
skipOffsetting: skipOffsetting,
}
sl.ctx, sl.cancel = context.WithCancel(ctx)
@ -1139,12 +1144,14 @@ func newScrapeLoop(ctx context.Context,
}
func (sl *scrapeLoop) run(errc chan<- error) {
select {
case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
// Continue after a scraping offset.
case <-sl.ctx.Done():
close(sl.stopped)
return
if !sl.skipOffsetting {
select {
case <-time.After(sl.scraper.offset(sl.interval, sl.offsetSeed)):
// Continue after a scraping offset.
case <-sl.ctx.Done():
close(sl.stopped)
return
}
}
var last time.Time
@ -1562,20 +1569,12 @@ loop:
updateMetadata(lset, true)
}
if sl.scrapeCreatedTimestamps {
var ct types.Timestamp
if p.CreatedTimestamp(&ct) {
if ctMs := (ct.Seconds * 1000) + int64(ct.Nanos/1_000_000); ctMs < t {
ref, err = app.AppendCreatedTimestamp(ref, lset, ctMs)
if err != nil {
if errors.Is(err, storage.ErrCreatedTimestampOutOfOrder) {
level.Debug(sl.l).Log("msg", storage.ErrCreatedTimestampOutOfOrder)
} else {
level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
break loop
}
}
}
if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug.
level.Debug(sl.l).Log("msg", "Error when updating metadata in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err)
}
}

@ -664,6 +664,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app
nil,
false,
newTestScrapeMetrics(t),
false,
)
}
@ -806,6 +807,7 @@ func TestScrapeLoopRun(t *testing.T) {
nil,
false,
scrapeMetrics,
false,
)
// The loop must terminate during the initial offset if the context
@ -951,6 +953,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
nil,
false,
scrapeMetrics,
false,
)
defer cancel()

@ -202,14 +202,14 @@ func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metada
return ref, nil
}
func (f *fanoutAppender) AppendCreatedTimestamp(ref SeriesRef, l labels.Labels, t int64) (SeriesRef, error) {
ref, err := f.primary.AppendCreatedTimestamp(ref, l, t)
func (f *fanoutAppender) AppendCTZeroSample(ref SeriesRef, l labels.Labels, t int64, ct int64) (SeriesRef, error) {
ref, err := f.primary.AppendCTZeroSample(ref, l, t, ct)
if err != nil {
return ref, err
}
for _, appender := range f.secondaries {
if _, err := appender.AppendCreatedTimestamp(ref, l, t); err != nil {
if _, err := appender.AppendCTZeroSample(ref, l, t, ct); err != nil {
return 0, err
}
}

@ -43,7 +43,13 @@ var (
ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength)
ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0")
ErrNativeHistogramsDisabled = fmt.Errorf("native histograms are disabled")
ErrCreatedTimestampOutOfOrder = fmt.Errorf("created timestamp out of order, ignoring")
// ErrOutOfOrderCT indicates failed append of CT to the storage
// due to CT being older the then newer sample.
// NOTE(bwplotka): This can be both an instrumentation failure or commonly expected
// behaviour, and we currently don't have a way to determine this. As a result
// it's recommended to ignore this error for now.
ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring")
)
// SeriesRef is a generic series reference. In prometheus it is either a
@ -296,17 +302,22 @@ type MetadataUpdater interface {
UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error)
}
// CreatedTimestampAppender provides an interface for appending created timestamps to the storage.
// CreatedTimestampAppender provides an interface for appending CT to storage.
type CreatedTimestampAppender interface {
// AppendCreatedTimestamp adds an extra sample to the given series labels.
// The value of the appended sample is always zero, while the sample's timestamp
// is the one exposed by the target as created timestamp.
// AppendCTZeroSample adds synthetic zero sample for the given ct timestamp,
// which will be associated with given series, labels and the incoming
// sample's t (timestamp). AppendCTZeroSample returns error if zero sample can't be
// appended, for example when ct is too old, or when it would collide with
// incoming sample (sample has priority).
//
// Appending created timestamps is optional, that is because appending sythetic zeros
// should only happen if created timestamp respects the order of the samples, i.e. is not out-of-order.
// AppendCTZeroSample has to be called before the corresponding sample Append.
// A series reference number is returned which can be used to modify the
// CT for the given series in the same or later transactions.
// Returned reference numbers are ephemeral and may be rejected in calls
// to AppendCTZeroSample() at any point.
//
// When AppendCreatedTimestamp decides to not append a sample, it should return an error that can be treated by the caller.
AppendCreatedTimestamp(ref SeriesRef, l labels.Labels, t int64) (SeriesRef, error)
// If the reference is 0 it must not be used for caching.
AppendCTZeroSample(ref SeriesRef, l labels.Labels, t int64, ct int64) (SeriesRef, error)
}
// SeriesSet contains a set of series.

@ -954,8 +954,9 @@ func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Met
return 0, nil
}
// AppendCreatedTimestamp wasn't implemented for agent mode, yet.
func (a *appender) AppendCreatedTimestamp(ref storage.SeriesRef, l labels.Labels, t int64) (storage.SeriesRef, error) {
func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t int64, ct int64) (storage.SeriesRef, error) {
// TODO(bwplotka): Implement
panic("to implement")
return 0, nil
}

@ -878,3 +878,7 @@ func TestDBAllowOOOSamples(t *testing.T) {
require.Equal(t, float64(80), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
require.NoError(t, db.Close())
}
func TestAgentAppender_AppendCTZeroSample(t *testing.T) {
t.Fatalf("TODO")
}

@ -6954,3 +6954,71 @@ Outer:
require.NoError(t, writerErr)
}
func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
t.Fatalf("TODO")
// NOTE(bwplotka): We could reuse metadata test part as copied below:
updateMetadata := func(t *testing.T, app storage.Appender, s labels.Labels, m metadata.Metadata) {
_, err := app.UpdateMetadata(0, s, m)
require.NoError(t, err)
}
db := newTestDB(t)
ctx := context.Background()
// Add some series so we can append metadata to them.
app := db.Appender(ctx)
s1 := labels.FromStrings("a", "b")
s2 := labels.FromStrings("c", "d")
s3 := labels.FromStrings("e", "f")
s4 := labels.FromStrings("g", "h")
for _, s := range []labels.Labels{s1, s2, s3, s4} {
_, err := app.Append(0, s, 0, 0)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Add a first round of metadata to the first three series.
// Re-take the Appender, as the previous Commit will have it closed.
m1 := metadata.Metadata{Type: "gauge", Unit: "unit_1", Help: "help_1"}
m2 := metadata.Metadata{Type: "gauge", Unit: "unit_2", Help: "help_2"}
m3 := metadata.Metadata{Type: "gauge", Unit: "unit_3", Help: "help_3"}
app = db.Appender(ctx)
updateMetadata(t, app, s1, m1)
updateMetadata(t, app, s2, m2)
updateMetadata(t, app, s3, m3)
require.NoError(t, app.Commit())
// Add a replicated metadata entry to the first series,
// a completely new metadata entry for the fourth series,
// and a changed metadata entry to the second series.
m4 := metadata.Metadata{Type: "counter", Unit: "unit_4", Help: "help_4"}
m5 := metadata.Metadata{Type: "counter", Unit: "unit_5", Help: "help_5"}
app = db.Appender(ctx)
updateMetadata(t, app, s1, m1)
updateMetadata(t, app, s4, m4)
updateMetadata(t, app, s2, m5)
require.NoError(t, app.Commit())
// Read the WAL to see if the disk storage format is correct.
recs := readTestWAL(t, path.Join(db.Dir(), "wal"))
var gotMetadataBlocks [][]record.RefMetadata
for _, rec := range recs {
if mr, ok := rec.([]record.RefMetadata); ok {
gotMetadataBlocks = append(gotMetadataBlocks, mr)
}
}
expectedMetadata := []record.RefMetadata{
{Ref: 1, Type: record.GetMetricType(m1.Type), Unit: m1.Unit, Help: m1.Help},
{Ref: 2, Type: record.GetMetricType(m2.Type), Unit: m2.Unit, Help: m2.Help},
{Ref: 3, Type: record.GetMetricType(m3.Type), Unit: m3.Unit, Help: m3.Help},
{Ref: 4, Type: record.GetMetricType(m4.Type), Unit: m4.Unit, Help: m4.Help},
{Ref: 2, Type: record.GetMetricType(m5.Type), Unit: m5.Unit, Help: m5.Help},
}
require.Len(t, gotMetadataBlocks, 2)
require.Equal(t, expectedMetadata[:3], gotMetadataBlocks[0])
require.Equal(t, expectedMetadata[3:], gotMetadataBlocks[1])
}

@ -149,6 +149,10 @@ type HeadOptions struct {
// EnableNativeHistograms enables the ingestion of native histograms.
EnableNativeHistograms atomic.Bool
// EnableCreatedTimestampZeroIngestion enables the ingestion of the created timestamp as a synthetic zero sample.
// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
EnableCreatedTimestampZeroIngestion bool
ChunkRange int64
// ChunkDirRoot is the parent directory of the chunks directory.
ChunkDirRoot string

@ -87,15 +87,15 @@ func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m
return a.app.UpdateMetadata(ref, l, m)
}
func (a *initAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels.Labels, t int64) (storage.SeriesRef, error) {
func (a *initAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t int64, ct int64) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.AppendCreatedTimestamp(ref, lset, t)
return a.app.AppendCTZeroSample(ref, lset, t, ct)
}
a.head.initTime(t)
a.app = a.head.appender()
return a.app.AppendCreatedTimestamp(ref, lset, t)
return a.app.AppendCTZeroSample(ref, lset, t, ct)
}
// initTime initializes a head with the first timestamp. This only needs to be called
@ -383,10 +383,14 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
return storage.SeriesRef(s.ref), nil
}
// AppendCreatedTimestamp appends a sample with 0 as its value when it makes sense to do so.
// For instance, it's not safe or efficient to append out-of-order created
// timestamp (e.g. we don't know if we didn't append zero for this created timestamp already).
func (a *headAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels.Labels, t int64) (storage.SeriesRef, error) {
// AppendCTZeroSample appends synthetic zero sample for ct timestamp. It returns
// error when sample can't be appended. See
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t int64, ct int64) (storage.SeriesRef, error) {
if ct >= t {
return 0, fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring")
}
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
var err error
@ -396,8 +400,11 @@ func (a *headAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels
}
}
// Check if CT wouldn't be OOO vs samples we already might have for this series.
// NOTE(bwplotka): This will be often hit as it's expected for long living
// counters to share the same CT.
s.Lock()
isOOO, _, err := s.appendable(t, 0, a.headMaxt, a.minValidTime, a.oooTimeWindow)
isOOO, _, err := s.appendable(ct, 0, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err == nil {
s.pendingCommit = true
}
@ -405,20 +412,14 @@ func (a *headAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels
if err != nil {
return 0, err
}
if isOOO {
return storage.SeriesRef(s.ref), storage.ErrCreatedTimestampOutOfOrder
return storage.SeriesRef(s.ref), storage.ErrOutOfOrderCT
}
if t > a.maxt {
a.maxt = t
if ct > a.maxt {
a.maxt = ct
}
a.samples = append(a.samples, record.RefSample{
Ref: s.ref,
T: t,
V: 0.0,
})
a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0})
a.sampleSeries = append(a.sampleSeries, s)
return storage.SeriesRef(s.ref), nil
}

Loading…
Cancel
Save