From 70be156e4ee91f56f7fe301f2ff4f29fa33d0ba8 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Thu, 12 Oct 2023 21:34:41 -0300 Subject: [PATCH] Append created timestamps. Signed-off-by: Arthur Silva Sens --- cmd/prometheus/main.go | 14 +- config/config.go | 6 +- scrape/helpers_test.go | 66 ++++++++++ scrape/manager.go | 2 + scrape/manager_test.go | 141 +++++++++++++++++++++ scrape/scrape.go | 17 +++ scrape/scrape_test.go | 5 +- storage/fanout.go | 14 ++ storage/interface.go | 15 +++ storage/remote/write.go | 5 + storage/remote/write_handler_test.go | 5 + tsdb/agent/db.go | 5 + tsdb/head_append.go | 94 +++++++++++--- tsdb/head_test.go | 183 +++++++++++++++++++++++++++ util/runutil/runutil.go | 37 ++++++ 15 files changed, 587 insertions(+), 22 deletions(-) create mode 100644 util/runutil/runutil.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 4112cd842b..fa5d65e8e0 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -206,9 +206,15 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "native-histograms": c.tsdb.EnableNativeHistograms = true // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. - config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultNativeHistogramScrapeProtocols - config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultNativeHistogramScrapeProtocols + config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols + config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols level.Info(logger).Log("msg", "Experimental native histogram support enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) + case "created-timestamp-ingestion": + c.scrape.EnableCreatedTimestampIngestion = true + // Change relevant global variables. 
Hacky, but it's hard to pass a new option or default to unmarshallers. + config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols + config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols + level.Info(logger).Log("msg", "Experimental created timestamp ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) case "": continue case "promql-at-modifier", "promql-negative-offset": @@ -1448,6 +1454,10 @@ func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, return 0, tsdb.ErrNotReady } +func (n notReadyAppender) AppendCreatedTimestamp(ref storage.SeriesRef, l labels.Labels, t int64) (storage.SeriesRef, error) { + return 0, tsdb.ErrNotReady +} + func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady } func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady } diff --git a/config/config.go b/config/config.go index b832ac9a17..4039015feb 100644 --- a/config/config.go +++ b/config/config.go @@ -459,7 +459,11 @@ var ( OpenMetricsText0_0_1, PrometheusText0_0_4, } - DefaultNativeHistogramScrapeProtocols = []ScrapeProtocol{ + + // DefaultProtoFirstScrapeProtocols is the set of scrape protocols that favors protobuf + // Prometheus exposition format. Used by default for certain feature-flags like + // "native-histograms" and "created-timestamp-ingestion". 
+ DefaultProtoFirstScrapeProtocols = []ScrapeProtocol{ PrometheusProto, OpenMetricsText1_0_0, OpenMetricsText0_0_1, diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index c580a50510..4a7ca1a05e 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -14,10 +14,18 @@ package scrape import ( + "bytes" "context" + "encoding/binary" "fmt" "math/rand" "strings" + "sync" + "testing" + + "github.com/gogo/protobuf/proto" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" @@ -50,6 +58,10 @@ func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.M return 0, nil } +func (a nopAppender) AppendCreatedTimestamp(storage.SeriesRef, labels.Labels, int64) (storage.SeriesRef, error) { + return 0, nil +} + func (a nopAppender) Commit() error { return nil } func (a nopAppender) Rollback() error { return nil } @@ -65,9 +77,19 @@ type histogramSample struct { fh *histogram.FloatHistogram } +type collectResultAppendable struct { + *collectResultAppender +} + +func (a *collectResultAppendable) Appender(_ context.Context) storage.Appender { + return a +} + // collectResultAppender records all samples that were added through the appender. // It can be used as its zero value or be backed by another appender it writes samples through. 
type collectResultAppender struct { + mtx sync.Mutex + next storage.Appender resultFloats []floatSample pendingFloats []floatSample @@ -82,6 +104,8 @@ type collectResultAppender struct { } func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() a.pendingFloats = append(a.pendingFloats, floatSample{ metric: lset, t: t, @@ -103,6 +127,8 @@ func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels } func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() a.pendingExemplars = append(a.pendingExemplars, e) if a.next == nil { return 0, nil @@ -112,6 +138,8 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L } func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t}) if a.next == nil { return 0, nil @@ -121,6 +149,8 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels. 
} func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() a.pendingMetadata = append(a.pendingMetadata, m) if ref == 0 { ref = storage.SeriesRef(rand.Uint64()) @@ -132,7 +162,24 @@ func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.L return a.next.UpdateMetadata(ref, l, m) } +func (a *collectResultAppender) AppendCreatedTimestamp(ref storage.SeriesRef, l labels.Labels, t int64) (storage.SeriesRef, error) { + a.mtx.Lock() + defer a.mtx.Unlock() + a.pendingFloats = append(a.pendingFloats, floatSample{ + metric: l, + t: t, + f: 0.0, + }) + + if ref == 0 { + ref = storage.SeriesRef(rand.Uint64()) + } + return ref, nil +} + func (a *collectResultAppender) Commit() error { + a.mtx.Lock() + defer a.mtx.Unlock() a.resultFloats = append(a.resultFloats, a.pendingFloats...) a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...) a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...) @@ -148,6 +195,8 @@ func (a *collectResultAppender) Commit() error { } func (a *collectResultAppender) Rollback() error { + a.mtx.Lock() + defer a.mtx.Unlock() a.rolledbackFloats = a.pendingFloats a.rolledbackHistograms = a.pendingHistograms a.pendingFloats = nil @@ -171,3 +220,20 @@ func (a *collectResultAppender) String() string { } return sb.String() } + +// serializeMetricFamily serializes a MetricFamily into a byte slice. +// Needed because Prometheus has its own implementation of protobuf +// marshalling and unmarshalling that only supports 'encoding=delimited'. 
+// See also https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers +func serializeMetricFamily(t *testing.T, mf *dto.MetricFamily) []byte { + t.Helper() + buf := &bytes.Buffer{} + protoBuf, err := proto.Marshal(mf) + require.NoError(t, err) + + varintBuf := make([]byte, binary.MaxVarintLen32) + varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf))) + buf.Write(varintBuf[:varintLength]) + buf.Write(protoBuf) + return buf.Bytes() +} diff --git a/scrape/manager.go b/scrape/manager.go index 3b70e48a13..f3e585251f 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -78,6 +78,8 @@ type Options struct { EnableMetadataStorage bool // Option to increase the interval used by scrape manager to throttle target groups updates. DiscoveryReloadInterval model.Duration + // Option to enable the ingestion of the created timestamp of a metric. + EnableCreatedTimestampIngestion bool // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption diff --git a/scrape/manager_test.go b/scrape/manager_test.go index a689c469d4..7837891bdb 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -15,14 +15,22 @@ package scrape import ( "context" + "errors" "net/http" + "net/http/httptest" + "net/url" + "os" "strconv" "testing" "time" + "github.com/go-kit/log" + "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" "gopkg.in/yaml.v2" "github.com/prometheus/prometheus/config" @@ -30,6 +38,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" + "github.com/prometheus/prometheus/util/runutil" ) func TestPopulateLabels(t *testing.T) { @@ -714,3 +723,135 @@ scrape_configs: 
reload(scrapeManager, cfg2) require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools()) } + +func TestManagerScrapeCreatedTimestamp(t *testing.T) { + counterType := dto.MetricType_COUNTER + now := time.Now() + nowMs := now.UnixMilli() + + makeMfWithCT := func(ct time.Time) *dto.MetricFamily { + return &dto.MetricFamily{ + Name: proto.String("expected_counter"), + Type: &counterType, + Metric: []*dto.Metric{ + { + Counter: &dto.Counter{ + Value: proto.Float64(1.0), + CreatedTimestamp: timestamppb.New(ct), + }, + }, + }, + } + } + + for _, tc := range []struct { + name string + mf *dto.MetricFamily + ingestCT bool + expectedScrapedValues []float64 + }{ + { + name: "valid counter/Ingestion enabled", + mf: makeMfWithCT(now), + ingestCT: true, + expectedScrapedValues: []float64{0.0, 1.0}, + }, + { + name: "valid counter/Ingestion disabled", + mf: makeMfWithCT(now), + ingestCT: false, + expectedScrapedValues: []float64{1.0}, + }, + { + name: "created timestamp older than sample timestamp", + mf: func() *dto.MetricFamily { + mf := makeMfWithCT(now.Add(time.Hour)) + mf.Metric[0].TimestampMs = &nowMs + return mf + }(), + ingestCT: true, + expectedScrapedValues: []float64{1.0}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + app := &collectResultAppender{} + scrapeManager, err := NewManager( + &Options{EnableCreatedTimestampIngestion: tc.ingestCT}, + log.NewLogfmtLogger(os.Stderr), + &collectResultAppendable{app}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + + require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ + GlobalConfig: config.GlobalConfig{ + ScrapeInterval: model.Duration(5 * time.Second), + ScrapeTimeout: model.Duration(5 * time.Second), + ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto}, + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + })) + + // Start fake HTTP target to scrape returning a single metric. 
+ server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`) + w.Write(serializeMetricFamily(t, tc.mf)) + }), + ) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + require.NoError(t, err) + + // Add fake target directly into tsets + reload. Normally users would use + // Manager.Run and wait for minimum 5s refresh interval. + scrapeManager.updateTsets(map[string][]*targetgroup.Group{ + "test": { + { + Targets: []model.LabelSet{{ + model.SchemeLabel: model.LabelValue(serverURL.Scheme), + model.AddressLabel: model.LabelValue(serverURL.Host), + }}, + }, + }, + }) + scrapeManager.reload() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { + if countFloatSamples(app, *tc.mf.Name) < 1 { + return errors.New("expected at least one sample") + } + return nil + }), "after 5 seconds") + scrapeManager.Stop() + + require.Equal(t, tc.expectedScrapedValues, getResultFloats(app, *tc.mf.Name)) + }) + } +} + +func countFloatSamples(a *collectResultAppender, expectedMetricName string) (count int) { + a.mtx.Lock() + defer a.mtx.Unlock() + + for _, f := range a.resultFloats { + if f.metric.Get(model.MetricNameLabel) == expectedMetricName { + count++ + } + } + return count +} + +func getResultFloats(app *collectResultAppender, expectedMetricName string) (result []float64) { + app.mtx.Lock() + defer app.mtx.Unlock() + + for _, f := range app.resultFloats { + if f.metric.Get(model.MetricNameLabel) == expectedMetricName { + result = append(result, f.f) + } + } + return result +} diff --git a/scrape/scrape.go b/scrape/scrape.go index 9a0ba1d009..da60c7edae 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -31,6 +31,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + 
"github.com/gogo/protobuf/types" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/version" @@ -168,6 +169,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed opts.interval, opts.timeout, opts.scrapeClassicHistograms, + options.EnableCreatedTimestampIngestion, options.ExtraMetrics, options.EnableMetadataStorage, opts.target, @@ -787,6 +789,7 @@ type scrapeLoop struct { interval time.Duration timeout time.Duration scrapeClassicHistograms bool + scrapeCreatedTimestamps bool appender func(ctx context.Context) storage.Appender sampleMutator labelsMutator @@ -1076,6 +1079,7 @@ func newScrapeLoop(ctx context.Context, interval time.Duration, timeout time.Duration, scrapeClassicHistograms bool, + scrapeCreatedTimestamps bool, reportExtraMetrics bool, appendMetadataToWAL bool, target *Target, @@ -1124,6 +1128,7 @@ func newScrapeLoop(ctx context.Context, interval: interval, timeout: timeout, scrapeClassicHistograms: scrapeClassicHistograms, + scrapeCreatedTimestamps: scrapeCreatedTimestamps, reportExtraMetrics: reportExtraMetrics, appendMetadataToWAL: appendMetadataToWAL, metrics: metrics, @@ -1557,6 +1562,18 @@ loop: updateMetadata(lset, true) } + if sl.scrapeCreatedTimestamps { + var ct types.Timestamp + if p.CreatedTimestamp(&ct) { + if ctMs := (ct.Seconds * 1000) + int64(ct.Nanos/1_000_000); ctMs < t { + ref, err = app.AppendCreatedTimestamp(ref, lset, ctMs) + if err != nil { + level.Debug(sl.l).Log("msg", "created timestamp not ingested", "reason", err) + } + } + } + } + if isHistogram { if h != nil { ref, err = app.AppendHistogram(ref, lset, t, h, nil) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index a9719f9a07..167696590f 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -660,6 +660,7 @@ func newBasicScrapeLoop(t testing.TB, ctx context.Context, scraper scraper, app false, false, false, + false, nil, false, newTestScrapeMetrics(t), @@ 
-801,6 +802,7 @@ func TestScrapeLoopRun(t *testing.T) { false, false, false, + false, nil, false, scrapeMetrics, @@ -945,6 +947,7 @@ func TestScrapeLoopMetadata(t *testing.T) { false, false, false, + false, nil, false, scrapeMetrics, @@ -2377,7 +2380,7 @@ func TestTargetScraperScrapeOK(t *testing.T) { runTest(acceptHeader(config.DefaultScrapeProtocols)) protobufParsing = true - runTest(acceptHeader(config.DefaultNativeHistogramScrapeProtocols)) + runTest(acceptHeader(config.DefaultProtoFirstScrapeProtocols)) } func TestTargetScrapeScrapeCancel(t *testing.T) { diff --git a/storage/fanout.go b/storage/fanout.go index 33257046f2..862cf2906d 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -202,6 +202,20 @@ func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metada return ref, nil } +func (f *fanoutAppender) AppendCreatedTimestamp(ref SeriesRef, l labels.Labels, t int64) (SeriesRef, error) { + ref, err := f.primary.AppendCreatedTimestamp(ref, l, t) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { + if _, err := appender.AppendCreatedTimestamp(ref, l, t); err != nil { + return 0, err + } + } + return ref, nil +} + func (f *fanoutAppender) Commit() (err error) { err = f.primary.Commit() diff --git a/storage/interface.go b/storage/interface.go index 2b1b6a63eb..8dc9916b48 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -237,6 +237,7 @@ type Appender interface { ExemplarAppender HistogramAppender MetadataUpdater + CreatedTimestampAppender } // GetRef is an extra interface on Appenders used by downstream projects @@ -294,6 +295,20 @@ type MetadataUpdater interface { UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) } +// CreatedTimestampAppender provides an interface for appending created timestamps to the storage. +type CreatedTimestampAppender interface { + // AppendCreatedTimestamp adds an extra sample to the given series labels. 
+ // The value of the appended sample is always zero, while the sample's timestamp + // is the one exposed by the target as created timestamp. + // + // Appending created timestamps is optional because appending synthetic zeros + // should only happen if created timestamp respects the order of the samples, i.e. is not out-of-order. + // + // When AppendCreatedTimestamp decides to not append a sample, it should return a warning that can be + // logged by the caller. + AppendCreatedTimestamp(ref SeriesRef, l labels.Labels, t int64) (SeriesRef, error) +} + // SeriesSet contains a set of series. type SeriesSet interface { Next() bool diff --git a/storage/remote/write.go b/storage/remote/write.go index 237f8caa91..4ad5379c43 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -303,6 +303,11 @@ func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, return 0, nil } +func (t *timestampTracker) AppendCreatedTimestamp(_ storage.SeriesRef, _ labels.Labels, _ int64) (storage.SeriesRef, error) { + // AppendCreatedTimestamp is no-op for remote-write for now. + return 0, nil +} + // Commit implements storage.Appender. func (t *timestampTracker) Commit() error { t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 27d0e9fabd..dfd1b324d8 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -339,3 +339,8 @@ func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ // UpdateMetadata is no-op for remote write (where mockAppendable is being used to test) for now. return 0, nil } + +func (m *mockAppendable) AppendCreatedTimestamp(_ storage.SeriesRef, _ labels.Labels, _ int64) (storage.SeriesRef, error) { + // AppendCreatedTimestamp is no-op for remote-write for now. 
+ return 0, nil +} diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 66861a487c..b1aac489d8 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -954,6 +954,11 @@ func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Met return 0, nil } +// AppendCreatedTimestamp wasn't implemented for agent mode, yet. +func (a *appender) AppendCreatedTimestamp(ref storage.SeriesRef, l labels.Labels, t int64) (storage.SeriesRef, error) { + return 0, nil +} + // Commit submits the collected samples and purges the batch. func (a *appender) Commit() error { if err := a.log(); err != nil { diff --git a/tsdb/head_append.go b/tsdb/head_append.go index be53a4f3f6..f1605721b3 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -87,6 +87,17 @@ func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m return a.app.UpdateMetadata(ref, l, m) } +func (a *initAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels.Labels, t int64) (storage.SeriesRef, error) { + if a.app != nil { + return a.app.AppendCreatedTimestamp(ref, lset, t) + } + + a.head.initTime(t) + a.app = a.head.appender() + + return a.app.AppendCreatedTimestamp(ref, lset, t) +} + // initTime initializes a head with the first timestamp. This only needs to be called // for a completely fresh head with an empty WAL. func (h *Head) initTime(t int64) { @@ -319,28 +330,11 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { - // Ensure no empty labels have gotten through. 
- lset = lset.WithoutEmpty() - if lset.IsEmpty() { - return 0, errors.Wrap(ErrInvalidSample, "empty labelset") - } - - if l, dup := lset.HasDuplicateLabelNames(); dup { - return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) - } - - var created bool var err error - s, created, err = a.head.getOrCreate(lset.Hash(), lset) + s, err = a.getOrCreate(lset) if err != nil { return 0, err } - if created { - a.series = append(a.series, record.RefSeries{ - Ref: s.ref, - Labels: lset, - }) - } } if value.IsStaleNaN(v) { @@ -389,6 +383,70 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 return storage.SeriesRef(s.ref), nil } +// AppendCreatedTimestamp appends a sample with 0 as its value when it makes sense to do so. +// For instance, it's not safe or efficient to append out-of-order created +// timestamp (e.g. we don't know if we didn't append zero for this created timestamp already). +func (a *headAppender) AppendCreatedTimestamp(ref storage.SeriesRef, lset labels.Labels, t int64) (storage.SeriesRef, error) { + s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) + if s == nil { + var err error + s, err = a.getOrCreate(lset) + if err != nil { + return 0, err + } + } + + s.Lock() + isOOO, _, err := s.appendable(t, 0, a.headMaxt, a.minValidTime, a.oooTimeWindow) + if err == nil { + s.pendingCommit = true + } + s.Unlock() + if err != nil { + return 0, err + } + + if isOOO { + return storage.SeriesRef(s.ref), nil + } + + if t > a.maxt { + a.maxt = t + } + + a.samples = append(a.samples, record.RefSample{ + Ref: s.ref, + T: t, + V: 0.0, + }) + a.sampleSeries = append(a.sampleSeries, s) + return storage.SeriesRef(s.ref), nil +} + +func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) { + // Ensure no empty labels have gotten through. 
+ lset = lset.WithoutEmpty() + if lset.IsEmpty() { + return nil, errors.Wrap(ErrInvalidSample, "empty labelset") + } + if l, dup := lset.HasDuplicateLabelNames(); dup { + return nil, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) + } + var created bool + var err error + s, created, err := a.head.getOrCreate(lset.Hash(), lset) + if err != nil { + return nil, err + } + if created { + a.series = append(a.series, record.RefSeries{ + Ref: s.ref, + Labels: lset, + }) + } + return s, nil +} + // appendable checks whether the given sample is valid for appending to the series. (if we return false and no error) // The sample belongs to the out of order chunk if we return true and no error. // An error signifies the sample cannot be handled. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index fd8dd024ec..4852d1e8e7 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5641,3 +5641,186 @@ func TestPostingsCardinalityStats(t *testing.T) { // Using cache. require.Equal(t, statsForSomeLabel1, head.PostingsCardinalityStats("n", 1)) } + +func TestAppendCreatedTimestamps(t *testing.T) { + testCases := []struct { + name string + appendFunc func(*testing.T, storage.Appender) + assertFunc func(*testing.T, storage.Querier) + }{ + { + name: "In order ct+normal sample", + appendFunc: func(t *testing.T, a storage.Appender) { + lbls := labels.FromStrings("foo", "bar") + ts := int64(100) + _, err := a.AppendCreatedTimestamp(0, lbls, ts-1) + require.NoError(t, err) + _, err = a.Append(0, lbls, ts, 10) + require.NoError(t, err) + require.NoError(t, a.Commit()) + }, + assertFunc: func(t *testing.T, q storage.Querier) { + ts := int64(100) + ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.True(t, ss.Next()) + s := ss.At() + + it := s.Iterator(nil) + require.Equal(t, chunkenc.ValFloat, it.Next()) + timestamp, value := it.At() + require.Equal(t, ts-1, timestamp) + require.Equal(t, 0.0, 
value) + + require.Equal(t, chunkenc.ValFloat, it.Next()) + timestamp, value = it.At() + require.Equal(t, ts, timestamp) + require.Equal(t, 10.0, value) + + require.False(t, ss.Next()) + require.Equal(t, chunkenc.ValNone, it.Next()) + }, + }, + { + name: "Consecutive appends with same ct ignore ct", + appendFunc: func(t *testing.T, a storage.Appender) { + lbls := labels.FromStrings("foo", "bar") + ctTs := int64(99) + sampleTs := int64(100) + _, err := a.AppendCreatedTimestamp(0, lbls, ctTs) + require.NoError(t, err) + _, err = a.Append(0, lbls, sampleTs, 10) + require.NoError(t, err) + sampleTs += 1 + _, err = a.AppendCreatedTimestamp(0, lbls, ctTs) + require.NoError(t, err) + _, err = a.Append(0, lbls, sampleTs, 10) + require.NoError(t, err) + require.NoError(t, a.Commit()) + }, + assertFunc: func(t *testing.T, q storage.Querier) { + ctTs := int64(99) + sampleTs := int64(100) + ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.True(t, ss.Next()) + s := ss.At() + require.False(t, ss.Next()) + + it := s.Iterator(nil) + // First CT is ingested + require.Equal(t, chunkenc.ValFloat, it.Next()) + timestamp, value := it.At() + require.Equal(t, ctTs, timestamp) + require.Equal(t, 0.0, value) + + require.Equal(t, chunkenc.ValFloat, it.Next()) + timestamp, value = it.At() + require.Equal(t, sampleTs, timestamp) + require.Equal(t, 10.0, value) + + // On a consecutive scrape with the same CT, the CT is ignored + sampleTs += 1 + require.Equal(t, chunkenc.ValFloat, it.Next()) + timestamp, value = it.At() + require.Equal(t, sampleTs, timestamp) + require.Equal(t, 10.0, value) + + require.False(t, ss.Next()) + require.Equal(t, chunkenc.ValNone, it.Next()) + }, + }, + { + name: "Consecutive appends with newer ct do not ignore ct", + appendFunc: func(t *testing.T, a storage.Appender) { + lbls := labels.FromStrings("foo", "bar") + ctTs := int64(99) + sampleTs := int64(100) + _, err := a.AppendCreatedTimestamp(0, lbls, 
ctTs) + require.NoError(t, err) + _, err = a.Append(0, lbls, sampleTs, 10) + require.NoError(t, err) + ctTs = sampleTs + 1 + sampleTs = ctTs + 1 + _, err = a.AppendCreatedTimestamp(0, lbls, ctTs) + require.NoError(t, err) + _, err = a.Append(0, lbls, sampleTs, 10) + require.NoError(t, err) + require.NoError(t, a.Commit()) + }, + assertFunc: func(t *testing.T, q storage.Querier) { + ctTs := int64(99) + sampleTs := int64(100) + ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.True(t, ss.Next()) + s := ss.At() + require.False(t, ss.Next()) + + it := s.Iterator(nil) + require.Equal(t, chunkenc.ValFloat, it.Next()) + timestamp, value := it.At() + require.Equal(t, ctTs, timestamp) + require.Equal(t, 0.0, value) + + require.Equal(t, chunkenc.ValFloat, it.Next()) + timestamp, value = it.At() + require.Equal(t, sampleTs, timestamp) + require.Equal(t, 10.0, value) + + // Second CT is younger than previous sample, so it is not ignored + ctTs = sampleTs + 1 + sampleTs = ctTs + 1 + require.Equal(t, chunkenc.ValFloat, it.Next()) + timestamp, value = it.At() + require.Equal(t, ctTs, timestamp) + require.Equal(t, 0.0, value) + + require.Equal(t, chunkenc.ValFloat, it.Next()) + timestamp, value = it.At() + require.Equal(t, sampleTs, timestamp) + require.Equal(t, 10.0, value) + + require.False(t, ss.Next()) + require.Equal(t, chunkenc.ValNone, it.Next()) + }, + }, + { + name: "CT equals to previous sample timestamp is ignored", + appendFunc: func(t *testing.T, a storage.Appender) { + lbls := labels.FromStrings("foo", "bar") + sampleTs := int64(100) + _, err := a.Append(0, lbls, sampleTs, 10) + require.NoError(t, err) + _, err = a.AppendCreatedTimestamp(0, lbls, sampleTs) + require.NoError(t, err) + require.NoError(t, a.Commit()) + }, + assertFunc: func(t *testing.T, q storage.Querier) { + sampleTs := int64(100) + ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + 
require.True(t, ss.Next()) + s := ss.At() + require.False(t, ss.Next()) + + it := s.Iterator(nil) + require.Equal(t, chunkenc.ValFloat, it.Next()) + timestamp, value := it.At() + require.Equal(t, sampleTs, timestamp) + require.Equal(t, 10.0, value) + + require.False(t, ss.Next()) + require.Equal(t, chunkenc.ValNone, it.Next()) + }, + }, + } + + for _, tc := range testCases { + h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false) + defer func() { + require.NoError(t, h.Close()) + }() + a := h.Appender(context.Background()) + tc.appendFunc(t, a) + q, err := NewBlockQuerier(h, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + tc.assertFunc(t, q) + } +} diff --git a/util/runutil/runutil.go b/util/runutil/runutil.go new file mode 100644 index 0000000000..5a77c332ba --- /dev/null +++ b/util/runutil/runutil.go @@ -0,0 +1,37 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copied from https://github.com/efficientgo/core/blob/a21078e2c723b69e05f95c65dbc5058712b4edd8/runutil/runutil.go#L39 +// and adjusted. + +package runutil + +import "time" + +// Retry calls f immediately and then once per interval until f returns nil or stopc is signalled; on stop it returns the last error from f. 
+func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
+	tick := time.NewTicker(interval) // Paces every attempt after the first one.
+	defer tick.Stop()                // Stop the ticker on every return path.
+
+	var err error // Most recent error returned by f; handed back if we are stopped.
+	for {
+		if err = f(); err == nil { // The first attempt runs right away, before any tick.
+			return nil
+		}
+		select {
+		case <-stopc: // Stop signalled: give up and report the last error from f.
+			return err
+		case <-tick.C: // Next tick arrived: loop around and call f again.
+		}
+	}
+}