From ad367b504b0b088a197d759aa7f01caeeeb6ee50 Mon Sep 17 00:00:00 2001
From: bwplotka
Date: Fri, 12 Dec 2025 11:12:28 +0000
Subject: [PATCH] refactor(tsdb/agent)[PART3]: add AppenderV2 support to agent

Signed-off-by: bwplotka
---
 cmd/prometheus/main.go          |    6 +-
 storage/interface.go            |    4 +-
 storage/interface_append.go     |    4 +-
 tsdb/agent/db.go                |  117 ++-
 tsdb/agent/db_append_v2.go      | 1320 +++----------------------
 tsdb/agent/db_append_v2_test.go |  590 +++++---------
 tsdb/agent/db_test.go           |   92 +--
 tsdb/head_append_v2.go          |    7 +-
 8 files changed, 439 insertions(+), 1701 deletions(-)

diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 53379dc940..e903b87beb 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -265,6 +265,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
 	case "created-timestamp-zero-ingestion":
 		c.scrape.EnableStartTimestampZeroIngestion = true
 		c.web.STZeroIngestionEnabled = true
+		c.agent.EnableSTAsZeroSample = true
 		// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
 		config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
 		config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
@@ -1409,6 +1410,7 @@ func main() {
 			"MinWALTime", cfg.agent.MinWALTime,
 			"MaxWALTime", cfg.agent.MaxWALTime,
 			"OutOfOrderTimeWindow", cfg.agent.OutOfOrderTimeWindow,
+			"EnableSTAsZeroSample", cfg.agent.EnableSTAsZeroSample,
 		)
 		localStorage.Set(db, 0)
@@ -1947,7 +1949,8 @@ type agentOptions struct {
 	TruncateFrequency      model.Duration
 	MinWALTime, MaxWALTime model.Duration
 	NoLockfile             bool
-	OutOfOrderTimeWindow   int64
+	OutOfOrderTimeWindow   int64 // TODO(bwplotka): Unused option, fix it or remove.
+	EnableSTAsZeroSample   bool
 }
 
 func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Options {
@@ -1963,6 +1966,7 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option
 		MaxWALTime:           durationToInt64Millis(time.Duration(opts.MaxWALTime)),
 		NoLockfile:           opts.NoLockfile,
 		OutOfOrderTimeWindow: outOfOrderTimeWindow,
+		EnableSTAsZeroSample: opts.EnableSTAsZeroSample,
 	}
 }
 
diff --git a/storage/interface.go b/storage/interface.go
index f7d7953de4..ae8bec033e 100644
--- a/storage/interface.go
+++ b/storage/interface.go
@@ -274,8 +274,8 @@ type AppendOptions struct {
 //
 // Operations on the Appender interface are not goroutine-safe.
 //
-// The order of samples appended via the Appender is preserved within each
-// series. I.e. samples are not reordered per timestamp, or by float/histogram
+// The order of samples appended via the Appender is preserved within each series.
+// I.e. the timestamp order within a batch is not validated; samples are not reordered by timestamp or by float/histogram
 // type.
 //
 // WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
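For context, the two appender generations differ mainly in their append entry points (signatures simplified from this patch, package qualifiers dropped; the full interfaces carry more methods):

	// V1 (storage.Appender): one method per sample kind.
	Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error)
	AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *Histogram, fh *FloatHistogram) (SeriesRef, error)

	// V2 (storage.AppenderV2): a single method covering floats and histograms,
	// plus a start timestamp (st) and per-append options (e.g. exemplars).
	Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *Histogram, fh *FloatHistogram, opts AOptions) (SeriesRef, error)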
diff --git a/storage/interface_append.go b/storage/interface_append.go
index 880e57f194..cc7045dbd5 100644
--- a/storage/interface_append.go
+++ b/storage/interface_append.go
@@ -103,8 +103,8 @@ var _ error = &AppendPartialError{}
 //
 // Operations on the AppenderV2 interface are not goroutine-safe.
 //
-// The order of samples appended via the AppenderV2 is preserved within each
-// series. I.e. samples are not reordered per timestamp, or by float/histogram
+// The order of samples appended via the AppenderV2 is preserved within each series.
+// I.e. the timestamp order within a batch is not validated; samples are not reordered by timestamp or by float/histogram
 // type.
 type AppenderV2 interface {
 	AppenderTransaction
diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go
index 5c9774cd58..7b3e74f51a 100644
--- a/tsdb/agent/db.go
+++ b/tsdb/agent/db.go
@@ -84,6 +84,15 @@ type Options struct {
 
 	// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
 	OutOfOrderTimeWindow int64
+
+	// EnableSTAsZeroSample represents the 'created-timestamp-zero-ingestion' feature flag.
+	// If true, a non-empty ST that is earlier than the sample timestamp will be
+	// stored as a zero sample before the actual sample.
+	//
+	// The zero sample is best-effort; on failure only a debug log is emitted.
+	// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
+	// is implemented.
+	EnableSTAsZeroSample bool
 }
 
 // DefaultOptions used for the WAL storage. They are reasonable for setups using
@@ -233,8 +242,9 @@ type DB struct {
 	wal    *wlog.WL
 	locker *tsdbutil.DirLocker
 
-	appenderPool sync.Pool
-	bufPool      sync.Pool
+	appenderPool   sync.Pool
+	appenderV2Pool sync.Pool
+	bufPool        sync.Pool
 
 	// These pools are only used during WAL replay and are reset at the end.
 	// NOTE: Adjust resetWALReplayResources() upon changes to the pools.
@@ -303,12 +313,26 @@ func Open(l *slog.Logger, reg prometheus.Registerer, rs *remote.Storage, dir str
 
 	db.appenderPool.New = func() any {
 		return &appender{
-			DB:                     db,
-			pendingSeries:          make([]record.RefSeries, 0, 100),
-			pendingSamples:         make([]record.RefSample, 0, 100),
-			pendingHistograms:      make([]record.RefHistogramSample, 0, 100),
-			pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
-			pendingExamplars:       make([]record.RefExemplar, 0, 10),
+			appenderBase: appenderBase{
+				DB:                     db,
+				pendingSeries:          make([]record.RefSeries, 0, 100),
+				pendingSamples:         make([]record.RefSample, 0, 100),
+				pendingHistograms:      make([]record.RefHistogramSample, 0, 100),
+				pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
+				pendingExamplars:       make([]record.RefExemplar, 0, 10),
+			},
+		}
+	}
+	db.appenderV2Pool.New = func() any {
+		return &appenderV2{
+			appenderBase: appenderBase{
+				DB:                     db,
+				pendingSeries:          make([]record.RefSeries, 0, 100),
+				pendingSamples:         make([]record.RefSample, 0, 100),
+				pendingHistograms:      make([]record.RefHistogramSample, 0, 100),
+				pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
+				pendingExamplars:       make([]record.RefExemplar, 0, 10),
+			},
 		}
 	}
 
@@ -777,9 +801,8 @@ func (db *DB) Close() error {
 	return tsdb_errors.NewMulti(db.locker.Release(), db.wal.Close()).Err()
 }
 
-type appender struct {
+type appenderBase struct {
 	*DB
-	hints *storage.AppendOptions
 
 	pendingSeries          []record.RefSeries
 	pendingSamples         []record.RefSample
 	pendingHistograms      []record.RefHistogramSample
 	pendingFloatHistograms []record.RefFloatHistogramSample
 	pendingExamplars       []record.RefExemplar
 
 	// Pointers to the series referenced by each element of pendingSamples.
 	// Series lock is not held on elements.
 	sampleSeries []*memSeries
 
 	// Pointers to the series referenced by each element of pendingHistograms.
 	// Series lock is not held on elements.
 	histogramSeries []*memSeries
 
 	// Pointers to the series referenced by each element of pendingFloatHistograms.
 	// Series lock is not held on elements.
 	floatHistogramSeries []*memSeries
 }
 
+type appender struct {
+	appenderBase
+
+	hints *storage.AppendOptions
+}
+
 func (a *appender) SetOptions(opts *storage.AppendOptions) {
 	a.hints = opts
 }
@@ -853,7 +882,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
 	return storage.SeriesRef(series.ref), nil
 }
 
-func (a *appender) getOrCreate(l labels.Labels) (series *memSeries, created bool) {
+func (a *appenderBase) getOrCreate(l labels.Labels) (series *memSeries, created bool) {
 	hash := l.Hash()
 
 	series = a.series.GetByHash(hash, l)
@@ -879,45 +908,51 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exem
 	// Ensure no empty labels have gotten through.
e.Labels = e.Labels.WithoutEmpty() + if err := a.validateExemplar(s.ref, e); err != nil { + if errors.Is(err, storage.ErrDuplicateExemplar) { + // Duplicate, don't return an error but don't accept the exemplar. + return 0, nil + } + return 0, err + } + + a.series.SetLatestExemplar(s.ref, &e) + a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{ + Ref: s.ref, + T: e.Ts, + V: e.Value, + Labels: e.Labels, + }) + a.metrics.totalAppendedExemplars.Inc() + return storage.SeriesRef(s.ref), nil +} + +func (a *appenderBase) validateExemplar(ref chunks.HeadSeriesRef, e exemplar.Exemplar) error { if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup { - return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidExemplar) + return fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidExemplar) } // Exemplar label length does not include chars involved in text rendering such as quotes // equals sign, or commas. See definition of const ExemplarMaxLabelLength. labelSetLen := 0 - err := e.Labels.Validate(func(l labels.Label) error { + if err := e.Labels.Validate(func(l labels.Label) error { labelSetLen += utf8.RuneCountInString(l.Name) labelSetLen += utf8.RuneCountInString(l.Value) - if labelSetLen > exemplar.ExemplarMaxLabelSetLength { return storage.ErrExemplarLabelLength } return nil - }) - if err != nil { - return 0, err + }); err != nil { + return err } - // Check for duplicate vs last stored exemplar for this series, and discard those. // Otherwise, record the current exemplar as the latest. // Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here. - prevExemplar := a.series.GetLatestExemplar(s.ref) + prevExemplar := a.series.GetLatestExemplar(ref) if prevExemplar != nil && prevExemplar.Equals(e) { - // Duplicate, don't return an error but don't accept the exemplar. - return 0, nil + return storage.ErrDuplicateExemplar } - a.series.SetLatestExemplar(s.ref, &e) - - a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{ - Ref: s.ref, - T: e.Ts, - V: e.Value, - Labels: e.Labels, - }) - - a.metrics.totalAppendedExemplars.Inc() - return storage.SeriesRef(s.ref), nil + return nil } func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { @@ -1046,6 +1081,9 @@ func (a *appender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.L // discard the sample if it's out of order. return 0, storage.ErrOutOfOrderST } + // NOTE(bwplotka): This is a bug, as we "commit" pending sample TS as the WAL last TS. It was likely done + // to satisfy incorrect TestDBStartTimestampSamplesIngestion test. We are leaving it as-is given the planned removal + // of AppenderV1 as per https://github.com/prometheus/prometheus/issues/17632. series.lastTs = st switch { @@ -1110,6 +1148,9 @@ func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, // discard the sample if it's out of order. return 0, storage.ErrOutOfOrderST } + // NOTE(bwplotka): This is a bug, as we "commit" pending sample TS as the WAL last TS. It was likely done + // to satisfy incorrect TestDBStartTimestampSamplesIngestion test. We are leaving it as-is given the planned removal + // of AppenderV1 as per https://github.com/prometheus/prometheus/issues/17632. series.lastTs = st // NOTE: always modify pendingSamples and sampleSeries together. 
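The ST (start timestamp) handling referenced in the notes above can be illustrated as follows (a sketch; series, values and timestamps are hypothetical, in milliseconds):

	// A counter first scraped at t=1000 with value 7 and start timestamp st=400
	// yields two WAL samples for the series when EnableSTAsZeroSample is set:
	//   (t=400,  v=0)  // synthetic zero sample marking the counter start
	//   (t=1000, v=7)  // the actual scraped sample
	// If st >= t, or st is not newer than the series' last timestamp, the zero
	// sample is rejected (the v1 appender here returns storage.ErrSTNewerThanSample
	// or storage.ErrOutOfOrderST respectively).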
@@ -1126,7 +1167,7 @@ func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, } // Commit submits the collected samples and purges the batch. -func (a *appender) Commit() error { +func (a *appenderBase) Commit() error { if err := a.log(); err != nil { return err } @@ -1141,7 +1182,7 @@ func (a *appender) Commit() error { } // log logs all pending data to the WAL. -func (a *appender) log() error { +func (a *appenderBase) log() error { a.mtx.RLock() defer a.mtx.RUnlock() @@ -1235,7 +1276,7 @@ func (a *appender) log() error { } // clearData clears all pending data. -func (a *appender) clearData() { +func (a *appenderBase) clearData() { a.pendingSeries = a.pendingSeries[:0] a.pendingSamples = a.pendingSamples[:0] a.pendingHistograms = a.pendingHistograms[:0] @@ -1246,7 +1287,7 @@ func (a *appender) clearData() { a.floatHistogramSeries = a.floatHistogramSeries[:0] } -func (a *appender) Rollback() error { +func (a *appenderBase) Rollback() error { // Series are created in-memory regardless of rollback. This means we must // log them to the WAL, otherwise subsequent commits may reference a series // which was never written to the WAL. @@ -1260,7 +1301,7 @@ func (a *appender) Rollback() error { } // logSeries logs only pending series records to the WAL. -func (a *appender) logSeries() error { +func (a *appenderBase) logSeries() error { a.mtx.RLock() defer a.mtx.RUnlock() @@ -1283,7 +1324,7 @@ func (a *appender) logSeries() error { // minValidTime returns the minimum timestamp that a sample can have // and is needed for preventing underflow. -func (a *appender) minValidTime(lastTs int64) int64 { +func (a *appenderBase) minValidTime(lastTs int64) int64 { if lastTs < math.MinInt64+a.opts.OutOfOrderTimeWindow { return math.MinInt64 } diff --git a/tsdb/agent/db_append_v2.go b/tsdb/agent/db_append_v2.go index 5c9774cd58..ae4e3a4a84 100644 --- a/tsdb/agent/db_append_v2.go +++ b/tsdb/agent/db_append_v2.go @@ -1,4 +1,4 @@ -// Copyright 2021 The Prometheus Authors +// Copyright The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,799 +17,51 @@ import ( "context" "errors" "fmt" - "log/slog" - "math" - "path/filepath" - "sync" - "time" - "unicode/utf8" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "go.uber.org/atomic" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/metadata" - "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/record" - "github.com/prometheus/prometheus/tsdb/tsdbutil" - "github.com/prometheus/prometheus/tsdb/wlog" - "github.com/prometheus/prometheus/util/compression" - "github.com/prometheus/prometheus/util/zeropool" -) - -const ( - sampleMetricTypeFloat = "float" - sampleMetricTypeHistogram = "histogram" ) -var ErrUnsupported = errors.New("unsupported operation with WAL-only storage") - -// Default values for options. 
-var ( - DefaultTruncateFrequency = 2 * time.Hour - DefaultMinWALTime = int64(5 * time.Minute / time.Millisecond) - DefaultMaxWALTime = int64(4 * time.Hour / time.Millisecond) -) - -// Options of the WAL storage. -type Options struct { - // Segments (wal files) max size. - // WALSegmentSize <= 0, segment size is default size. - // WALSegmentSize > 0, segment size is WALSegmentSize. - WALSegmentSize int - - // WALCompression configures the compression type to use on records in the WAL. - WALCompression compression.Type - - // StripeSize is the size (power of 2) in entries of the series hash map. Reducing the size will save memory but impact performance. - StripeSize int - - // TruncateFrequency determines how frequently to truncate data from the WAL. - TruncateFrequency time.Duration - - // Shortest and longest amount of time data can exist in the WAL before being - // deleted. - MinWALTime, MaxWALTime int64 - - // NoLockfile disables creation and consideration of a lock file. - NoLockfile bool - - // OutOfOrderTimeWindow specifies how much out of order is allowed, if any. - OutOfOrderTimeWindow int64 -} - -// DefaultOptions used for the WAL storage. They are reasonable for setups using -// millisecond-precision timestamps. -func DefaultOptions() *Options { - return &Options{ - WALSegmentSize: wlog.DefaultSegmentSize, - WALCompression: compression.None, - StripeSize: tsdb.DefaultStripeSize, - TruncateFrequency: DefaultTruncateFrequency, - MinWALTime: DefaultMinWALTime, - MaxWALTime: DefaultMaxWALTime, - NoLockfile: false, - OutOfOrderTimeWindow: 0, - } -} - -type dbMetrics struct { - r prometheus.Registerer - - numActiveSeries prometheus.Gauge - numWALSeriesPendingDeletion prometheus.Gauge - totalAppendedSamples *prometheus.CounterVec - totalAppendedExemplars prometheus.Counter - totalOutOfOrderSamples prometheus.Counter - walTruncateDuration prometheus.Summary - walCorruptionsTotal prometheus.Counter - walTotalReplayDuration prometheus.Gauge - checkpointDeleteFail prometheus.Counter - checkpointDeleteTotal prometheus.Counter - checkpointCreationFail prometheus.Counter - checkpointCreationTotal prometheus.Counter -} - -func newDBMetrics(r prometheus.Registerer) *dbMetrics { - m := dbMetrics{r: r} - m.numActiveSeries = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_agent_active_series", - Help: "Number of active series being tracked by the WAL storage", - }) - - m.numWALSeriesPendingDeletion = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_agent_deleted_series", - Help: "Number of series pending deletion from the WAL", - }) - - m.totalAppendedSamples = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "prometheus_agent_samples_appended_total", - Help: "Total number of samples appended to the storage", - }, []string{"type"}) - - m.totalAppendedExemplars = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_agent_exemplars_appended_total", - Help: "Total number of exemplars appended to the storage", - }) - - m.totalOutOfOrderSamples = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_agent_out_of_order_samples_total", - Help: "Total number of out of order samples ingestion failed attempts.", - }) - - m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_agent_truncate_duration_seconds", - Help: "Duration of WAL truncation.", - }) - - m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_agent_corruptions_total", - Help: "Total number of WAL corruptions.", - 
}) - - m.walTotalReplayDuration = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_agent_data_replay_duration_seconds", - Help: "Time taken to replay the data on disk.", - }) - - m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_agent_checkpoint_deletions_failed_total", - Help: "Total number of checkpoint deletions that failed.", - }) - - m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_agent_checkpoint_deletions_total", - Help: "Total number of checkpoint deletions attempted.", - }) - - m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_agent_checkpoint_creations_failed_total", - Help: "Total number of checkpoint creations that failed.", - }) - - m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_agent_checkpoint_creations_total", - Help: "Total number of checkpoint creations attempted.", - }) - - if r != nil { - r.MustRegister( - m.numActiveSeries, - m.numWALSeriesPendingDeletion, - m.totalAppendedSamples, - m.totalAppendedExemplars, - m.totalOutOfOrderSamples, - m.walTruncateDuration, - m.walCorruptionsTotal, - m.walTotalReplayDuration, - m.checkpointDeleteFail, - m.checkpointDeleteTotal, - m.checkpointCreationFail, - m.checkpointCreationTotal, - ) - } - - return &m -} - -func (m *dbMetrics) Unregister() { - if m.r == nil { - return - } - cs := []prometheus.Collector{ - m.numActiveSeries, - m.numWALSeriesPendingDeletion, - m.totalAppendedSamples, - m.totalAppendedExemplars, - m.totalOutOfOrderSamples, - m.walTruncateDuration, - m.walCorruptionsTotal, - m.walTotalReplayDuration, - m.checkpointDeleteFail, - m.checkpointDeleteTotal, - m.checkpointCreationFail, - m.checkpointCreationTotal, - } - for _, c := range cs { - m.r.Unregister(c) - } -} - -// DB represents a WAL-only storage. It implements storage.DB. -type DB struct { - mtx sync.RWMutex - logger *slog.Logger - opts *Options - rs *remote.Storage - - wal *wlog.WL - locker *tsdbutil.DirLocker - - appenderPool sync.Pool - bufPool sync.Pool - - // These pools are only used during WAL replay and are reset at the end. - // NOTE: Adjust resetWALReplayResources() upon changes to the pools. - walReplaySeriesPool zeropool.Pool[[]record.RefSeries] - walReplaySamplesPool zeropool.Pool[[]record.RefSample] - walReplayHistogramsPool zeropool.Pool[[]record.RefHistogramSample] - walReplayFloatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] - - nextRef *atomic.Uint64 - series *stripeSeries - // deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they - // must be kept around to). - deleted map[chunks.HeadSeriesRef]int - - donec chan struct{} - stopc chan struct{} - - writeNotified wlog.WriteNotified - - metrics *dbMetrics -} - -// Open returns a new agent.DB in the given directory. -func Open(l *slog.Logger, reg prometheus.Registerer, rs *remote.Storage, dir string, opts *Options) (*DB, error) { - opts = validateOptions(opts) - - locker, err := tsdbutil.NewDirLocker(dir, "agent", l, reg) - if err != nil { - return nil, err - } - if !opts.NoLockfile { - if err := locker.Lock(); err != nil { - return nil, err - } - } - - // remote_write expects WAL to be stored in a "wal" subdirectory of the main storage. 
- dir = filepath.Join(dir, "wal") - - w, err := wlog.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression) - if err != nil { - return nil, fmt.Errorf("creating WAL: %w", err) - } - - db := &DB{ - logger: l, - opts: opts, - rs: rs, - - wal: w, - locker: locker, - - nextRef: atomic.NewUint64(0), - series: newStripeSeries(opts.StripeSize), - deleted: make(map[chunks.HeadSeriesRef]int), - - donec: make(chan struct{}), - stopc: make(chan struct{}), - - metrics: newDBMetrics(reg), - } - - db.bufPool.New = func() any { - return make([]byte, 0, 1024) - } - - db.appenderPool.New = func() any { - return &appender{ - DB: db, - pendingSeries: make([]record.RefSeries, 0, 100), - pendingSamples: make([]record.RefSample, 0, 100), - pendingHistograms: make([]record.RefHistogramSample, 0, 100), - pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100), - pendingExamplars: make([]record.RefExemplar, 0, 10), - } - } - - if err := db.replayWAL(); err != nil { - db.logger.Warn("encountered WAL read error, attempting repair", "err", err) - if err := w.Repair(err); err != nil { - return nil, fmt.Errorf("repair corrupted WAL: %w", err) - } - db.logger.Info("successfully repaired WAL") - } - - go db.run() - return db, nil -} - -// SetWriteNotified allows to set an instance to notify when a write happens. -// It must be used during initialization. It is not safe to use it during execution. -func (db *DB) SetWriteNotified(wn wlog.WriteNotified) { - db.writeNotified = wn -} - -func validateOptions(opts *Options) *Options { - if opts == nil { - opts = DefaultOptions() - } - if opts.WALSegmentSize <= 0 { - opts.WALSegmentSize = wlog.DefaultSegmentSize - } - - if opts.WALCompression == "" { - opts.WALCompression = compression.None - } - - // Revert StripeSize to DefaultStripeSize if StripeSize is either 0 or not a power of 2. - if opts.StripeSize <= 0 || ((opts.StripeSize & (opts.StripeSize - 1)) != 0) { - opts.StripeSize = tsdb.DefaultStripeSize - } - if opts.TruncateFrequency <= 0 { - opts.TruncateFrequency = DefaultTruncateFrequency - } - if opts.MinWALTime <= 0 { - opts.MinWALTime = DefaultMinWALTime - } - if opts.MaxWALTime <= 0 { - opts.MaxWALTime = DefaultMaxWALTime - } - if opts.MinWALTime > opts.MaxWALTime { - opts.MaxWALTime = opts.MinWALTime - } - - if t := int64(opts.TruncateFrequency / time.Millisecond); opts.MaxWALTime < t { - opts.MaxWALTime = t - } - return opts -} - -func (db *DB) replayWAL() error { - db.logger.Info("replaying WAL, this may take a while", "dir", db.wal.Dir()) - defer db.resetWALReplayResources() - start := time.Now() - - dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir()) - if err != nil && !errors.Is(err, record.ErrNotFound) { - return fmt.Errorf("find last checkpoint: %w", err) - } - - multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{} - - if err == nil { - sr, err := wlog.NewSegmentsReader(dir) - if err != nil { - return fmt.Errorf("open checkpoint: %w", err) - } - defer func() { - if err := sr.Close(); err != nil { - db.logger.Warn("error while closing the wal segments reader", "err", err) - } - }() - - // A corrupted checkpoint is a hard error for now and requires user - // intervention. There's likely little data that can be recovered anyway. - if err := db.loadWAL(wlog.NewReader(sr), multiRef); err != nil { - return fmt.Errorf("backfill checkpoint: %w", err) - } - startFrom++ - db.logger.Info("WAL checkpoint loaded") - } - - // Find the last segment. 
-	_, last, err := wlog.Segments(db.wal.Dir())
-	if err != nil {
-		return fmt.Errorf("finding WAL segments: %w", err)
-	}
-
-	// Backfill segments from the most recent checkpoint onwards.
-	for i := startFrom; i <= last; i++ {
-		seg, err := wlog.OpenReadSegment(wlog.SegmentName(db.wal.Dir(), i))
-		if err != nil {
-			return fmt.Errorf("open WAL segment: %d: %w", i, err)
-		}
-
-		sr := wlog.NewSegmentBufReader(seg)
-		err = db.loadWAL(wlog.NewReader(sr), multiRef)
-		if err := sr.Close(); err != nil {
-			db.logger.Warn("error while closing the wal segments reader", "err", err)
-		}
-		if err != nil {
-			return err
-		}
-		db.logger.Info("WAL segment loaded", "segment", i, "maxSegment", last)
-	}
-
-	walReplayDuration := time.Since(start)
-	db.metrics.walTotalReplayDuration.Set(walReplayDuration.Seconds())
-
-	return nil
+// AppenderV2 implements storage.AppenderV2.
+func (db *DB) AppenderV2(context.Context) storage.AppenderV2 {
+	return db.appenderV2Pool.Get().(storage.AppenderV2)
 }
 
-func (db *DB) resetWALReplayResources() {
-	db.walReplaySeriesPool = zeropool.Pool[[]record.RefSeries]{}
-	db.walReplaySamplesPool = zeropool.Pool[[]record.RefSample]{}
-	db.walReplayHistogramsPool = zeropool.Pool[[]record.RefHistogramSample]{}
-	db.walReplayFloatHistogramsPool = zeropool.Pool[[]record.RefFloatHistogramSample]{}
+type appenderV2 struct {
+	appenderBase
 }
 
-func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
+// Append appends a pending sample to the agent's DB.
+// TODO: Wire metadata in the Agent's appender.
+func (a *appenderV2) Append(ref storage.SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
 	var (
-		syms    = labels.NewSymbolTable() // One table for the whole WAL.
-		dec     = record.NewDecoder(syms, db.logger)
-		lastRef = chunks.HeadSeriesRef(db.nextRef.Load())
-
-		decoded = make(chan any, 10)
-		errCh   = make(chan error, 1)
+		// Avoid shadowing err variables for reliability.
+ valErr, partialErr error + sampleMetricType = sampleMetricTypeFloat + isStale bool ) - - go func() { - defer close(decoded) - var err error - for r.Next() { - rec := r.Record() - switch dec.Type(rec) { - case record.Series: - series := db.walReplaySeriesPool.Get()[:0] - series, err = dec.Series(rec, series) - if err != nil { - errCh <- &wlog.CorruptionErr{ - Err: fmt.Errorf("decode series: %w", err), - Segment: r.Segment(), - Offset: r.Offset(), - } - return - } - decoded <- series - case record.Samples: - samples := db.walReplaySamplesPool.Get()[:0] - samples, err = dec.Samples(rec, samples) - if err != nil { - errCh <- &wlog.CorruptionErr{ - Err: fmt.Errorf("decode samples: %w", err), - Segment: r.Segment(), - Offset: r.Offset(), - } - return - } - decoded <- samples - case record.HistogramSamples, record.CustomBucketsHistogramSamples: - histograms := db.walReplayHistogramsPool.Get()[:0] - histograms, err = dec.HistogramSamples(rec, histograms) - if err != nil { - errCh <- &wlog.CorruptionErr{ - Err: fmt.Errorf("decode histogram samples: %w", err), - Segment: r.Segment(), - Offset: r.Offset(), - } - return - } - decoded <- histograms - case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples: - floatHistograms := db.walReplayFloatHistogramsPool.Get()[:0] - floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms) - if err != nil { - errCh <- &wlog.CorruptionErr{ - Err: fmt.Errorf("decode float histogram samples: %w", err), - Segment: r.Segment(), - Offset: r.Offset(), - } - return - } - decoded <- floatHistograms - case record.Tombstones, record.Exemplars: - // We don't care about tombstones or exemplars during replay. - // TODO: If decide to decode exemplars, we should make sure to prepopulate - // stripeSeries.exemplars in the next block by using setLatestExemplar. - continue - default: - errCh <- &wlog.CorruptionErr{ - Err: fmt.Errorf("invalid record type %v", dec.Type(rec)), - Segment: r.Segment(), - Offset: r.Offset(), - } - } - } - }() - - var nonExistentSeriesRefs atomic.Uint64 - - for d := range decoded { - switch v := d.(type) { - case []record.RefSeries: - for _, entry := range v { - // If this is a new series, create it in memory. If we never read in a - // sample for this series, its timestamp will remain at 0 and it will - // be deleted at the next GC. 
- if db.series.GetByID(entry.Ref) == nil { - series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0} - db.series.Set(entry.Labels.Hash(), series) - multiRef[entry.Ref] = series.ref - db.metrics.numActiveSeries.Inc() - if entry.Ref > lastRef { - lastRef = entry.Ref - } - } - } - db.walReplaySeriesPool.Put(v) - case []record.RefSample: - for _, entry := range v { - // Update the lastTs for the series based - ref, ok := multiRef[entry.Ref] - if !ok { - nonExistentSeriesRefs.Inc() - continue - } - series := db.series.GetByID(ref) - if entry.T > series.lastTs { - series.lastTs = entry.T - } - } - db.walReplaySamplesPool.Put(v) - case []record.RefHistogramSample: - for _, entry := range v { - // Update the lastTs for the series based - ref, ok := multiRef[entry.Ref] - if !ok { - nonExistentSeriesRefs.Inc() - continue - } - series := db.series.GetByID(ref) - if entry.T > series.lastTs { - series.lastTs = entry.T - } - } - db.walReplayHistogramsPool.Put(v) - case []record.RefFloatHistogramSample: - for _, entry := range v { - // Update the lastTs for the series based - ref, ok := multiRef[entry.Ref] - if !ok { - nonExistentSeriesRefs.Inc() - continue - } - series := db.series.GetByID(ref) - if entry.T > series.lastTs { - series.lastTs = entry.T - } - } - db.walReplayFloatHistogramsPool.Put(v) - default: - panic(fmt.Errorf("unexpected decoded type: %T", d)) - } - } - - if v := nonExistentSeriesRefs.Load(); v > 0 { - db.logger.Warn("found sample referencing non-existing series", "skipped_series", v) - } - - db.nextRef.Store(uint64(lastRef)) - - select { - case err := <-errCh: - return err - default: - if r.Err() != nil { - return fmt.Errorf("read records: %w", r.Err()) - } - return nil - } -} - -func (db *DB) run() { - defer close(db.donec) - -Loop: - for { - select { - case <-db.stopc: - break Loop - case <-time.After(db.opts.TruncateFrequency): - // The timestamp ts is used to determine which series are not receiving - // samples and may be deleted from the WAL. Their most recent append - // timestamp is compared to ts, and if that timestamp is older then ts, - // they are considered inactive and may be deleted. - // - // Subtracting a duration from ts will add a buffer for when series are - // considered inactive and safe for deletion. - ts := max(db.rs.LowestSentTimestamp()-db.opts.MinWALTime, 0) - - // Network issues can prevent the result of getRemoteWriteTimestamp from - // changing. We don't want data in the WAL to grow forever, so we set a cap - // on the maximum age data can be. If our ts is older than this cutoff point, - // we'll shift it forward to start deleting very stale data. - if maxTS := timestamp.FromTime(time.Now()) - db.opts.MaxWALTime; ts < maxTS { - ts = maxTS - } - - db.logger.Debug("truncating the WAL", "ts", ts) - if err := db.truncate(ts); err != nil { - db.logger.Warn("failed to truncate WAL", "err", err) - } - } - } -} - -// keepSeriesInWALCheckpointFn returns a function that is used to determine whether a series record should be kept in the checkpoint. -// last is the last WAL segment that was considered for checkpointing. -// NOTE: the agent implementation here is different from the Prometheus implementation, in that it uses WAL segment numbers instead of timestamps. -func (db *DB) keepSeriesInWALCheckpointFn(last int) func(id chunks.HeadSeriesRef) bool { - return func(id chunks.HeadSeriesRef) bool { - // Keep the record if the series exists in the db. 
- if db.series.GetByID(id) != nil { - return true - } - - // Keep the record if the series was recently deleted. - seg, ok := db.deleted[id] - return ok && seg > last - } -} - -func (db *DB) truncate(mint int64) error { - db.logger.Info("series GC started") - db.mtx.RLock() - defer db.mtx.RUnlock() - - start := time.Now() - - db.gc(mint) - db.logger.Info("series GC completed", "duration", time.Since(start)) - - first, last, err := wlog.Segments(db.wal.Dir()) - if err != nil { - return fmt.Errorf("get segment range: %w", err) - } - - // Start a new segment so low ingestion volume instances don't have more WAL - // than needed. - if _, err := db.wal.NextSegment(); err != nil { - return fmt.Errorf("next segment: %w", err) - } - - last-- // Never consider most recent segment for checkpoint - if last < 0 { - return nil // no segments yet - } - - // The lower two-thirds of segments should contain mostly obsolete samples. - // If we have less than two segments, it's not worth checkpointing yet. - last = first + (last-first)*2/3 - if last <= first { - return nil - } - - db.metrics.checkpointCreationTotal.Inc() - - if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpointFn(last), mint); err != nil { - db.metrics.checkpointCreationFail.Inc() - var cerr *wlog.CorruptionErr - if errors.As(err, &cerr) { - db.metrics.walCorruptionsTotal.Inc() - } - return fmt.Errorf("create checkpoint: %w", err) - } - if err := db.wal.Truncate(last + 1); err != nil { - // If truncating fails, we'll just try it again at the next checkpoint. - // Leftover segments will still just be ignored in the future if there's a - // checkpoint that supersedes them. - db.logger.Error("truncating segments failed", "err", err) - } - - // The checkpoint is written and segments before it are truncated, so we - // no longer need to track deleted series that were being kept around. - for ref, segment := range db.deleted { - if segment <= last { - delete(db.deleted, ref) - } + // Fail fast on incorrect histograms. + switch { + case fh != nil: + sampleMetricType = sampleMetricTypeHistogram + valErr = fh.Validate() + case h != nil: + sampleMetricType = sampleMetricTypeHistogram + valErr = h.Validate() } - db.metrics.checkpointDeleteTotal.Inc() - db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted))) - - if err := wlog.DeleteCheckpoints(db.wal.Dir(), last); err != nil { - // Leftover old checkpoints do not cause problems down the line beyond - // occupying disk space. They will just be ignored since a newer checkpoint - // exists. - db.logger.Error("delete old checkpoints", "err", err) - db.metrics.checkpointDeleteFail.Inc() + if valErr != nil { + return 0, valErr } - db.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) - - db.logger.Info("WAL checkpoint complete", "first", first, "last", last, "duration", time.Since(start)) - return nil -} - -// gc marks ref IDs that have not received a sample since mint as deleted in -// s.deleted, along with the segment where they originally got deleted. -func (db *DB) gc(mint int64) { - deleted := db.series.GC(mint) - db.metrics.numActiveSeries.Sub(float64(len(deleted))) - - _, last, _ := wlog.Segments(db.wal.Dir()) - - // We want to keep series records for any newly deleted series - // until we've passed the last recorded segment. This prevents - // the WAL having samples for series records that no longer exist. 
- for ref := range deleted { - db.deleted[ref] = last - } - - db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted))) -} - -// StartTime implements the Storage interface. -func (*DB) StartTime() (int64, error) { - return int64(model.Latest), nil -} - -// Querier implements the Storage interface. -func (*DB) Querier(int64, int64) (storage.Querier, error) { - return nil, ErrUnsupported -} - -// ChunkQuerier implements the Storage interface. -func (*DB) ChunkQuerier(int64, int64) (storage.ChunkQuerier, error) { - return nil, ErrUnsupported -} - -// ExemplarQuerier implements the Storage interface. -func (*DB) ExemplarQuerier(context.Context) (storage.ExemplarQuerier, error) { - return nil, ErrUnsupported -} - -// Appender implements storage.Storage. -func (db *DB) Appender(context.Context) storage.Appender { - return db.appenderPool.Get().(storage.Appender) -} - -// Close implements the Storage interface. -func (db *DB) Close() error { - db.mtx.Lock() - defer db.mtx.Unlock() - - close(db.stopc) - <-db.donec - - db.metrics.Unregister() - - return tsdb_errors.NewMulti(db.locker.Release(), db.wal.Close()).Err() -} - -type appender struct { - *DB - hints *storage.AppendOptions - - pendingSeries []record.RefSeries - pendingSamples []record.RefSample - pendingHistograms []record.RefHistogramSample - pendingFloatHistograms []record.RefFloatHistogramSample - pendingExamplars []record.RefExemplar - - // Pointers to the series referenced by each element of pendingSamples. - // Series lock is not held on elements. - sampleSeries []*memSeries - - // Pointers to the series referenced by each element of pendingHistograms. - // Series lock is not held on elements. - histogramSeries []*memSeries - - // Pointers to the series referenced by each element of pendingFloatHistograms. - // Series lock is not held on elements. - floatHistogramSeries []*memSeries -} - -func (a *appender) SetOptions(opts *storage.AppendOptions) { - a.hints = opts -} - -func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { // series references and chunk references are identical for agent mode. - headRef := chunks.HeadSeriesRef(ref) - - series := a.series.GetByID(headRef) - if series == nil { + s := a.series.GetByID(chunks.HeadSeriesRef(ref)) + if s == nil { // Ensure no empty or duplicate labels have gotten through. This mirrors the // equivalent validation code in the TSDB's headAppender. l = l.WithoutEmpty() @@ -822,10 +74,10 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo } var created bool - series, created = a.getOrCreate(l) + s, created = a.getOrCreate(l) if created { a.pendingSeries = append(a.pendingSeries, record.RefSeries{ - Ref: series.ref, + Ref: s.ref, Labels: l, }) @@ -833,460 +85,140 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo } } - series.Lock() - defer series.Unlock() - - if t <= a.minValidTime(series.lastTs) { - a.metrics.totalOutOfOrderSamples.Inc() - return 0, storage.ErrOutOfOrderSample - } - - // NOTE: always modify pendingSamples and sampleSeries together. 
- a.pendingSamples = append(a.pendingSamples, record.RefSample{ - Ref: series.ref, - T: t, - V: v, - }) - a.sampleSeries = append(a.sampleSeries, series) - - a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc() - return storage.SeriesRef(series.ref), nil -} - -func (a *appender) getOrCreate(l labels.Labels) (series *memSeries, created bool) { - hash := l.Hash() - - series = a.series.GetByHash(hash, l) - if series != nil { - return series, false - } - - ref := chunks.HeadSeriesRef(a.nextRef.Inc()) - series = &memSeries{ref: ref, lset: l, lastTs: math.MinInt64} - a.series.Set(hash, series) - return series, true -} - -func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - // Series references and chunk references are identical for agent mode. - headRef := chunks.HeadSeriesRef(ref) - - s := a.series.GetByID(headRef) - if s == nil { - return 0, fmt.Errorf("unknown series ref when trying to add exemplar: %d", ref) - } - - // Ensure no empty labels have gotten through. - e.Labels = e.Labels.WithoutEmpty() - - if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup { - return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidExemplar) - } - - // Exemplar label length does not include chars involved in text rendering such as quotes - // equals sign, or commas. See definition of const ExemplarMaxLabelLength. - labelSetLen := 0 - err := e.Labels.Validate(func(l labels.Label) error { - labelSetLen += utf8.RuneCountInString(l.Name) - labelSetLen += utf8.RuneCountInString(l.Value) - - if labelSetLen > exemplar.ExemplarMaxLabelSetLength { - return storage.ErrExemplarLabelLength - } - return nil - }) - if err != nil { - return 0, err - } - - // Check for duplicate vs last stored exemplar for this series, and discard those. - // Otherwise, record the current exemplar as the latest. - // Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here. - prevExemplar := a.series.GetLatestExemplar(s.ref) - if prevExemplar != nil && prevExemplar.Equals(e) { - // Duplicate, don't return an error but don't accept the exemplar. - return 0, nil - } - a.series.SetLatestExemplar(s.ref, &e) - - a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{ - Ref: s.ref, - T: e.Ts, - V: e.Value, - Labels: e.Labels, - }) + s.Lock() + lastTS := s.lastTs + s.Unlock() - a.metrics.totalAppendedExemplars.Inc() - return storage.SeriesRef(s.ref), nil -} - -func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { - if h != nil { - if err := h.Validate(); err != nil { - return 0, err - } - } - - if fh != nil { - if err := fh.Validate(); err != nil { - return 0, err - } + // TODO(bwplotka): Handle ST natively (as per PROM-60). + if a.opts.EnableSTAsZeroSample && st != 0 { + a.bestEffortAppendSTZeroSample(s, lastTS, st, t, h, fh) } - // series references and chunk references are identical for agent mode. - headRef := chunks.HeadSeriesRef(ref) - - series := a.series.GetByID(headRef) - if series == nil { - // Ensure no empty or duplicate labels have gotten through. This mirrors the - // equivalent validation code in the TSDB's headAppender. 
- l = l.WithoutEmpty() - if l.IsEmpty() { - return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample) - } - - if lbl, dup := l.HasDuplicateLabelNames(); dup { - return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample) - } - - var created bool - series, created = a.getOrCreate(l) - if created { - a.pendingSeries = append(a.pendingSeries, record.RefSeries{ - Ref: series.ref, - Labels: l, - }) - - a.metrics.numActiveSeries.Inc() - } - } - - series.Lock() - defer series.Unlock() - - if t <= a.minValidTime(series.lastTs) { + if t <= a.minValidTime(lastTS) { a.metrics.totalOutOfOrderSamples.Inc() return 0, storage.ErrOutOfOrderSample } switch { - case h != nil: - // NOTE: always modify pendingHistograms and histogramSeries together - a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{ - Ref: series.ref, - T: t, - H: h, - }) - a.histogramSeries = append(a.histogramSeries, series) case fh != nil: + isStale = value.IsStaleNaN(fh.Sum) // NOTE: always modify pendingFloatHistograms and floatHistogramSeries together a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{ - Ref: series.ref, + Ref: s.ref, T: t, FH: fh, }) - a.floatHistogramSeries = append(a.floatHistogramSeries, series) - } - - a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() - return storage.SeriesRef(series.ref), nil -} - -func (*appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { - // TODO: Wire metadata in the Agent's appender. - return 0, nil -} - -func (a *appender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { - if h != nil { - if err := h.Validate(); err != nil { - return 0, err - } - } - if fh != nil { - if err := fh.Validate(); err != nil { - return 0, err - } - } - if st >= t { - return 0, storage.ErrSTNewerThanSample - } - - series := a.series.GetByID(chunks.HeadSeriesRef(ref)) - if series == nil { - // Ensure no empty labels have gotten through. - l = l.WithoutEmpty() - if l.IsEmpty() { - return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample) - } - - if lbl, dup := l.HasDuplicateLabelNames(); dup { - return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample) - } - - var created bool - series, created = a.getOrCreate(l) - if created { - a.pendingSeries = append(a.pendingSeries, record.RefSeries{ - Ref: series.ref, - Labels: l, - }) - a.metrics.numActiveSeries.Inc() - } - } - - series.Lock() - defer series.Unlock() - - if st <= a.minValidTime(series.lastTs) { - return 0, storage.ErrOutOfOrderST - } - - if st <= series.lastTs { - // discard the sample if it's out of order. 
-		return 0, storage.ErrOutOfOrderST
-	}
-	series.lastTs = st
-
-	switch {
+		a.floatHistogramSeries = append(a.floatHistogramSeries, s)
 	case h != nil:
-		zeroHistogram := &histogram.Histogram{}
+		isStale = value.IsStaleNaN(h.Sum)
+		// NOTE: always modify pendingHistograms and histogramSeries together
 		a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{
-			Ref: series.ref,
-			T:   st,
-			H:   zeroHistogram,
-		})
-		a.histogramSeries = append(a.histogramSeries, series)
-	case fh != nil:
-		a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{
-			Ref: series.ref,
-			T:   st,
-			FH:  &histogram.FloatHistogram{},
+			Ref: s.ref,
+			T:   t,
+			H:   h,
 		})
-		a.floatHistogramSeries = append(a.floatHistogramSeries, series)
-	}
-
-	a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
-	return storage.SeriesRef(series.ref), nil
-}
-
-func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t, st int64) (storage.SeriesRef, error) {
-	if st >= t {
-		return 0, storage.ErrSTNewerThanSample
-	}
-
-	series := a.series.GetByID(chunks.HeadSeriesRef(ref))
-	if series == nil {
-		l = l.WithoutEmpty()
-		if l.IsEmpty() {
-			return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
-		}
-
-		if lbl, dup := l.HasDuplicateLabelNames(); dup {
-			return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample)
-		}
-
-		newSeries, created := a.getOrCreate(l)
-		if created {
-			a.pendingSeries = append(a.pendingSeries, record.RefSeries{
-				Ref:    newSeries.ref,
-				Labels: l,
-			})
-			a.metrics.numActiveSeries.Inc()
-		}
-
-		series = newSeries
-	}
-
-	series.Lock()
-	defer series.Unlock()
-
-	if t <= a.minValidTime(series.lastTs) {
-		a.metrics.totalOutOfOrderSamples.Inc()
-		return 0, storage.ErrOutOfOrderSample
-	}
+		a.histogramSeries = append(a.histogramSeries, s)
+	default:
+		isStale = value.IsStaleNaN(v)
 
-	if st <= series.lastTs {
-		// discard the sample if it's out of order.
-		return 0, storage.ErrOutOfOrderST
+		// NOTE: always modify pendingSamples and sampleSeries together.
+		a.pendingSamples = append(a.pendingSamples, record.RefSample{
+			Ref: s.ref,
+			T:   t,
+			V:   v,
+		})
+		a.sampleSeries = append(a.sampleSeries, s)
 	}
-	series.lastTs = st
-
-	// NOTE: always modify pendingSamples and sampleSeries together.
-	a.pendingSamples = append(a.pendingSamples, record.RefSample{
-		Ref: series.ref,
-		T:   st,
-		V:   0,
-	})
-	a.sampleSeries = append(a.sampleSeries, series)
-
-	a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
-
-	return storage.SeriesRef(series.ref), nil
-}
-
-// Commit submits the collected samples and purges the batch.
-func (a *appender) Commit() error {
-	if err := a.log(); err != nil {
-		return err
+	a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricType).Inc()
+	if isStale {
+		// For stale values we never attempt to process metadata/exemplars; claim success.
+		return storage.SeriesRef(s.ref), nil
 	}
 
-	a.clearData()
-	a.appenderPool.Put(a)
-
-	if a.writeNotified != nil {
-		a.writeNotified.Notify()
+	// Append exemplars if any and if storage was configured for it.
+	// TODO(bwplotka): Agent has no equivalent of a.head.opts.EnableExemplarStorage && a.head.opts.MaxExemplars.Load() > 0; should it?
+	if len(opts.Exemplars) > 0 {
+		// Currently only exemplars can return partial errors.
+		partialErr = a.appendExemplars(s, opts.Exemplars)
 	}
-	return nil
+	return storage.SeriesRef(s.ref), partialErr
 }
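+// Usage sketch (illustrative only; variable names and values are hypothetical,
+// error handling elided):
+//
+//	app := db.AppenderV2(context.Background())
+//	ref, err := app.Append(0, labels.FromStrings("job", "demo"), 0, ts, 42.0, nil, nil, storage.AOptions{})
+//	// A *storage.AppendPartialError signals that the sample itself was appended
+//	// but some exemplars were rejected:
+//	var pErr *storage.AppendPartialError
+//	if errors.As(err, &pErr) {
+//		// Inspect pErr.ExemplarErrors.
+//	}
+//	// Later appends can reuse ref and pass empty labels.
+//	_, _ = app.Append(ref, labels.EmptyLabels(), 0, ts+15000, 43.0, nil, nil, storage.AOptions{})
+//	_ = app.Commit()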
-
-// log logs all pending data to the WAL.
-func (a *appender) log() error {
-	a.mtx.RLock()
-	defer a.mtx.RUnlock()
-
-	var encoder record.Encoder
-	buf := a.bufPool.Get().([]byte)
-	defer func() {
-		a.bufPool.Put(buf) //nolint:staticcheck
-	}()
-
-	if len(a.pendingSeries) > 0 {
-		buf = encoder.Series(a.pendingSeries, buf)
-		if err := a.wal.Log(buf); err != nil {
-			return err
-		}
-		buf = buf[:0]
-	}
-
-	if len(a.pendingSamples) > 0 {
-		buf = encoder.Samples(a.pendingSamples, buf)
-		if err := a.wal.Log(buf); err != nil {
-			return err
-		}
-		buf = buf[:0]
-	}
-
-	if len(a.pendingHistograms) > 0 {
-		var customBucketsHistograms []record.RefHistogramSample
-		buf, customBucketsHistograms = encoder.HistogramSamples(a.pendingHistograms, buf)
-		if len(buf) > 0 {
-			if err := a.wal.Log(buf); err != nil {
-				return err
-			}
-			buf = buf[:0]
-		}
-		if len(customBucketsHistograms) > 0 {
-			buf = encoder.CustomBucketsHistogramSamples(customBucketsHistograms, nil)
-			if err := a.wal.Log(buf); err != nil {
-				return err
-			}
-			buf = buf[:0]
-		}
-	}
+func (a *appenderV2) appendExemplars(s *memSeries, exemplars []exemplar.Exemplar) error {
+	var errs []error
+	for _, e := range exemplars {
+		// Ensure no empty labels have gotten through.
+		e.Labels = e.Labels.WithoutEmpty()
 
-	if len(a.pendingFloatHistograms) > 0 {
-		var customBucketsFloatHistograms []record.RefFloatHistogramSample
-		buf, customBucketsFloatHistograms = encoder.FloatHistogramSamples(a.pendingFloatHistograms, buf)
-		if len(buf) > 0 {
-			if err := a.wal.Log(buf); err != nil {
-				return err
+		if err := a.validateExemplar(s.ref, e); err != nil {
+			if !errors.Is(err, storage.ErrDuplicateExemplar) {
+				// Return partial errors for everything except duplicates.
+				errs = append(errs, err)
+				continue
 			}
-			buf = buf[:0]
-		}
-		if len(customBucketsFloatHistograms) > 0 {
-			buf = encoder.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, nil)
-			if err := a.wal.Log(buf); err != nil {
-				return err
+			if !errors.Is(err, storage.ErrOutOfOrderExemplar) {
+				a.logger.Debug("Error while adding an exemplar on AppendSample", "exemplar", fmt.Sprintf("%+v", e), "err", err)
 			}
-			buf = buf[:0]
-		}
-	}
-
-	if len(a.pendingExamplars) > 0 {
-		buf = encoder.Exemplars(a.pendingExamplars, buf)
-		if err := a.wal.Log(buf); err != nil {
-			return err
+			continue
 		}
-		buf = buf[:0]
-	}
-
-	var series *memSeries
-	for i, s := range a.pendingSamples {
-		series = a.sampleSeries[i]
-		if !series.updateTimestamp(s.T) {
-			a.metrics.totalOutOfOrderSamples.Inc()
-		}
-	}
-	for i, s := range a.pendingHistograms {
-		series = a.histogramSeries[i]
-		if !series.updateTimestamp(s.T) {
-			a.metrics.totalOutOfOrderSamples.Inc()
-		}
+		a.series.SetLatestExemplar(s.ref, &e)
+		a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{
+			Ref:    s.ref,
+			T:      e.Ts,
+			V:      e.Value,
+			Labels: e.Labels,
+		})
+		a.metrics.totalAppendedExemplars.Inc()
 	}
-	for i, s := range a.pendingFloatHistograms {
-		series = a.floatHistogramSeries[i]
-		if !series.updateTimestamp(s.T) {
-			a.metrics.totalOutOfOrderSamples.Inc()
-		}
+	if len(errs) > 0 {
+		return &storage.AppendPartialError{ExemplarErrors: errs}
 	}
 	return nil
 }
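+// Duplicate handling above mirrors Prometheus' TSDB: re-sending the exemplar
+// most recently stored for a series is dropped silently instead of being
+// surfaced as a partial error. A sketch (hypothetical values):
+//
+//	e := exemplar.Exemplar{Labels: labels.FromStrings("trace_id", "abc"), Value: 1, Ts: 10, HasTs: true}
+//	opts := storage.AOptions{Exemplars: []exemplar.Exemplar{e, e}}
+//	// The second e fails validateExemplar with storage.ErrDuplicateExemplar,
+//	// is skipped, and Append reports no partial error for it.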
 
-// clearData clears all pending data.
-func (a *appender) clearData() {
-	a.pendingSeries = a.pendingSeries[:0]
-	a.pendingSamples = a.pendingSamples[:0]
-	a.pendingHistograms = a.pendingHistograms[:0]
-	a.pendingFloatHistograms = a.pendingFloatHistograms[:0]
-	a.pendingExamplars = a.pendingExamplars[:0]
-	a.sampleSeries = a.sampleSeries[:0]
-	a.histogramSeries = a.histogramSeries[:0]
-	a.floatHistogramSeries = a.floatHistogramSeries[:0]
-}
-
-func (a *appender) Rollback() error {
-	// Series are created in-memory regardless of rollback. This means we must
-	// log them to the WAL, otherwise subsequent commits may reference a series
-	// which was never written to the WAL.
-	if err := a.logSeries(); err != nil {
-		return err
+// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
+// is implemented.
+//
+// ST is an experimental feature; we don't fail the append on errors, just debug-log them.
+func (a *appenderV2) bestEffortAppendSTZeroSample(s *memSeries, lastTS, st, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) {
+	if st >= t {
+		a.logger.Debug("Error when appending ST", "series", s.lset.String(), "st", st, "t", t, "err", storage.ErrSTNewerThanSample)
+		return
 	}
-
-	a.clearData()
-	a.appenderPool.Put(a)
-	return nil
-}
-
-// logSeries logs only pending series records to the WAL.
-func (a *appender) logSeries() error {
-	a.mtx.RLock()
-	defer a.mtx.RUnlock()
-
-	if len(a.pendingSeries) > 0 {
-		buf := a.bufPool.Get().([]byte)
-		defer func() {
-			a.bufPool.Put(buf) //nolint:staticcheck
-		}()
-
-		var encoder record.Encoder
-		buf = encoder.Series(a.pendingSeries, buf)
-		if err := a.wal.Log(buf); err != nil {
-			return err
-		}
-		buf = buf[:0]
+	if st <= lastTS {
+		a.logger.Debug("Error when appending ST", "series", s.lset.String(), "st", st, "t", t, "err", storage.ErrOutOfOrderST)
+		return
 	}
-	return nil
-}
-
-// minValidTime returns the minimum timestamp that a sample can have
-// and is needed for preventing underflow.
-func (a *appender) minValidTime(lastTs int64) int64 {
-	if lastTs < math.MinInt64+a.opts.OutOfOrderTimeWindow {
-		return math.MinInt64
+	switch {
+	case fh != nil:
+		zeroFloatHistogram := &histogram.FloatHistogram{
+			// The STZeroSample represents a counter reset by definition.
+			CounterResetHint: histogram.CounterReset,
+			// Replicate other fields to avoid needless chunk creation.
+			Schema:        fh.Schema,
+			ZeroThreshold: fh.ZeroThreshold,
+			CustomValues:  fh.CustomValues,
+		}
+		a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{Ref: s.ref, T: st, FH: zeroFloatHistogram})
+		a.floatHistogramSeries = append(a.floatHistogramSeries, s)
+		a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
+	case h != nil:
+		zeroHistogram := &histogram.Histogram{
+			// The STZeroSample represents a counter reset by definition.
+			CounterResetHint: histogram.CounterReset,
+			// Replicate other fields to avoid needless chunk creation.
+ Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + CustomValues: h.CustomValues, + } + a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{Ref: s.ref, T: st, H: zeroHistogram}) + a.histogramSeries = append(a.histogramSeries, s) + a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc() + default: + a.pendingSamples = append(a.pendingSamples, record.RefSample{Ref: s.ref, T: st, V: 0}) + a.sampleSeries = append(a.sampleSeries, s) + a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc() } - - return lastTs - a.opts.OutOfOrderTimeWindow } diff --git a/tsdb/agent/db_append_v2_test.go b/tsdb/agent/db_append_v2_test.go index 7409f79ec5..ec92cfa630 100644 --- a/tsdb/agent/db_append_v2_test.go +++ b/tsdb/agent/db_append_v2_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 The Prometheus Authors +// Copyright The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,17 +15,14 @@ package agent import ( "context" - "errors" "fmt" - "io" "math" "path/filepath" - "strconv" "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" "github.com/stretchr/testify/require" @@ -43,85 +40,56 @@ import ( "github.com/prometheus/prometheus/util/testutil" ) -func TestDB_InvalidSeries(t *testing.T) { +func TestDB_InvalidSeries_AppendV2(t *testing.T) { s := createTestAgentDB(t, nil, DefaultOptions()) defer s.Close() - app := s.Appender(context.Background()) - + app := s.AppenderV2(context.Background()) t.Run("Samples", func(t *testing.T) { - _, err := app.Append(0, labels.Labels{}, 0, 0) + _, err := app.Append(0, labels.Labels{}, 0, 0, 0, nil, nil, storage.AOptions{}) require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels") - _, err = app.Append(0, labels.FromStrings("a", "1", "a", "2"), 0, 0) + _, err = app.Append(0, labels.FromStrings("a", "1", "a", "2"), 0, 0, 0, nil, nil, storage.AOptions{}) require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels") }) t.Run("Histograms", func(t *testing.T) { - _, err := app.AppendHistogram(0, labels.Labels{}, 0, tsdbutil.GenerateTestHistograms(1)[0], nil) + _, err := app.Append(0, labels.Labels{}, 0, 0, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{}) require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject empty labels") - _, err = app.AppendHistogram(0, labels.FromStrings("a", "1", "a", "2"), 0, tsdbutil.GenerateTestHistograms(1)[0], nil) + _, err = app.Append(0, labels.FromStrings("a", "1", "a", "2"), 0, 0, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{}) require.ErrorIs(t, err, tsdb.ErrInvalidSample, "should reject duplicate labels") }) t.Run("Exemplars", func(t *testing.T) { - sRef, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0) - require.NoError(t, err, "should not reject valid series") - - _, err = app.AppendExemplar(0, labels.EmptyLabels(), exemplar.Exemplar{}) - require.EqualError(t, err, "unknown series ref when trying to add exemplar: 0") - e := exemplar.Exemplar{Labels: labels.FromStrings("a", "1", "a", "2")} - _, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e) - require.ErrorIs(t, err, tsdb.ErrInvalidExemplar, "should reject duplicate labels") + _, err := app.Append(0, labels.FromStrings("a", 
"1"), 0, 0, 0, nil, nil, storage.AOptions{ + Exemplars: []exemplar.Exemplar{e}, + }) + partErr := &storage.AppendPartialError{} + require.ErrorAs(t, err, &partErr) + require.Len(t, partErr.ExemplarErrors, 1) + require.ErrorIs(t, partErr.ExemplarErrors[0], tsdb.ErrInvalidExemplar, "should reject duplicate labels") e = exemplar.Exemplar{Labels: labels.FromStrings("a_somewhat_long_trace_id", "nYJSNtFrFTY37VR7mHzEE/LIDt7cdAQcuOzFajgmLDAdBSRHYPDzrxhMA4zz7el8naI/AoXFv9/e/G0vcETcIoNUi3OieeLfaIRQci2oa")} - _, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e) - require.ErrorIs(t, err, storage.ErrExemplarLabelLength, "should reject too long label length") + _, err = app.Append(0, labels.FromStrings("a", "2"), 0, 0, 0, nil, nil, storage.AOptions{ + Exemplars: []exemplar.Exemplar{e}, + }) + partErr = &storage.AppendPartialError{} + require.ErrorAs(t, err, &partErr) + require.Len(t, partErr.ExemplarErrors, 1) + require.ErrorIs(t, partErr.ExemplarErrors[0], storage.ErrExemplarLabelLength, "should reject too long label length") - // Inverse check + // Inverse check. e = exemplar.Exemplar{Labels: labels.FromStrings("a", "1"), Value: 20, Ts: 10, HasTs: true} - _, err = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + _, err = app.Append(0, labels.FromStrings("a", "1"), 0, 0, 0, nil, nil, storage.AOptions{ + Exemplars: []exemplar.Exemplar{e}, + }) require.NoError(t, err, "should not reject valid exemplars") }) } -func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *DB { - t.Helper() - - dbDir := t.TempDir() - rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false) - t.Cleanup(func() { - require.NoError(t, rs.Close()) - }) - - db, err := Open(promslog.NewNopLogger(), reg, rs, dbDir, opts) - require.NoError(t, err) - return db -} - -func TestUnsupportedFunctions(t *testing.T) { - s := createTestAgentDB(t, nil, DefaultOptions()) - defer s.Close() - - t.Run("Querier", func(t *testing.T) { - _, err := s.Querier(0, 0) - require.Equal(t, err, ErrUnsupported) - }) - - t.Run("ChunkQuerier", func(t *testing.T) { - _, err := s.ChunkQuerier(0, 0) - require.Equal(t, err, ErrUnsupported) - }) - - t.Run("ExemplarQuerier", func(t *testing.T) { - _, err := s.ExemplarQuerier(context.TODO()) - require.Equal(t, err, ErrUnsupported) - }) -} - -func TestCommit(t *testing.T) { +func TestCommit_AppendV2(t *testing.T) { const ( numDatapoints = 1000 numHistograms = 100 @@ -129,7 +97,7 @@ func TestCommit(t *testing.T) { ) s := createTestAgentDB(t, nil, DefaultOptions()) - app := s.Appender(context.TODO()) + app := s.AppenderV2(context.TODO()) lbls := labelsForTest(t.Name(), numSeries) for _, l := range lbls { @@ -137,16 +105,14 @@ func TestCommit(t *testing.T) { for i := range numDatapoints { sample := chunks.GenerateSamples(0, 1) - ref, err := app.Append(0, lset, sample[0].T(), sample[0].F()) - require.NoError(t, err) - - e := exemplar.Exemplar{ - Labels: lset, - Ts: sample[0].T() + int64(i), - Value: sample[0].F(), - HasTs: true, - } - _, err = app.AppendExemplar(ref, lset, e) + _, err := app.Append(0, lset, 0, sample[0].T(), sample[0].F(), nil, nil, storage.AOptions{ + Exemplars: []exemplar.Exemplar{{ + Labels: lset, + Ts: sample[0].T() + int64(i), + Value: sample[0].F(), + HasTs: true, + }}, + }) require.NoError(t, err) } } @@ -158,7 +124,7 @@ func TestCommit(t *testing.T) { histograms := tsdbutil.GenerateTestHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + _, err := 
app.Append(0, lset, 0, int64(i), 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -170,7 +136,7 @@ func TestCommit(t *testing.T) { customBucketHistograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(i), customBucketHistograms[i], nil) + _, err := app.Append(0, lset, 0, int64(i), 0, customBucketHistograms[i], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -182,7 +148,7 @@ func TestCommit(t *testing.T) { floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } } @@ -194,7 +160,7 @@ func TestCommit(t *testing.T) { customBucketFloatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(i), nil, customBucketFloatHistograms[i]) + _, err := app.Append(0, lset, 0, int64(i), 0, nil, customBucketFloatHistograms[i], storage.AOptions{}) require.NoError(t, err) } } @@ -260,7 +226,7 @@ func TestCommit(t *testing.T) { require.Equal(t, numSeries*numHistograms*2, walFloatHistogramCount, "unexpected number of float histograms") } -func TestRollback(t *testing.T) { +func TestRollback_AppendV2(t *testing.T) { const ( numDatapoints = 1000 numHistograms = 100 @@ -268,7 +234,7 @@ func TestRollback(t *testing.T) { ) s := createTestAgentDB(t, nil, DefaultOptions()) - app := s.Appender(context.TODO()) + app := s.AppenderV2(context.TODO()) lbls := labelsForTest(t.Name(), numSeries) for _, l := range lbls { @@ -276,7 +242,7 @@ func TestRollback(t *testing.T) { for range numDatapoints { sample := chunks.GenerateSamples(0, 1) - _, err := app.Append(0, lset, sample[0].T(), sample[0].F()) + _, err := app.Append(0, lset, 0, sample[0].T(), sample[0].F(), nil, nil, storage.AOptions{}) require.NoError(t, err) } } @@ -288,7 +254,7 @@ func TestRollback(t *testing.T) { histograms := tsdbutil.GenerateTestHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + _, err := app.Append(0, lset, 0, int64(i), 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -300,7 +266,7 @@ func TestRollback(t *testing.T) { histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + _, err := app.Append(0, lset, 0, int64(i), 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -312,7 +278,7 @@ func TestRollback(t *testing.T) { floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } } @@ -324,7 +290,7 @@ func TestRollback(t *testing.T) { floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } } @@ -393,7 +359,7 @@ func TestRollback(t *testing.T) { require.Equal(t, 0, 
walFloatHistogramCount, "float histograms should not have been written to WAL") } -func TestFullTruncateWAL(t *testing.T) { +func TestFullTruncateWAL_AppendV2(t *testing.T) { const ( numDatapoints = 1000 numHistograms = 100 @@ -409,14 +375,14 @@ func TestFullTruncateWAL(t *testing.T) { defer func() { require.NoError(t, s.Close()) }() - app := s.Appender(context.TODO()) + app := s.AppenderV2(context.TODO()) lbls := labelsForTest(t.Name(), numSeries) for _, l := range lbls { lset := labels.New(l...) for range numDatapoints { - _, err := app.Append(0, lset, int64(lastTs), 0) + _, err := app.Append(0, lset, 0, int64(lastTs), 0, nil, nil, storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -429,7 +395,7 @@ func TestFullTruncateWAL(t *testing.T) { histograms := tsdbutil.GenerateTestHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(lastTs), histograms[i], nil) + _, err := app.Append(0, lset, 0, int64(lastTs), 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -442,7 +408,7 @@ func TestFullTruncateWAL(t *testing.T) { histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(lastTs), histograms[i], nil) + _, err := app.Append(0, lset, 0, int64(lastTs), 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -455,7 +421,7 @@ func TestFullTruncateWAL(t *testing.T) { floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(lastTs), nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, int64(lastTs), 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -468,7 +434,7 @@ func TestFullTruncateWAL(t *testing.T) { floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, int64(lastTs), nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, int64(lastTs), 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -481,7 +447,7 @@ func TestFullTruncateWAL(t *testing.T) { require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count") } -func TestPartialTruncateWAL(t *testing.T) { +func TestPartialTruncateWAL_AppendV2(t *testing.T) { const ( numDatapoints = 1000 numSeries = 800 @@ -494,7 +460,7 @@ func TestPartialTruncateWAL(t *testing.T) { defer func() { require.NoError(t, s.Close()) }() - app := s.Appender(context.TODO()) + app := s.AppenderV2(context.TODO()) // Create first batch of 800 series with 1000 data-points with a fixed lastTs as 500. var lastTs int64 = 500 @@ -503,7 +469,7 @@ func TestPartialTruncateWAL(t *testing.T) { lset := labels.New(l...) 
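+		// All samples in this first batch share the fixed lastTs, so a later partial truncation removes the whole batch.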
for range numDatapoints { - _, err := app.Append(0, lset, lastTs, 0) + _, err := app.Append(0, lset, 0, lastTs, 0, nil, nil, storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -516,7 +482,7 @@ func TestPartialTruncateWAL(t *testing.T) { histograms := tsdbutil.GenerateTestHistograms(numDatapoints) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + _, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -529,7 +495,7 @@ func TestPartialTruncateWAL(t *testing.T) { histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numDatapoints) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + _, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -542,7 +508,7 @@ func TestPartialTruncateWAL(t *testing.T) { floatHistograms := tsdbutil.GenerateTestFloatHistograms(numDatapoints) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -555,7 +521,7 @@ func TestPartialTruncateWAL(t *testing.T) { floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numDatapoints) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -568,7 +534,7 @@ func TestPartialTruncateWAL(t *testing.T) { lset := labels.New(l...) 
for range numDatapoints { - _, err := app.Append(0, lset, lastTs, 0) + _, err := app.Append(0, lset, 0, lastTs, 0, nil, nil, storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -581,7 +547,7 @@ func TestPartialTruncateWAL(t *testing.T) { histograms := tsdbutil.GenerateTestHistograms(numDatapoints) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + _, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -594,7 +560,7 @@ func TestPartialTruncateWAL(t *testing.T) { histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numDatapoints) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + _, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -607,7 +573,7 @@ func TestPartialTruncateWAL(t *testing.T) { floatHistograms := tsdbutil.GenerateTestFloatHistograms(numDatapoints) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -620,7 +586,7 @@ func TestPartialTruncateWAL(t *testing.T) { floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numDatapoints) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -634,7 +600,7 @@ func TestPartialTruncateWAL(t *testing.T) { require.Equal(t, float64(numSeries*5), m.Metric[0].Gauge.GetValue(), "agent wal truncate mismatch of deleted series count") } -func TestWALReplay(t *testing.T) { +func TestWALReplay_AppendV2(t *testing.T) { const ( numDatapoints = 1000 numHistograms = 100 @@ -643,14 +609,14 @@ func TestWALReplay(t *testing.T) { ) s := createTestAgentDB(t, nil, DefaultOptions()) - app := s.Appender(context.TODO()) + app := s.AppenderV2(context.TODO()) lbls := labelsForTest(t.Name(), numSeries) for _, l := range lbls { lset := labels.New(l...) 
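+		// Everything appended here must survive a restart; the test reopens the DB and replays this WAL.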
for range numDatapoints { - _, err := app.Append(0, lset, lastTs, 0) + _, err := app.Append(0, lset, 0, lastTs, 0, nil, nil, storage.AOptions{}) require.NoError(t, err) } } @@ -662,7 +628,7 @@ func TestWALReplay(t *testing.T) { histograms := tsdbutil.GenerateTestHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + _, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -674,7 +640,7 @@ func TestWALReplay(t *testing.T) { histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, lastTs, histograms[i], nil) + _, err := app.Append(0, lset, 0, lastTs, 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -686,7 +652,7 @@ func TestWALReplay(t *testing.T) { floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } } @@ -698,7 +664,7 @@ func TestWALReplay(t *testing.T) { floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) for i := range numHistograms { - _, err := app.AppendHistogram(0, lset, lastTs, nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, lastTs, 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } } @@ -733,29 +699,7 @@ func TestWALReplay(t *testing.T) { } } -func TestLockfile(t *testing.T) { - tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) { - logger := promslog.NewNopLogger() - reg := prometheus.NewRegistry() - rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false) - t.Cleanup(func() { - require.NoError(t, rs.Close()) - }) - - opts := DefaultOptions() - opts.NoLockfile = !createLock - - // Create the DB. This should create lockfile and its metrics. 
- db, err := Open(logger, nil, rs, data, opts) - require.NoError(t, err) - - return db.locker, testutil.NewCallbackCloser(func() { - require.NoError(t, db.Close()) - }) - }) -} - -func Test_ExistingWAL_NextRef(t *testing.T) { +func Test_ExistingWAL_NextRef_AppendV2(t *testing.T) { dbDir := t.TempDir() rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false) defer func() { @@ -768,10 +712,10 @@ func Test_ExistingWAL_NextRef(t *testing.T) { seriesCount := 10 // Append series - app := db.Appender(context.Background()) + app := db.AppenderV2(context.Background()) for i := range seriesCount { lset := labels.FromStrings(model.MetricNameLabel, fmt.Sprintf("series_%d", i)) - _, err := app.Append(0, lset, 0, 100) + _, err := app.Append(0, lset, 0, 0, 100, nil, nil, storage.AOptions{}) require.NoError(t, err) } @@ -780,7 +724,7 @@ func Test_ExistingWAL_NextRef(t *testing.T) { // Append series for i := range histogramCount { lset := labels.FromStrings(model.MetricNameLabel, fmt.Sprintf("histogram_%d", i)) - _, err := app.AppendHistogram(0, lset, 0, histograms[i], nil) + _, err := app.Append(0, lset, 0, 0, 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -799,90 +743,23 @@ func Test_ExistingWAL_NextRef(t *testing.T) { require.Equal(t, uint64(seriesCount+histogramCount), db.nextRef.Load(), "nextRef should be equal to the number of series written across the entire WAL") } -func Test_validateOptions(t *testing.T) { - t.Run("Apply defaults to zero values", func(t *testing.T) { - opts := validateOptions(&Options{}) - require.Equal(t, DefaultOptions(), opts) - }) - - t.Run("Defaults are already valid", func(t *testing.T) { - require.Equal(t, DefaultOptions(), validateOptions(nil)) - }) - - t.Run("MaxWALTime should not be lower than TruncateFrequency", func(t *testing.T) { - opts := validateOptions(&Options{ - MaxWALTime: int64(time.Hour / time.Millisecond), - TruncateFrequency: 2 * time.Hour, - }) - require.Equal(t, int64(2*time.Hour/time.Millisecond), opts.MaxWALTime) - }) -} - -func startTime() (int64, error) { - return time.Now().Unix() * 1000, nil -} - -// Create series for tests. -func labelsForTest(lName string, seriesCount int) [][]labels.Label { - var series [][]labels.Label - - for i := range seriesCount { - lset := []labels.Label{ - {Name: "a", Value: lName}, - {Name: "instance", Value: "localhost" + strconv.Itoa(i)}, - {Name: "job", Value: "prometheus"}, - } - series = append(series, lset) - } - - return series -} - -func gatherFamily(t *testing.T, reg prometheus.Gatherer, familyName string) *dto.MetricFamily { - t.Helper() - - families, err := reg.Gather() - require.NoError(t, err, "failed to gather metrics") - - for _, f := range families { - if f.GetName() == familyName { - return f - } - } - - t.Fatalf("could not find family %s", familyName) - - return nil -} - -func TestStorage_DuplicateExemplarsIgnored(t *testing.T) { +func TestStorage_DuplicateExemplarsIgnored_AppendV2(t *testing.T) { s := createTestAgentDB(t, nil, DefaultOptions()) - app := s.Appender(context.Background()) + app := s.AppenderV2(context.Background()) defer s.Close() - sRef, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0) - require.NoError(t, err, "should not reject valid series") - // Write a few exemplars to our appender and call Commit(). // If the Labels, Value or Timestamp are different than the last exemplar, // then a new one should be appended; Otherwise, it should be skipped. 
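+	// Of the nine appends below only e1, e2, e3, and e4 are distinct, so exactly four exemplars should reach the WAL.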
- e := exemplar.Exemplar{Labels: labels.FromStrings("a", "1"), Value: 20, Ts: 10, HasTs: true} - _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) - _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) - - e.Labels = labels.FromStrings("b", "2") - _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) - _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) - _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) - - e.Value = 42 - _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) - _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) - - e.Ts = 25 - _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) - _, _ = app.AppendExemplar(sRef, labels.EmptyLabels(), e) + e1 := exemplar.Exemplar{Labels: labels.FromStrings("a", "1"), Value: 20, Ts: 10, HasTs: true} + e2 := exemplar.Exemplar{Labels: labels.FromStrings("b", "2"), Value: 20, Ts: 10, HasTs: true} + e3 := exemplar.Exemplar{Labels: labels.FromStrings("b", "2"), Value: 42, Ts: 10, HasTs: true} + e4 := exemplar.Exemplar{Labels: labels.FromStrings("b", "2"), Value: 42, Ts: 25, HasTs: true} + _, err := app.Append(0, labels.FromStrings("a", "1"), 0, 0, 0, nil, nil, storage.AOptions{ + Exemplars: []exemplar.Exemplar{e1, e1, e2, e2, e2, e3, e3, e4, e4}, + }) + require.NoError(t, err, "should not reject valid series") require.NoError(t, app.Commit()) // Read back what was written to the WAL. @@ -907,7 +784,7 @@ func TestStorage_DuplicateExemplarsIgnored(t *testing.T) { require.Equal(t, 4, walExemplarsCount) } -func TestDBAllowOOOSamples(t *testing.T) { +func TestDBAllowOOOSamples_AppendV2(t *testing.T) { const ( numDatapoints = 5 numHistograms = 5 @@ -919,7 +796,7 @@ func TestDBAllowOOOSamples(t *testing.T) { opts := DefaultOptions() opts.OutOfOrderTimeWindow = math.MaxInt64 s := createTestAgentDB(t, reg, opts) - app := s.Appender(context.TODO()) + app := s.AppenderV2(context.TODO()) // Let's add some samples in the [offset, offset+numDatapoints) range. lbls := labelsForTest(t.Name(), numSeries) @@ -927,16 +804,14 @@ func TestDBAllowOOOSamples(t *testing.T) { lset := labels.New(l...) 
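+		// These timestamps start at offset, leaving room for older (out-of-order) appends after the restart below.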
for i := offset; i < numDatapoints+offset; i++ { - ref, err := app.Append(0, lset, int64(i), float64(i)) - require.NoError(t, err) - - e := exemplar.Exemplar{ - Labels: lset, - Ts: int64(i) * 2, - Value: float64(i), - HasTs: true, - } - _, err = app.AppendExemplar(ref, lset, e) + _, err := app.Append(0, lset, 0, int64(i), float64(i), nil, nil, storage.AOptions{ + Exemplars: []exemplar.Exemplar{{ + Labels: lset, + Ts: int64(i) * 2, + Value: float64(i), + HasTs: true, + }}, + }) require.NoError(t, err) } } @@ -948,7 +823,7 @@ func TestDBAllowOOOSamples(t *testing.T) { histograms := tsdbutil.GenerateTestHistograms(numHistograms) for i := offset; i < numDatapoints+offset; i++ { - _, err := app.AppendHistogram(0, lset, int64(i), histograms[i-offset], nil) + _, err := app.Append(0, lset, 0, int64(i), 0, histograms[i-offset], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -960,7 +835,7 @@ func TestDBAllowOOOSamples(t *testing.T) { histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) for i := offset; i < numDatapoints+offset; i++ { - _, err := app.AppendHistogram(0, lset, int64(i), histograms[i-offset], nil) + _, err := app.Append(0, lset, 0, int64(i), 0, histograms[i-offset], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -972,7 +847,7 @@ func TestDBAllowOOOSamples(t *testing.T) { floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) for i := offset; i < numDatapoints+offset; i++ { - _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i-offset]) + _, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i-offset], storage.AOptions{}) require.NoError(t, err) } } @@ -984,7 +859,7 @@ func TestDBAllowOOOSamples(t *testing.T) { floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) for i := offset; i < numDatapoints+offset; i++ { - _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i-offset]) + _, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i-offset], storage.AOptions{}) require.NoError(t, err) } } @@ -1006,7 +881,7 @@ func TestDBAllowOOOSamples(t *testing.T) { t.Fatalf("unable to create storage for the agent: %v", err) } - app = db.Appender(context.Background()) + app = db.AppenderV2(context.Background()) // Now the lastTs will have been recorded successfully. // Let's try appending twice as many OOO samples in the [0, numDatapoints) range. @@ -1015,16 +890,14 @@ func TestDBAllowOOOSamples(t *testing.T) { lset := labels.New(l...) 
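+		// All timestamps in [0, numDatapoints) precede the recorded lastTs, so every append here is out of order.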
for i := range numDatapoints { - ref, err := app.Append(0, lset, int64(i), float64(i)) - require.NoError(t, err) - - e := exemplar.Exemplar{ - Labels: lset, - Ts: int64(i) * 2, - Value: float64(i), - HasTs: true, - } - _, err = app.AppendExemplar(ref, lset, e) + _, err := app.Append(0, lset, 0, int64(i), float64(i), nil, nil, storage.AOptions{ + Exemplars: []exemplar.Exemplar{{ + Labels: lset, + Ts: int64(i) * 2, + Value: float64(i), + HasTs: true, + }}, + }) require.NoError(t, err) } } @@ -1036,7 +909,7 @@ func TestDBAllowOOOSamples(t *testing.T) { histograms := tsdbutil.GenerateTestHistograms(numHistograms) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + _, err := app.Append(0, lset, 0, int64(i), 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -1048,7 +921,7 @@ func TestDBAllowOOOSamples(t *testing.T) { histograms := tsdbutil.GenerateTestCustomBucketsHistograms(numHistograms) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil) + _, err := app.Append(0, lset, 0, int64(i), 0, histograms[i], nil, storage.AOptions{}) require.NoError(t, err) } } @@ -1060,7 +933,7 @@ func TestDBAllowOOOSamples(t *testing.T) { floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } } @@ -1072,7 +945,7 @@ func TestDBAllowOOOSamples(t *testing.T) { floatHistograms := tsdbutil.GenerateTestCustomBucketsFloatHistograms(numHistograms) for i := range numDatapoints { - _, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i]) + _, err := app.Append(0, lset, 0, int64(i), 0, nil, floatHistograms[i], storage.AOptions{}) require.NoError(t, err) } } @@ -1084,7 +957,7 @@ func TestDBAllowOOOSamples(t *testing.T) { require.NoError(t, db.Close()) } -func TestDBOutOfOrderTimeWindow(t *testing.T) { +func TestDBOutOfOrderTimeWindow_AppendV2(t *testing.T) { tc := []struct { outOfOrderTimeWindow, firstTs, secondTs int64 expectedError error @@ -1102,24 +975,24 @@ func TestDBOutOfOrderTimeWindow(t *testing.T) { opts := DefaultOptions() opts.OutOfOrderTimeWindow = c.outOfOrderTimeWindow s := createTestAgentDB(t, reg, opts) - app := s.Appender(context.TODO()) + app := s.AppenderV2(context.TODO()) lbls := labelsForTest(t.Name()+"_histogram", 1) lset := labels.New(lbls[0]...) - _, err := app.AppendHistogram(0, lset, c.firstTs, tsdbutil.GenerateTestHistograms(1)[0], nil) + _, err := app.Append(0, lset, 0, c.firstTs, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{}) require.NoError(t, err) err = app.Commit() require.NoError(t, err) - _, err = app.AppendHistogram(0, lset, c.secondTs, tsdbutil.GenerateTestHistograms(1)[0], nil) + _, err = app.Append(0, lset, 0, c.secondTs, 0, tsdbutil.GenerateTestHistograms(1)[0], nil, storage.AOptions{}) require.ErrorIs(t, err, c.expectedError) lbls = labelsForTest(t.Name(), 1) lset = labels.New(lbls[0]...) 
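+	// Repeat the same firstTs/secondTs sequence with plain float samples.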
-	_, err = app.Append(0, lset, c.firstTs, 0)
+	_, err = app.Append(0, lset, 0, c.firstTs, 0, nil, nil, storage.AOptions{})
 	require.NoError(t, err)
 	err = app.Commit()
 	require.NoError(t, err)
-	_, err = app.Append(0, lset, c.secondTs, 0)
+	_, err = app.Append(0, lset, 0, c.secondTs, 0, nil, nil, storage.AOptions{})
 	require.ErrorIs(t, err, c.expectedError)
 
 	expectedAppendedSamples := float64(2)
@@ -1134,28 +1007,26 @@ func TestDBOutOfOrderTimeWindow(t *testing.T) {
 	}
 }
 
-type walSample struct {
-	t    int64
-	f    float64
-	h    *histogram.Histogram
-	lbls labels.Labels
-	ref  storage.SeriesRef
-}
-
-func TestDBStartTimestampSamplesIngestion(t *testing.T) {
+func TestDB_EnableSTZeroInjection_AppendV2(t *testing.T) {
 	t.Parallel()
 
+	// NOTE: Eventually walSample and appendableSample should become a single shared type.
 	type appendableSample struct {
-		t            int64
-		st           int64
-		v            float64
-		lbls         labels.Labels
-		h            *histogram.Histogram
-		expectsError bool
+		st, t int64
+		v     float64
+		lbls  labels.Labels
+		h     *histogram.Histogram
 	}
 
-	testHistogram := tsdbutil.GenerateTestHistograms(1)[0]
-	zeroHistogram := &histogram.Histogram{}
+	testHistograms := tsdbutil.GenerateTestHistograms(2)
+	zeroHistogram := &histogram.Histogram{
+		// The STZeroSample represents a counter reset by definition.
+		CounterResetHint: histogram.CounterReset,
+		// Replicate other fields to avoid needless chunk creation.
+		Schema:        testHistograms[0].Schema,
+		ZeroThreshold: testHistograms[0].ZeroThreshold,
+		CustomValues:  testHistograms[0].CustomValues,
+	}
 
 	lbls := labelsForTest(t.Name(), 1)
 	defLbls := labels.New(lbls[0]...)
@@ -1163,7 +1034,7 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
 	testCases := []struct {
 		name                string
 		inputSamples        []appendableSample
-		expectedSamples     []*walSample
+		expectedSamples     []walSample
 		expectedSeriesCount int
 	}{
 		{
@@ -1172,10 +1043,10 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
 				{t: 100, st: 1, v: 10, lbls: defLbls},
 				{t: 101, st: 1, v: 10, lbls: defLbls},
 			},
-			expectedSamples: []*walSample{
-				{t: 1, f: 0, lbls: defLbls},
-				{t: 100, f: 10, lbls: defLbls},
-				{t: 101, f: 10, lbls: defLbls},
+			expectedSamples: []walSample{
+				{t: 1, f: 0, lbls: defLbls, ref: 1},
+				{t: 100, f: 10, lbls: defLbls, ref: 1},
+				{t: 101, f: 10, lbls: defLbls, ref: 1},
			},
		},
		{
@@ -1190,54 +1061,52 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
 				{
 					t:    300,
 					st:   230,
-					h:    testHistogram,
+					h:    testHistograms[0],
 					lbls: defLbls,
 				},
 			},
-			expectedSamples: []*walSample{
-				{t: 30, f: 0, lbls: defLbls},
-				{t: 100, f: 20, lbls: defLbls},
-				{t: 230, h: zeroHistogram, lbls: defLbls},
-				{t: 300, h: testHistogram, lbls: defLbls},
+			expectedSamples: []walSample{
+				{t: 30, f: 0, lbls: defLbls, ref: 1},
+				{t: 100, f: 20, lbls: defLbls, ref: 1},
+				{t: 230, h: zeroHistogram, lbls: defLbls, ref: 1},
+				{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
 			},
 			expectedSeriesCount: 1,
 		},
 		{
-			name: "ST+float && ST+histogram samples with error",
+			name: "ST+float && ST+histogram samples with error; should be ignored",
 			inputSamples: []appendableSample{
 				{
 					// invalid ST
-					t:            100,
-					st:           100,
-					v:            10,
-					lbls:         defLbls,
-					expectsError: true,
+					t:    100,
+					st:   100,
+					v:    10,
+					lbls: defLbls,
 				},
 				{
 					// invalid ST histogram
-					t:            300,
-					st:           300,
-					h:            testHistogram,
-					lbls:         defLbls,
-					expectsError: true,
+					t:    300,
+					st:   300,
+					h:    testHistograms[0],
+					lbls: defLbls,
 				},
 			},
-			expectedSamples: []*walSample{
-				{t: 100, f: 10, lbls: defLbls},
-				{t: 300, h: testHistogram, lbls: defLbls},
+			expectedSamples: []walSample{
+				{t: 100, f: 10, lbls: defLbls, ref: 1},
+				{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
 			},
 			expectedSeriesCount: 0,
 		},
 		{
 			name: "In order ct+normal sample/histogram",
 			inputSamples: []appendableSample{
-				{t: 100, h: testHistogram, st: 1, lbls: defLbls},
-				{t: 101, h: testHistogram, st: 1, lbls: defLbls},
+				{t: 100, h: testHistograms[0], st: 1, lbls: defLbls},
+				{t: 101, h: testHistograms[1], st: 1, lbls: defLbls},
 			},
-			expectedSamples: []*walSample{
-				{t: 1, h: &histogram.Histogram{}},
-				{t: 100, h: testHistogram},
-				{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.NotCounterReset}},
+			expectedSamples: []walSample{
+				{t: 1, h: zeroHistogram, lbls: defLbls, ref: 1},
+				{t: 100, h: testHistograms[0], lbls: defLbls, ref: 1},
+				{t: 101, h: testHistograms[1], lbls: defLbls, ref: 1},
 			},
 		},
 		{
@@ -1248,12 +1117,12 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
 				{t: 180_000, st: 40_000, v: 10, lbls: defLbls},
 				{t: 50_000, st: 40_000, v: 10, lbls: defLbls},
 			},
-			expectedSamples: []*walSample{
-				{t: 40_000, f: 0, lbls: defLbls},
-				{t: 50_000, f: 10, lbls: defLbls},
-				{t: 60_000, f: 10, lbls: defLbls},
-				{t: 120_000, f: 10, lbls: defLbls},
-				{t: 180_000, f: 10, lbls: defLbls},
+			expectedSamples: []walSample{
+				{t: 40_000, f: 0, lbls: defLbls, ref: 1},
+				{t: 60_000, f: 10, lbls: defLbls, ref: 1},
+				{t: 120_000, f: 10, lbls: defLbls, ref: 1},
+				{t: 180_000, f: 10, lbls: defLbls, ref: 1},
+				{t: 50_000, f: 10, lbls: defLbls, ref: 1}, // OOO sample.
 			},
 		},
 	}
@@ -1265,36 +1134,21 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
 			reg := prometheus.NewRegistry()
 			opts := DefaultOptions()
 			opts.OutOfOrderTimeWindow = 360_000
+			opts.EnableSTAsZeroSample = true
 			s := createTestAgentDB(t, reg, opts)
-			app := s.Appender(context.TODO())
 
 			for _, sample := range tc.inputSamples {
-				// We supposed to write a Histogram to the WAL
-				if sample.h != nil {
-					_, err := app.AppendHistogramSTZeroSample(0, sample.lbls, sample.t, sample.st, zeroHistogram, nil)
-					if !errors.Is(err, storage.ErrOutOfOrderST) {
-						require.Equal(t, sample.expectsError, err != nil, "expected error: %v, got: %v", sample.expectsError, err)
-					}
-
-					_, err = app.AppendHistogram(0, sample.lbls, sample.t, sample.h, nil)
-					require.NoError(t, err)
-				} else {
-					// We supposed to write a float sample to the WAL
-					_, err := app.AppendSTZeroSample(0, sample.lbls, sample.t, sample.st)
-					if !errors.Is(err, storage.ErrOutOfOrderST) {
-						require.Equal(t, sample.expectsError, err != nil, "expected error: %v, got: %v", sample.expectsError, err)
-					}
-
-					_, err = app.Append(0, sample.lbls, sample.t, sample.v)
-					require.NoError(t, err)
-				}
+				// Simulate the one-sample-per-series-per-appender flow we have in all our ingestion paths in Prometheus.
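+				// A fresh appender (and Commit) per sample means each ST zero-sample check runs against the series' last committed timestamp.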
+ app := s.AppenderV2(t.Context()) + _, err := app.Append(0, sample.lbls, sample.st, sample.t, sample.v, sample.h, nil, storage.AOptions{}) + require.NoError(t, err) + require.NoError(t, app.Commit()) } - require.NoError(t, app.Commit()) // Close the DB to ensure all data is flushed to the WAL require.NoError(t, s.Close()) - // Check that we dont have any OOO samples in the WAL by checking metrics + // Check that we don't have any OOO samples in the WAL by checking metrics families, err := reg.Gather() require.NoError(t, err, "failed to gather metrics") for _, f := range families { @@ -1303,94 +1157,8 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) { } } - outputSamples := readWALSamples(t, s.wal.Dir()) - - require.Len(t, outputSamples, len(tc.expectedSamples), "Expected %d samples", len(tc.expectedSamples)) - - for i, expectedSample := range tc.expectedSamples { - for _, sample := range outputSamples { - if sample.t == expectedSample.t && sample.lbls.String() == expectedSample.lbls.String() { - if expectedSample.h != nil { - require.Equal(t, expectedSample.h, sample.h, "histogram value mismatch (sample index %d)", i) - } else { - require.Equal(t, expectedSample.f, sample.f, "value mismatch (sample index %d)", i) - } - } - } - } + got := readWALSamples(t, s.wal.Dir()) + testutil.RequireEqualWithOptions(t, tc.expectedSamples, got, cmp.Options{cmp.AllowUnexported(walSample{})}) }) } } - -func readWALSamples(t *testing.T, walDir string) []*walSample { - t.Helper() - sr, err := wlog.NewSegmentsReader(walDir) - require.NoError(t, err) - defer func(sr io.ReadCloser) { - err := sr.Close() - require.NoError(t, err) - }(sr) - - r := wlog.NewReader(sr) - dec := record.NewDecoder(labels.NewSymbolTable(), promslog.NewNopLogger()) - - var ( - samples []record.RefSample - histograms []record.RefHistogramSample - - lastSeries record.RefSeries - outputSamples = make([]*walSample, 0) - ) - - for r.Next() { - rec := r.Record() - switch dec.Type(rec) { - case record.Series: - series, err := dec.Series(rec, nil) - require.NoError(t, err) - lastSeries = series[0] - case record.Samples: - samples, err = dec.Samples(rec, samples[:0]) - require.NoError(t, err) - for _, s := range samples { - outputSamples = append(outputSamples, &walSample{ - t: s.T, - f: s.V, - lbls: lastSeries.Labels.Copy(), - ref: storage.SeriesRef(lastSeries.Ref), - }) - } - case record.HistogramSamples: - histograms, err = dec.HistogramSamples(rec, histograms[:0]) - require.NoError(t, err) - for _, h := range histograms { - outputSamples = append(outputSamples, &walSample{ - t: h.T, - h: h.H, - lbls: lastSeries.Labels.Copy(), - ref: storage.SeriesRef(lastSeries.Ref), - }) - } - } - } - - return outputSamples -} - -func BenchmarkCreateSeries(b *testing.B) { - s := createTestAgentDB(b, nil, DefaultOptions()) - defer s.Close() - - app := s.Appender(context.Background()).(*appender) - lbls := make([]labels.Labels, b.N) - - for i, l := range labelsForTest("benchmark", b.N) { - lbls[i] = labels.New(l...) 
-	}
-
-	b.ResetTimer()
-
-	for _, l := range lbls {
-		app.getOrCreate(l)
-	}
-}
diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go
index 7409f79ec5..94e84fa2eb 100644
--- a/tsdb/agent/db_test.go
+++ b/tsdb/agent/db_test.go
@@ -24,6 +24,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
 	"github.com/prometheus/client_golang/prometheus"
 	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/common/model"
@@ -1142,6 +1143,10 @@ type walSample struct {
 	ref  storage.SeriesRef
 }
 
+// NOTE(bwplotka): This test exercises the behaviour of the storage.Appender interface against its invariants (see
+// the storage.Appender comment) around validation of the order of samples within a single Appender. This surfaces
+// a slight bug in the AppendSTZero* methods. We are leaving it as-is given the planned removal of AppenderV1 as
+// per https://github.com/prometheus/prometheus/issues/17632.
 func TestDBStartTimestampSamplesIngestion(t *testing.T) {
 	t.Parallel()
 
@@ -1154,7 +1159,7 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
 		expectsError bool
 	}
 
-	testHistogram := tsdbutil.GenerateTestHistograms(1)[0]
+	testHistograms := tsdbutil.GenerateTestHistograms(2)
 	zeroHistogram := &histogram.Histogram{}
 
 	lbls := labelsForTest(t.Name(), 1)
@@ -1163,7 +1168,7 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
 	testCases := []struct {
 		name                string
 		inputSamples        []appendableSample
-		expectedSamples     []*walSample
+		expectedSamples     []walSample
 		expectedSeriesCount int
 	}{
 		{
@@ -1172,10 +1177,10 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
 			{t: 100, st: 1, v: 10, lbls: defLbls},
 			{t: 101, st: 1, v: 10, lbls: defLbls},
 			},
-			expectedSamples: []*walSample{
-				{t: 1, f: 0, lbls: defLbls},
-				{t: 100, f: 10, lbls: defLbls},
-				{t: 101, f: 10, lbls: defLbls},
+			expectedSamples: []walSample{
+				{t: 1, f: 0, lbls: defLbls, ref: 1},
+				{t: 100, f: 10, lbls: defLbls, ref: 1},
+				{t: 101, f: 10, lbls: defLbls, ref: 1},
 			},
 		},
 		{
@@ -1190,15 +1195,15 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
 			{
 				t:    300,
 				st:   230,
-				h:    testHistogram,
+				h:    testHistograms[0],
 				lbls: defLbls,
 			},
 		},
-			expectedSamples: []*walSample{
-				{t: 30, f: 0, lbls: defLbls},
-				{t: 100, f: 20, lbls: defLbls},
-				{t: 230, h: zeroHistogram, lbls: defLbls},
-				{t: 300, h: testHistogram, lbls: defLbls},
+			expectedSamples: []walSample{
+				{t: 30, f: 0, lbls: defLbls, ref: 1},
+				{t: 100, f: 20, lbls: defLbls, ref: 1},
+				{t: 230, h: zeroHistogram, lbls: defLbls, ref: 1},
+				{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
 			},
 			expectedSeriesCount: 1,
 		},
@@ -1217,27 +1222,27 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
 				// invalid ST histogram
 				t:            300,
 				st:           300,
-				h:            testHistogram,
+				h:            testHistograms[0],
 				lbls:         defLbls,
 				expectsError: true,
 			},
 		},
-			expectedSamples: []*walSample{
-				{t: 100, f: 10, lbls: defLbls},
-				{t: 300, h: testHistogram, lbls: defLbls},
+			expectedSamples: []walSample{
+				{t: 100, f: 10, lbls: defLbls, ref: 1},
+				{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
 			},
 			expectedSeriesCount: 0,
 		},
 		{
 			name: "In order ct+normal sample/histogram",
 			inputSamples: []appendableSample{
-				{t: 100, h: testHistogram, st: 1, lbls: defLbls},
-				{t: 101, h: testHistogram, st: 1, lbls: defLbls},
+				{t: 100, h: testHistograms[0], st: 1, lbls: defLbls},
+				{t: 101, h: testHistograms[1], st: 1, lbls: defLbls},
 			},
-			expectedSamples: []*walSample{
-				{t: 1, h: &histogram.Histogram{}},
-				{t: 100, h: testHistogram},
-				{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.NotCounterReset}},
+			expectedSamples: 
[]walSample{ + {t: 1, h: &histogram.Histogram{}, lbls: defLbls, ref: 1}, + {t: 100, h: testHistograms[0], lbls: defLbls, ref: 1}, + {t: 101, h: testHistograms[1], lbls: defLbls, ref: 1}, }, }, { @@ -1248,12 +1253,12 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) { {t: 180_000, st: 40_000, v: 10, lbls: defLbls}, {t: 50_000, st: 40_000, v: 10, lbls: defLbls}, }, - expectedSamples: []*walSample{ - {t: 40_000, f: 0, lbls: defLbls}, - {t: 50_000, f: 10, lbls: defLbls}, - {t: 60_000, f: 10, lbls: defLbls}, - {t: 120_000, f: 10, lbls: defLbls}, - {t: 180_000, f: 10, lbls: defLbls}, + expectedSamples: []walSample{ + {t: 40_000, f: 0, lbls: defLbls, ref: 1}, + {t: 60_000, f: 10, lbls: defLbls, ref: 1}, + {t: 120_000, f: 10, lbls: defLbls, ref: 1}, + {t: 180_000, f: 10, lbls: defLbls, ref: 1}, + {t: 50_000, f: 10, lbls: defLbls, ref: 1}, // OOO sample. }, }, } @@ -1294,7 +1299,7 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) { // Close the DB to ensure all data is flushed to the WAL require.NoError(t, s.Close()) - // Check that we dont have any OOO samples in the WAL by checking metrics + // Check that we don't have any OOO samples in the WAL by checking metrics families, err := reg.Gather() require.NoError(t, err, "failed to gather metrics") for _, f := range families { @@ -1303,26 +1308,13 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) { } } - outputSamples := readWALSamples(t, s.wal.Dir()) - - require.Len(t, outputSamples, len(tc.expectedSamples), "Expected %d samples", len(tc.expectedSamples)) - - for i, expectedSample := range tc.expectedSamples { - for _, sample := range outputSamples { - if sample.t == expectedSample.t && sample.lbls.String() == expectedSample.lbls.String() { - if expectedSample.h != nil { - require.Equal(t, expectedSample.h, sample.h, "histogram value mismatch (sample index %d)", i) - } else { - require.Equal(t, expectedSample.f, sample.f, "value mismatch (sample index %d)", i) - } - } - } - } + got := readWALSamples(t, s.wal.Dir()) + testutil.RequireEqualWithOptions(t, tc.expectedSamples, got, cmp.Options{cmp.AllowUnexported(walSample{})}) }) } } -func readWALSamples(t *testing.T, walDir string) []*walSample { +func readWALSamples(t *testing.T, walDir string) []walSample { t.Helper() sr, err := wlog.NewSegmentsReader(walDir) require.NoError(t, err) @@ -1339,7 +1331,7 @@ func readWALSamples(t *testing.T, walDir string) []*walSample { histograms []record.RefHistogramSample lastSeries record.RefSeries - outputSamples = make([]*walSample, 0) + outputSamples = make([]walSample, 0) ) for r.Next() { @@ -1353,7 +1345,7 @@ func readWALSamples(t *testing.T, walDir string) []*walSample { samples, err = dec.Samples(rec, samples[:0]) require.NoError(t, err) for _, s := range samples { - outputSamples = append(outputSamples, &walSample{ + outputSamples = append(outputSamples, walSample{ t: s.T, f: s.V, lbls: lastSeries.Labels.Copy(), @@ -1364,7 +1356,7 @@ func readWALSamples(t *testing.T, walDir string) []*walSample { histograms, err = dec.HistogramSamples(rec, histograms[:0]) require.NoError(t, err) for _, h := range histograms { - outputSamples = append(outputSamples, &walSample{ + outputSamples = append(outputSamples, walSample{ t: h.T, h: h.H, lbls: lastSeries.Labels.Copy(), @@ -1373,14 +1365,14 @@ func readWALSamples(t *testing.T, walDir string) []*walSample { } } } - return outputSamples } -func BenchmarkCreateSeries(b *testing.B) { +func BenchmarkGetOrCreate(b *testing.B) { s := createTestAgentDB(b, nil, DefaultOptions()) defer s.Close() + // 
NOTE: This benchmarks appenderBase, so it does not matter if it's V1 or V2.
 	app := s.Appender(context.Background()).(*appender)
 	lbls := make([]labels.Labels, b.N)
 
diff --git a/tsdb/head_append_v2.go b/tsdb/head_append_v2.go
index c5ed9898e9..95118528eb 100644
--- a/tsdb/head_append_v2.go
+++ b/tsdb/head_append_v2.go
@@ -167,11 +167,11 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
 	// an optimization for the more likely case.
 	switch a.typesInBatch[s.ref] {
 	case stHistogram, stCustomBucketHistogram:
-		return a.Append(ref, ls, st, t, 0, &histogram.Histogram{Sum: v}, nil, storage.AOptions{
+		return a.Append(storage.SeriesRef(s.ref), ls, st, t, 0, &histogram.Histogram{Sum: v}, nil, storage.AOptions{
 			RejectOutOfOrder: opts.RejectOutOfOrder,
 		})
 	case stFloatHistogram, stCustomBucketFloatHistogram:
-		return a.Append(ref, ls, st, t, 0, nil, &histogram.FloatHistogram{Sum: v}, storage.AOptions{
+		return a.Append(storage.SeriesRef(s.ref), ls, st, t, 0, nil, &histogram.FloatHistogram{Sum: v}, storage.AOptions{
 			RejectOutOfOrder: opts.RejectOutOfOrder,
 		})
 	}
@@ -202,7 +202,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
 
 	if isStale {
 		// For stale values we never attempt to process metadata/exemplars, claim the success.
-		return ref, nil
+		return storage.SeriesRef(s.ref), nil
 	}
 
 	// Append exemplars if any and if storage was configured for it.
@@ -324,6 +324,7 @@ func (a *headAppenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemp
 		if !errors.Is(err, storage.ErrDuplicateExemplar) && !errors.Is(err, storage.ErrExemplarsDisabled) {
 			// Except duplicates and disabled exemplar storage, return partial errors.
 			errs = append(errs, err)
+			continue
 		}
 		if !errors.Is(err, storage.ErrOutOfOrderExemplar) {
 			a.head.logger.Debug("Error while adding an exemplar on AppendSample", "exemplars", fmt.Sprintf("%+v", e), "err", err)