diff --git a/promql/analyzer.go b/promql/analyzer.go index 3bc46bb8f0..90ff95207c 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -41,14 +41,20 @@ type Analyzer struct { // fingerprints. One of these structs is collected for each offset by the query // analyzer. type preloadTimes struct { - // Instants require single samples to be loaded along the entire query - // range, with intervals between the samples corresponding to the query - // resolution. - instants map[model.Fingerprint]struct{} - // Ranges require loading a range of samples at each resolution step, - // stretching backwards from the current evaluation timestamp. The length of - // the range into the past is given by the duration, as in "foo[5m]". + // Ranges require loading a range of samples. They can be triggered by + // two type of expressions: First a range expression AKA matrix + // selector, where the Duration in the ranges map is the length of the + // range in the range expression. Second an instant expression AKA + // vector selector, where the Duration in the ranges map is the + // StalenessDelta. In preloading, both types of expressions result in + // the same effect: Preload everything between the specified start time + // minus the Duration in the ranges map up to the specified end time. ranges map[model.Fingerprint]time.Duration + // Instants require a single sample to be loaded. This only happens for + // instant expressions AKA vector selectors iff the specified start ond + // end time are the same, Thus, instants is only populated if start and + // end time are the same. + instants map[model.Fingerprint]struct{} } // Analyze the provided expression and attach metrics and fingerprints to data-selecting @@ -57,13 +63,15 @@ func (a *Analyzer) Analyze(ctx context.Context) error { a.offsetPreloadTimes = map[time.Duration]preloadTimes{} getPreloadTimes := func(offset time.Duration) preloadTimes { - if _, ok := a.offsetPreloadTimes[offset]; !ok { - a.offsetPreloadTimes[offset] = preloadTimes{ - instants: map[model.Fingerprint]struct{}{}, - ranges: map[model.Fingerprint]time.Duration{}, - } + if pt, ok := a.offsetPreloadTimes[offset]; ok { + return pt } - return a.offsetPreloadTimes[offset] + pt := preloadTimes{ + instants: map[model.Fingerprint]struct{}{}, + ranges: map[model.Fingerprint]time.Duration{}, + } + a.offsetPreloadTimes[offset] = pt + return pt } // Retrieve fingerprints and metrics for the required time range for @@ -76,11 +84,14 @@ func (a *Analyzer) Analyze(ctx context.Context) error { pt := getPreloadTimes(n.Offset) for fp := range n.metrics { - // Only add the fingerprint to the instants if not yet present in the - // ranges. Ranges always contain more points and span more time than - // instants for the same offset. - if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges { + r, alreadyInRanges := pt.ranges[fp] + if a.Start.Equal(a.End) && !alreadyInRanges { + // A true instant, we only need one value. pt.instants[fp] = struct{}{} + continue + } + if r < StalenessDelta { + pt.ranges[fp] = StalenessDelta } } case *MatrixSelector: @@ -133,18 +144,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { if err = contextDone(ctx, env); err != nil { return nil, err } - startOfRange := start.Add(-rangeDuration) - if StalenessDelta > rangeDuration { - // Cover a weird corner case: The expression - // mixes up instants and ranges for the same - // series. We'll handle that over-all as - // range. But if the rangeDuration is smaller - // than the StalenessDelta, the range wouldn't - // cover everything potentially needed for the - // instant, so we have to extend startOfRange. - startOfRange = start.Add(-StalenessDelta) - } - iter, err := p.PreloadRange(fp, startOfRange, end) + iter, err := p.PreloadRange(fp, start.Add(-rangeDuration), end) if err != nil { return nil, err } @@ -154,10 +154,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { if err = contextDone(ctx, env); err != nil { return nil, err } - // Need to look backwards by StalenessDelta but not - // forward because we always return the closest sample - // _before_ the reference time. - iter, err := p.PreloadRange(fp, start.Add(-StalenessDelta), end) + iter, err := p.PreloadInstant(fp, start, StalenessDelta) if err != nil { return nil, err } diff --git a/storage/local/interface.go b/storage/local/interface.go index 4007215708..631525b4ad 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -14,6 +14,8 @@ package local import ( + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -92,6 +94,10 @@ type Preloader interface { fp model.Fingerprint, from model.Time, through model.Time, ) (SeriesIterator, error) + PreloadInstant( + fp model.Fingerprint, + timestamp model.Time, stalenessDelta time.Duration, + ) (SeriesIterator, error) // Close unpins any previously requested series data from memory. Close() } diff --git a/storage/local/preload.go b/storage/local/preload.go index cda04a864b..f51ec6577b 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -13,7 +13,11 @@ package local -import "github.com/prometheus/common/model" +import ( + "time" + + "github.com/prometheus/common/model" +) // memorySeriesPreloader is a Preloader for the memorySeriesStorage. type memorySeriesPreloader struct { @@ -26,7 +30,20 @@ func (p *memorySeriesPreloader) PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, ) (SeriesIterator, error) { - cds, iter, err := p.storage.preloadChunksForRange(fp, from, through) + cds, iter, err := p.storage.preloadChunksForRange(fp, from, through, false) + if err != nil { + return iter, err + } + p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) + return iter, nil +} + +// PreloadInstant implements Preloader +func (p *memorySeriesPreloader) PreloadInstant( + fp model.Fingerprint, + timestamp model.Time, stalenessDelta time.Duration, +) (SeriesIterator, error) { + cds, iter, err := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true) if err != nil { return iter, err } diff --git a/storage/local/series.go b/storage/local/series.go index f4d4c6d1a2..e6a37ad4a5 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -367,8 +367,9 @@ func (s *memorySeries) preloadChunks( } // newIterator returns a new SeriesIterator for the provided chunkDescs (which -// must be pinned). The caller must have locked the fingerprint of the -// memorySeries. +// must be pinned). +// +// The caller must have locked the fingerprint of the memorySeries. func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator { chunks := make([]chunk, 0, len(pinnedChunkDescs)) for _, cd := range pinnedChunkDescs { @@ -385,9 +386,27 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator // preloadChunksForRange loads chunks for the given range from the persistence. // The caller must have locked the fingerprint of the series. func (s *memorySeries) preloadChunksForRange( + fp model.Fingerprint, from model.Time, through model.Time, - fp model.Fingerprint, mss *memorySeriesStorage, + lastSampleOnly bool, + mss *memorySeriesStorage, ) ([]*chunkDesc, SeriesIterator, error) { + // If we have to preload for only one sample, and we have a + // lastSamplePair in the series, and thas last samplePair is in the + // interval, just take it in a singleSampleSeriesIterator. No need to + // pin or load anything. + if lastSampleOnly { + lastSample := s.lastSamplePair() + if !through.Before(lastSample.Timestamp) && + !from.After(lastSample.Timestamp) && + lastSample != ZeroSamplePair { + iter := &boundedIterator{ + it: &singleSampleSeriesIterator{samplePair: lastSample}, + start: model.Now().Add(-mss.dropAfter), + } + return nil, iter, nil + } + } firstChunkDescTime := model.Latest if len(s.chunkDescs) > 0 { firstChunkDescTime = s.chunkDescs[0].firstTime() @@ -619,6 +638,35 @@ func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator { return chunkIt } +// singleSampleSeriesIterator implements Series Iterator. It is a "shortcut +// iterator" that returns a single samplee only. The sample is saved in the +// iterator itself, so no chunks need to be pinned. +type singleSampleSeriesIterator struct { + samplePair model.SamplePair +} + +// ValueAtTime implements SeriesIterator. +func (it *singleSampleSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { + if it.samplePair.Timestamp.After(t) { + return ZeroSamplePair + } + return it.samplePair +} + +// BoundaryValues implements SeriesIterator. +func (it *singleSampleSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair { + return it.RangeValues(in) +} + +// RangeValues implements SeriesIterator. +func (it *singleSampleSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { + if it.samplePair.Timestamp.After(in.NewestInclusive) || + it.samplePair.Timestamp.Before(in.OldestInclusive) { + return []model.SamplePair{} + } + return []model.SamplePair{it.samplePair} +} + // nopSeriesIterator implements Series Iterator. It never returns any values. type nopSeriesIterator struct{} diff --git a/storage/local/storage.go b/storage/local/storage.go index 7d2fcfec8f..e14c3ea5e1 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -689,6 +689,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me func (s *memorySeriesStorage) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, + lastSampleOnly bool, ) ([]*chunkDesc, SeriesIterator, error) { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) @@ -713,7 +714,7 @@ func (s *memorySeriesStorage) preloadChunksForRange( return nil, nopIter, nil } } - return series.preloadChunksForRange(from, through, fp, s) + return series.preloadChunksForRange(fp, from, through, lastSampleOnly, s) } func (s *memorySeriesStorage) handleEvictList() { diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 4c7034f646..8fcdcecfea 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -500,7 +500,7 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - _, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + _, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) if err != nil { t.Fatalf("Error preloading everything: %s", err) } @@ -508,7 +508,7 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) if err != nil { t.Fatalf("Error preloading everything: %s", err) } @@ -533,7 +533,7 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - _, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + _, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false) if err != nil { t.Fatalf("Error preloading everything: %s", err) } @@ -541,7 +541,7 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false) if err != nil { t.Fatalf("Error preloading everything: %s", err) } @@ -670,7 +670,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) if err != nil { t.Fatalf("Error preloading everything: %s", err) } @@ -747,7 +747,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) if err != nil { b.Fatalf("Error preloading everything: %s", err) } @@ -828,7 +828,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) if err != nil { t.Fatalf("Error preloading everything: %s", err) } @@ -983,7 +983,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) if err != nil { b.Fatalf("Error preloading everything: %s", err) } @@ -1032,7 +1032,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop ~half of the chunks. s.maintainMemorySeries(fp, 10000) - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) if err != nil { t.Fatalf("Error preloading everything: %s", err) } @@ -1053,7 +1053,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop everything. s.maintainMemorySeries(fp, 100000) - _, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest) + _, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false) if err != nil { t.Fatalf("Error preloading everything: %s", err) }