|
|
|
|
@ -110,7 +110,9 @@ func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) { |
|
|
|
|
func readTestWAL(t testing.TB, dir string) (recs []interface{}) { |
|
|
|
|
sr, err := wlog.NewSegmentsReader(dir) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
defer sr.Close() |
|
|
|
|
defer func() { |
|
|
|
|
require.NoError(t, sr.Close()) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
var dec record.Decoder |
|
|
|
|
r := wlog.NewReader(sr) |
|
|
|
|
@ -127,6 +129,14 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) { |
|
|
|
|
samples, err := dec.Samples(rec, nil) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
recs = append(recs, samples) |
|
|
|
|
case record.HistogramSamples: |
|
|
|
|
samples, err := dec.HistogramSamples(rec, nil) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
recs = append(recs, samples) |
|
|
|
|
case record.FloatHistogramSamples: |
|
|
|
|
samples, err := dec.FloatHistogramSamples(rec, nil) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
recs = append(recs, samples) |
|
|
|
|
case record.Tombstones: |
|
|
|
|
tstones, err := dec.Tombstones(rec, nil) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
@ -2824,6 +2834,7 @@ func TestAppendHistogram(t *testing.T) { |
|
|
|
|
ingestTs := int64(0) |
|
|
|
|
app := head.Appender(context.Background()) |
|
|
|
|
|
|
|
|
|
// Integer histograms.
|
|
|
|
|
type timedHistogram struct { |
|
|
|
|
t int64 |
|
|
|
|
h *histogram.Histogram |
|
|
|
|
@ -2844,6 +2855,7 @@ func TestAppendHistogram(t *testing.T) { |
|
|
|
|
t int64 |
|
|
|
|
h *histogram.FloatHistogram |
|
|
|
|
} |
|
|
|
|
// Float counter histograms.
|
|
|
|
|
expFloatHistograms := make([]timedFloatHistogram, 0, numHistograms) |
|
|
|
|
for _, fh := range GenerateTestFloatHistograms(numHistograms) { |
|
|
|
|
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh) |
|
|
|
|
@ -2855,6 +2867,18 @@ func TestAppendHistogram(t *testing.T) { |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Float gauge histograms.
|
|
|
|
|
for _, fh := range GenerateTestGaugeHistograms(numHistograms) { |
|
|
|
|
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh}) |
|
|
|
|
ingestTs++ |
|
|
|
|
if ingestTs%50 == 0 { |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
|
|
|
|
|
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) |
|
|
|
|
@ -2898,7 +2922,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { |
|
|
|
|
// Series with only histograms.
|
|
|
|
|
s1 := labels.FromStrings("a", "b1") |
|
|
|
|
k1 := s1.String() |
|
|
|
|
numHistograms := 450 |
|
|
|
|
numHistograms := 300 |
|
|
|
|
exp := map[string][]tsdbutil.Sample{} |
|
|
|
|
app := head.Appender(context.Background()) |
|
|
|
|
ts := int64(0) |
|
|
|
|
@ -2916,26 +2940,34 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
for _, h := range GenerateTestFloatHistograms(numHistograms) { |
|
|
|
|
h.Count = h.Count * 2 |
|
|
|
|
h.NegativeSpans = h.PositiveSpans |
|
|
|
|
h.NegativeBuckets = h.PositiveBuckets |
|
|
|
|
_, err := app.AppendHistogram(0, s1, ts, nil, h) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()}) |
|
|
|
|
ts++ |
|
|
|
|
if ts%5 == 0 { |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
for _, gauge := range []bool{true, false} { |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
var hists []*histogram.FloatHistogram |
|
|
|
|
if gauge { |
|
|
|
|
hists = GenerateTestGaugeHistograms(numHistograms) |
|
|
|
|
} else { |
|
|
|
|
hists = GenerateTestFloatHistograms(numHistograms) |
|
|
|
|
} |
|
|
|
|
for _, h := range hists { |
|
|
|
|
h.Count = h.Count * 2 |
|
|
|
|
h.NegativeSpans = h.PositiveSpans |
|
|
|
|
h.NegativeBuckets = h.PositiveBuckets |
|
|
|
|
_, err := app.AppendHistogram(0, s1, ts, nil, h) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
exp[k1] = append(exp[k1], sample{t: ts, fh: h.Copy()}) |
|
|
|
|
ts++ |
|
|
|
|
if ts%5 == 0 { |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
} |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
|
|
|
|
|
// There should be 7 mmap chunks in s1.
|
|
|
|
|
ms := head.series.getByHash(s1.Hash(), s1) |
|
|
|
|
require.Len(t, ms.mmappedChunks, 7) |
|
|
|
|
expMmapChunks := make([]*mmappedChunk, 0, 7) |
|
|
|
|
require.Len(t, ms.mmappedChunks, 8) |
|
|
|
|
expMmapChunks := make([]*mmappedChunk, 0, 8) |
|
|
|
|
for _, mmap := range ms.mmappedChunks { |
|
|
|
|
require.Greater(t, mmap.numSamples, uint16(0)) |
|
|
|
|
cpy := *mmap |
|
|
|
|
@ -2972,51 +3004,68 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
for _, h := range GenerateTestFloatHistograms(100) { |
|
|
|
|
ts++ |
|
|
|
|
h.Count = h.Count * 2 |
|
|
|
|
h.NegativeSpans = h.PositiveSpans |
|
|
|
|
h.NegativeBuckets = h.PositiveBuckets |
|
|
|
|
_, err := app.AppendHistogram(0, s2, int64(ts), nil, h) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()}) |
|
|
|
|
if ts%20 == 0 { |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
// Add some float.
|
|
|
|
|
for i := 0; i < 10; i++ { |
|
|
|
|
ts++ |
|
|
|
|
_, err := app.Append(0, s2, int64(ts), float64(ts)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)}) |
|
|
|
|
for _, gauge := range []bool{true, false} { |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
var hists []*histogram.FloatHistogram |
|
|
|
|
if gauge { |
|
|
|
|
hists = GenerateTestGaugeHistograms(100) |
|
|
|
|
} else { |
|
|
|
|
hists = GenerateTestFloatHistograms(100) |
|
|
|
|
} |
|
|
|
|
for _, h := range hists { |
|
|
|
|
ts++ |
|
|
|
|
h.Count = h.Count * 2 |
|
|
|
|
h.NegativeSpans = h.PositiveSpans |
|
|
|
|
h.NegativeBuckets = h.PositiveBuckets |
|
|
|
|
_, err := app.AppendHistogram(0, s2, int64(ts), nil, h) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()}) |
|
|
|
|
if ts%20 == 0 { |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
// Add some float.
|
|
|
|
|
for i := 0; i < 10; i++ { |
|
|
|
|
ts++ |
|
|
|
|
_, err := app.Append(0, s2, int64(ts), float64(ts)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
exp[k2] = append(exp[k2], sample{t: int64(ts), v: float64(ts)}) |
|
|
|
|
} |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
} |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
app = head.Appender(context.Background()) |
|
|
|
|
} |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
} |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
|
|
|
|
|
// Restart head.
|
|
|
|
|
require.NoError(t, head.Close()) |
|
|
|
|
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NoError(t, head.Init(0)) |
|
|
|
|
startHead := func() { |
|
|
|
|
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NoError(t, head.Init(0)) |
|
|
|
|
} |
|
|
|
|
startHead() |
|
|
|
|
|
|
|
|
|
// Checking contents of s1.
|
|
|
|
|
ms = head.series.getByHash(s1.Hash(), s1) |
|
|
|
|
require.Equal(t, expMmapChunks, ms.mmappedChunks) |
|
|
|
|
for _, mmap := range ms.mmappedChunks { |
|
|
|
|
require.Greater(t, mmap.numSamples, uint16(0)) |
|
|
|
|
} |
|
|
|
|
require.Equal(t, expHeadChunkSamples, ms.headChunk.chunk.NumSamples()) |
|
|
|
|
|
|
|
|
|
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*")) |
|
|
|
|
require.Equal(t, exp, act) |
|
|
|
|
testQuery := func() { |
|
|
|
|
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*")) |
|
|
|
|
require.Equal(t, exp, act) |
|
|
|
|
} |
|
|
|
|
testQuery() |
|
|
|
|
|
|
|
|
|
// Restart with no mmap chunks to test WAL replay.
|
|
|
|
|
require.NoError(t, head.Close()) |
|
|
|
|
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) |
|
|
|
|
startHead() |
|
|
|
|
testQuery() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestChunkSnapshot(t *testing.T) { |
|
|
|
|
@ -3522,7 +3571,7 @@ func TestHistogramCounterResetHeader(t *testing.T) { |
|
|
|
|
if floatHisto { |
|
|
|
|
_, err = app.AppendHistogram(0, l, ts, nil, h.ToFloat()) |
|
|
|
|
} else { |
|
|
|
|
_, err = app.AppendHistogram(0, l, ts, h, nil) |
|
|
|
|
_, err = app.AppendHistogram(0, l, ts, h.Copy(), nil) |
|
|
|
|
} |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
@ -3553,10 +3602,6 @@ func TestHistogramCounterResetHeader(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
h := GenerateTestHistograms(1)[0] |
|
|
|
|
if len(h.NegativeBuckets) == 0 { |
|
|
|
|
h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...) |
|
|
|
|
h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...) |
|
|
|
|
} |
|
|
|
|
h.PositiveBuckets = []int64{100, 1, 1, 1} |
|
|
|
|
h.NegativeBuckets = []int64{100, 1, 1, 1} |
|
|
|
|
h.Count = 1000 |
|
|
|
|
@ -4517,3 +4562,78 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) { |
|
|
|
|
require.NoError(t, h.truncateOOO(0, 2)) |
|
|
|
|
require.Equal(t, 295*time.Minute.Milliseconds(), h.MinOOOTime()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestGaugeHistogramWALAndChunkHeader(t *testing.T) { |
|
|
|
|
l := labels.FromStrings("a", "b") |
|
|
|
|
head, _ := newTestHead(t, 1000, false, false) |
|
|
|
|
t.Cleanup(func() { |
|
|
|
|
require.NoError(t, head.Close()) |
|
|
|
|
}) |
|
|
|
|
require.NoError(t, head.Init(0)) |
|
|
|
|
|
|
|
|
|
ts := int64(0) |
|
|
|
|
appendHistogram := func(h *histogram.FloatHistogram) { |
|
|
|
|
ts++ |
|
|
|
|
app := head.Appender(context.Background()) |
|
|
|
|
_, err := app.AppendHistogram(0, l, ts, nil, h.Copy()) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NoError(t, app.Commit()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
hists := GenerateTestGaugeHistograms(5) |
|
|
|
|
hists[0].CounterResetHint = histogram.UnknownCounterReset |
|
|
|
|
appendHistogram(hists[0]) |
|
|
|
|
appendHistogram(hists[1]) |
|
|
|
|
appendHistogram(hists[2]) |
|
|
|
|
hists[3].CounterResetHint = histogram.UnknownCounterReset |
|
|
|
|
appendHistogram(hists[3]) |
|
|
|
|
appendHistogram(hists[3]) |
|
|
|
|
appendHistogram(hists[4]) |
|
|
|
|
|
|
|
|
|
checkHeaders := func() { |
|
|
|
|
ms, _, err := head.getOrCreate(l.Hash(), l) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Len(t, ms.mmappedChunks, 3) |
|
|
|
|
expHeaders := []chunkenc.CounterResetHeader{ |
|
|
|
|
chunkenc.UnknownCounterReset, |
|
|
|
|
chunkenc.GaugeType, |
|
|
|
|
chunkenc.UnknownCounterReset, |
|
|
|
|
chunkenc.GaugeType, |
|
|
|
|
} |
|
|
|
|
for i, mmapChunk := range ms.mmappedChunks { |
|
|
|
|
chk, err := head.chunkDiskMapper.Chunk(mmapChunk.ref) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Equal(t, expHeaders[i], chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) |
|
|
|
|
} |
|
|
|
|
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader()) |
|
|
|
|
} |
|
|
|
|
checkHeaders() |
|
|
|
|
|
|
|
|
|
recs := readTestWAL(t, head.wal.Dir()) |
|
|
|
|
require.Equal(t, []interface{}{ |
|
|
|
|
[]record.RefSeries{ |
|
|
|
|
{ |
|
|
|
|
Ref: 1, |
|
|
|
|
Labels: labels.FromStrings("a", "b"), |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
[]record.RefFloatHistogramSample{{Ref: 1, T: 1, FH: hists[0]}}, |
|
|
|
|
[]record.RefFloatHistogramSample{{Ref: 1, T: 2, FH: hists[1]}}, |
|
|
|
|
[]record.RefFloatHistogramSample{{Ref: 1, T: 3, FH: hists[2]}}, |
|
|
|
|
[]record.RefFloatHistogramSample{{Ref: 1, T: 4, FH: hists[3]}}, |
|
|
|
|
[]record.RefFloatHistogramSample{{Ref: 1, T: 5, FH: hists[3]}}, |
|
|
|
|
[]record.RefFloatHistogramSample{{Ref: 1, T: 6, FH: hists[4]}}, |
|
|
|
|
}, recs) |
|
|
|
|
|
|
|
|
|
// Restart Head without mmap chunks to expect the WAL replay to recognize gauge histograms.
|
|
|
|
|
require.NoError(t, head.Close()) |
|
|
|
|
require.NoError(t, os.RemoveAll(mmappedChunksDir(head.opts.ChunkDirRoot))) |
|
|
|
|
|
|
|
|
|
w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
head, err = NewHead(nil, nil, w, nil, head.opts, nil) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NoError(t, head.Init(0)) |
|
|
|
|
|
|
|
|
|
checkHeaders() |
|
|
|
|
} |
|
|
|
|
|