This functionality is useful for a lot of clients but not relevant to the TSDB's core features.pull/5805/head
parent
e478d0e3bc
commit
f85d89abc0
@ -0,0 +1,203 @@ |
||||
package tsdbutil |
||||
|
||||
import ( |
||||
"math" |
||||
|
||||
"github.com/fabxc/tsdb" |
||||
) |
||||
|
||||
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
|
||||
type BufferedSeriesIterator struct { |
||||
it tsdb.SeriesIterator |
||||
buf *sampleRing |
||||
|
||||
lastTime int64 |
||||
} |
||||
|
||||
// NewBuffer returns a new iterator that buffers the values within the time range
|
||||
// of the current element and the duration of delta before.
|
||||
func NewBuffer(it tsdb.SeriesIterator, delta int64) *BufferedSeriesIterator { |
||||
return &BufferedSeriesIterator{ |
||||
it: it, |
||||
buf: newSampleRing(delta, 16), |
||||
lastTime: math.MinInt64, |
||||
} |
||||
} |
||||
|
||||
// PeekBack returns the previous element of the iterator. If there is none buffered,
|
||||
// ok is false.
|
||||
func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) { |
||||
return b.buf.last() |
||||
} |
||||
|
||||
// Buffer returns an iterator over the buffered data.
|
||||
func (b *BufferedSeriesIterator) Buffer() tsdb.SeriesIterator { |
||||
return b.buf.iterator() |
||||
} |
||||
|
||||
// Seek advances the iterator to the element at time t or greater.
|
||||
func (b *BufferedSeriesIterator) Seek(t int64) bool { |
||||
t0 := t - b.buf.delta |
||||
|
||||
// If the delta would cause us to seek backwards, preserve the buffer
|
||||
// and just continue regular advancment while filling the buffer on the way.
|
||||
if t0 > b.lastTime { |
||||
b.buf.reset() |
||||
|
||||
ok := b.it.Seek(t0) |
||||
if !ok { |
||||
return false |
||||
} |
||||
b.lastTime, _ = b.At() |
||||
} |
||||
|
||||
if b.lastTime >= t { |
||||
return true |
||||
} |
||||
for b.Next() { |
||||
if b.lastTime >= t { |
||||
return true |
||||
} |
||||
} |
||||
|
||||
return false |
||||
} |
||||
|
||||
// Next advances the iterator to the next element.
|
||||
func (b *BufferedSeriesIterator) Next() bool { |
||||
// Add current element to buffer before advancing.
|
||||
b.buf.add(b.it.At()) |
||||
|
||||
ok := b.it.Next() |
||||
if ok { |
||||
b.lastTime, _ = b.At() |
||||
} |
||||
return ok |
||||
} |
||||
|
||||
// At returns the current element of the iterator.
|
||||
func (b *BufferedSeriesIterator) At() (int64, float64) { |
||||
return b.it.At() |
||||
} |
||||
|
||||
// Err returns the last encountered error.
|
||||
func (b *BufferedSeriesIterator) Err() error { |
||||
return b.it.Err() |
||||
} |
||||
|
||||
type sample struct { |
||||
t int64 |
||||
v float64 |
||||
} |
||||
|
||||
type sampleRing struct { |
||||
delta int64 |
||||
|
||||
buf []sample // lookback buffer
|
||||
i int // position of most recent element in ring buffer
|
||||
f int // position of first element in ring buffer
|
||||
l int // number of elements in buffer
|
||||
} |
||||
|
||||
func newSampleRing(delta int64, sz int) *sampleRing { |
||||
r := &sampleRing{delta: delta, buf: make([]sample, sz)} |
||||
r.reset() |
||||
|
||||
return r |
||||
} |
||||
|
||||
func (r *sampleRing) reset() { |
||||
r.l = 0 |
||||
r.i = -1 |
||||
r.f = 0 |
||||
} |
||||
|
||||
func (r *sampleRing) iterator() tsdb.SeriesIterator { |
||||
return &sampleRingIterator{r: r, i: -1} |
||||
} |
||||
|
||||
type sampleRingIterator struct { |
||||
r *sampleRing |
||||
i int |
||||
} |
||||
|
||||
func (it *sampleRingIterator) Next() bool { |
||||
it.i++ |
||||
return it.i < it.r.l |
||||
} |
||||
|
||||
func (it *sampleRingIterator) Seek(int64) bool { |
||||
return false |
||||
} |
||||
|
||||
func (it *sampleRingIterator) Err() error { |
||||
return nil |
||||
} |
||||
|
||||
func (it *sampleRingIterator) At() (int64, float64) { |
||||
return it.r.at(it.i) |
||||
} |
||||
|
||||
func (r *sampleRing) at(i int) (int64, float64) { |
||||
j := (r.f + i) % len(r.buf) |
||||
s := r.buf[j] |
||||
return s.t, s.v |
||||
} |
||||
|
||||
// add adds a sample to the ring buffer and frees all samples that fall
|
||||
// out of the delta range.
|
||||
func (r *sampleRing) add(t int64, v float64) { |
||||
l := len(r.buf) |
||||
// Grow the ring buffer if it fits no more elements.
|
||||
if l == r.l { |
||||
buf := make([]sample, 2*l) |
||||
copy(buf[l+r.f:], r.buf[r.f:]) |
||||
copy(buf, r.buf[:r.f]) |
||||
|
||||
r.buf = buf |
||||
r.i = r.f |
||||
r.f += l |
||||
} else { |
||||
r.i++ |
||||
if r.i >= l { |
||||
r.i -= l |
||||
} |
||||
} |
||||
|
||||
r.buf[r.i] = sample{t: t, v: v} |
||||
r.l++ |
||||
|
||||
// Free head of the buffer of samples that just fell out of the range.
|
||||
for r.buf[r.f].t < t-r.delta { |
||||
r.f++ |
||||
if r.f >= l { |
||||
r.f -= l |
||||
} |
||||
r.l-- |
||||
} |
||||
} |
||||
|
||||
// last returns the most recent element added to the ring.
|
||||
func (r *sampleRing) last() (int64, float64, bool) { |
||||
if r.l == 0 { |
||||
return 0, 0, false |
||||
} |
||||
s := r.buf[r.i] |
||||
return s.t, s.v, true |
||||
} |
||||
|
||||
func (r *sampleRing) samples() []sample { |
||||
res := make([]sample, r.l) |
||||
|
||||
var k = r.f + r.l |
||||
var j int |
||||
if k > len(r.buf) { |
||||
k = len(r.buf) |
||||
j = r.l - k + r.f |
||||
} |
||||
|
||||
n := copy(res, r.buf[r.f:k]) |
||||
copy(res[n:], r.buf[:j]) |
||||
|
||||
return res |
||||
} |
||||
@ -0,0 +1,160 @@ |
||||
package tsdbutil |
||||
|
||||
import ( |
||||
"math/rand" |
||||
"sort" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func TestSampleRing(t *testing.T) { |
||||
cases := []struct { |
||||
input []int64 |
||||
delta int64 |
||||
size int |
||||
}{ |
||||
{ |
||||
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, |
||||
delta: 2, |
||||
size: 1, |
||||
}, |
||||
{ |
||||
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, |
||||
delta: 2, |
||||
size: 2, |
||||
}, |
||||
{ |
||||
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, |
||||
delta: 7, |
||||
size: 3, |
||||
}, |
||||
{ |
||||
input: []int64{1, 2, 3, 4, 5, 16, 17, 18, 19, 20}, |
||||
delta: 7, |
||||
size: 1, |
||||
}, |
||||
} |
||||
for _, c := range cases { |
||||
r := newSampleRing(c.delta, c.size) |
||||
|
||||
input := []sample{} |
||||
for _, t := range c.input { |
||||
input = append(input, sample{ |
||||
t: t, |
||||
v: float64(rand.Intn(100)), |
||||
}) |
||||
} |
||||
|
||||
for i, s := range input { |
||||
r.add(s.t, s.v) |
||||
buffered := r.samples() |
||||
|
||||
for _, sold := range input[:i] { |
||||
found := false |
||||
for _, bs := range buffered { |
||||
if bs.t == sold.t && bs.v == sold.v { |
||||
found = true |
||||
break |
||||
} |
||||
} |
||||
if sold.t >= s.t-c.delta && !found { |
||||
t.Fatalf("%d: expected sample %d to be in buffer but was not; buffer %v", i, sold.t, buffered) |
||||
} |
||||
if sold.t < s.t-c.delta && found { |
||||
t.Fatalf("%d: unexpected sample %d in buffer; buffer %v", i, sold.t, buffered) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
func TestBufferedSeriesIterator(t *testing.T) { |
||||
var it *BufferedSeriesIterator |
||||
|
||||
bufferEq := func(exp []sample) { |
||||
var b []sample |
||||
bit := it.Buffer() |
||||
for bit.Next() { |
||||
t, v := bit.At() |
||||
b = append(b, sample{t: t, v: v}) |
||||
} |
||||
require.Equal(t, exp, b, "buffer mismatch") |
||||
} |
||||
sampleEq := func(ets int64, ev float64) { |
||||
ts, v := it.At() |
||||
require.Equal(t, ets, ts, "timestamp mismatch") |
||||
require.Equal(t, ev, v, "value mismatch") |
||||
} |
||||
|
||||
it = NewBuffer(newListSeriesIterator([]sample{ |
||||
{t: 1, v: 2}, |
||||
{t: 2, v: 3}, |
||||
{t: 3, v: 4}, |
||||
{t: 4, v: 5}, |
||||
{t: 5, v: 6}, |
||||
{t: 99, v: 8}, |
||||
{t: 100, v: 9}, |
||||
{t: 101, v: 10}, |
||||
}), 2) |
||||
|
||||
require.True(t, it.Seek(-123), "seek failed") |
||||
sampleEq(1, 2) |
||||
bufferEq(nil) |
||||
|
||||
require.True(t, it.Next(), "next failed") |
||||
sampleEq(2, 3) |
||||
bufferEq([]sample{{t: 1, v: 2}}) |
||||
|
||||
require.True(t, it.Next(), "next failed") |
||||
require.True(t, it.Next(), "next failed") |
||||
require.True(t, it.Next(), "next failed") |
||||
sampleEq(5, 6) |
||||
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) |
||||
|
||||
require.True(t, it.Seek(5), "seek failed") |
||||
sampleEq(5, 6) |
||||
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) |
||||
|
||||
require.True(t, it.Seek(101), "seek failed") |
||||
sampleEq(101, 10) |
||||
bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}}) |
||||
|
||||
require.False(t, it.Next(), "next succeeded unexpectedly") |
||||
} |
||||
|
||||
type listSeriesIterator struct { |
||||
list []sample |
||||
idx int |
||||
} |
||||
|
||||
func newListSeriesIterator(list []sample) *listSeriesIterator { |
||||
return &listSeriesIterator{list: list, idx: -1} |
||||
} |
||||
|
||||
func (it *listSeriesIterator) At() (int64, float64) { |
||||
s := it.list[it.idx] |
||||
return s.t, s.v |
||||
} |
||||
|
||||
func (it *listSeriesIterator) Next() bool { |
||||
it.idx++ |
||||
return it.idx < len(it.list) |
||||
} |
||||
|
||||
func (it *listSeriesIterator) Seek(t int64) bool { |
||||
if it.idx == -1 { |
||||
it.idx = 0 |
||||
} |
||||
// Do binary search between current position and end.
|
||||
it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { |
||||
s := it.list[i+it.idx] |
||||
return s.t >= t |
||||
}) |
||||
|
||||
return it.idx < len(it.list) |
||||
} |
||||
|
||||
func (it *listSeriesIterator) Err() error { |
||||
return nil |
||||
} |
||||
Loading…
Reference in new issue