Fixes iterators boundaries. (#2136)

The start should be inclusive and the end is supposed to be exclusive.
If start and end are equal, then only one entry can be returned which need to also be equal to start.

Fixes #2124

Those issues were edge cases where the boundaries of a block or iterator would be equal to the start.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
k21
Cyril Tovena 6 years ago committed by GitHub
parent 6e55277552
commit d08ceef16b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      pkg/chunkenc/memchunk.go
  2. 73
      pkg/chunkenc/memchunk_test.go
  3. 13
      pkg/iter/iterator.go
  4. 40
      pkg/iter/iterator_test.go
  5. 13
      pkg/promtail/promtail_test.go
  6. 4
      pkg/storage/iterator.go

@ -474,9 +474,10 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
its := make([]iter.EntryIterator, 0, len(c.blocks)+1)
for _, b := range c.blocks {
if maxt > b.mint && b.maxt > mint {
its = append(its, b.iterator(ctx, c.readers, filter))
if maxt < b.mint || b.maxt < mint {
continue
}
its = append(its, b.iterator(ctx, c.readers, filter))
}
if !c.head.isEmpty() {

@ -10,9 +10,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/dustin/go-humanize"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/chunkenc/testdata"
@ -644,3 +643,73 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
})
}
}
func TestMemChunk_IteratorBounds(t *testing.T) {
var createChunk = func() *MemChunk {
t.Helper()
c := NewMemChunk(EncNone, 1e6, 1e6)
if err := c.Append(&logproto.Entry{
Timestamp: time.Unix(0, 1),
Line: "1",
}); err != nil {
t.Fatal(err)
}
if err := c.Append(&logproto.Entry{
Timestamp: time.Unix(0, 2),
Line: "2",
}); err != nil {
t.Fatal(err)
}
return c
}
for _, tt := range []struct {
mint, maxt time.Time
direction logproto.Direction
expect []bool // array of expected values for next call in sequence
}{
{time.Unix(0, 0), time.Unix(0, 1), logproto.FORWARD, []bool{false}},
{time.Unix(0, 1), time.Unix(0, 1), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 2), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 3), logproto.FORWARD, []bool{true, true, false}},
{time.Unix(0, 2), time.Unix(0, 3), logproto.FORWARD, []bool{true, false}},
{time.Unix(0, 3), time.Unix(0, 3), logproto.FORWARD, []bool{false}},
{time.Unix(0, 0), time.Unix(0, 1), logproto.BACKWARD, []bool{false}},
{time.Unix(0, 1), time.Unix(0, 1), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 2), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 3), logproto.BACKWARD, []bool{true, true, false}},
{time.Unix(0, 2), time.Unix(0, 3), logproto.BACKWARD, []bool{true, false}},
{time.Unix(0, 3), time.Unix(0, 3), logproto.BACKWARD, []bool{false}},
} {
t.Run(
fmt.Sprintf("mint:%d,maxt:%d,direction:%s", tt.mint.UnixNano(), tt.maxt.UnixNano(), tt.direction),
func(t *testing.T) {
tt := tt
c := createChunk()
// testing headchunk
it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil)
require.NoError(t, err)
for i := range tt.expect {
require.Equal(t, tt.expect[i], it.Next())
}
require.NoError(t, it.Close())
// testing chunk blocks
require.NoError(t, c.cut())
it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil)
require.NoError(t, err)
for i := range tt.expect {
require.Equal(t, tt.expect[i], it.Next())
}
require.NoError(t, it.Close())
})
}
}

@ -503,11 +503,18 @@ func (i *timeRangedIterator) Next() bool {
ts := i.EntryIterator.Entry().Timestamp
for ok && i.mint.After(ts) {
ok = i.EntryIterator.Next()
if !ok {
continue
}
ts = i.EntryIterator.Entry().Timestamp
}
if ok && (i.maxt.Before(ts) || i.maxt.Equal(ts)) { // The maxt is exclusive.
ok = false
if ok {
if ts.Equal(i.mint) { // The mint is inclusive
return true
}
if i.maxt.Before(ts) || i.maxt.Equal(ts) { // The maxt is exclusive.
ok = false
}
}
if !ok {
i.EntryIterator.Close()

@ -7,9 +7,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
@ -525,3 +524,40 @@ func Test_DuplicateCount(t *testing.T) {
})
}
}
func Test_timeRangedIterator_Next(t *testing.T) {
tests := []struct {
mint time.Time
maxt time.Time
expect []bool // array of expected values for next call in sequence
}{
{time.Unix(0, 0), time.Unix(0, 0), []bool{false}},
{time.Unix(0, 0), time.Unix(0, 1), []bool{false}},
{time.Unix(0, 1), time.Unix(0, 1), []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 2), []bool{true, false}},
{time.Unix(0, 1), time.Unix(0, 3), []bool{true, true, false}},
{time.Unix(0, 3), time.Unix(0, 3), []bool{true, false}},
{time.Unix(0, 4), time.Unix(0, 10), []bool{false}},
{time.Unix(0, 1), time.Unix(0, 10), []bool{true, true, true, false}},
{time.Unix(0, 0), time.Unix(0, 10), []bool{true, true, true, false}},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("mint:%d maxt:%d", tt.mint.UnixNano(), tt.maxt.UnixNano()), func(t *testing.T) {
i := NewTimeRangedIterator(
NewStreamIterator(
logproto.Stream{Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1)},
{Timestamp: time.Unix(0, 2)},
{Timestamp: time.Unix(0, 3)},
}}),
tt.mint,
tt.maxt,
)
for _, b := range tt.expect {
require.Equal(t, b, i.Next())
}
require.NoError(t, i.Close())
})
}
}

@ -470,17 +470,8 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
if _, ok := h.receivedMap[file]; ok {
h.receivedMap[file] = append(h.receivedMap[file], s.Entries...)
} else {
h.receivedMap[file] = s.Entries
}
if _, ok := h.receivedLabels[file]; ok {
h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels)
} else {
h.receivedLabels[file] = []labels.Labels{parsedLabels}
}
h.receivedMap[file] = append(h.receivedMap[file], s.Entries...)
h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels)
}

@ -248,7 +248,9 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
} else {
from = time.Unix(0, headChunk.Chunk.From.UnixNano())
if from.Before(it.req.Start) {
// when clipping the from it should never be before the start or equal to the end.
// Doing so would include entries not requested.
if from.Before(it.req.Start) || from.Equal(it.req.End) {
from = it.req.Start
}
}

Loading…
Cancel
Save