From d08ceef16b8c52b4fdc4ff90fb4a7785eb4e1d02 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 8 Jun 2020 13:40:31 -0400 Subject: [PATCH] Fixes iterators boundaries. (#2136) The start should be inclusive and the end is supposed to be exclusive. If start and end are equal, then only one entry can be returned which need to also be equal to start. Fixes #2124 Those issues were edge cases where the boundaries of a block or iterator would be equal to the start. Signed-off-by: Cyril Tovena --- pkg/chunkenc/memchunk.go | 5 ++- pkg/chunkenc/memchunk_test.go | 73 ++++++++++++++++++++++++++++++++++- pkg/iter/iterator.go | 13 +++++-- pkg/iter/iterator_test.go | 40 ++++++++++++++++++- pkg/promtail/promtail_test.go | 13 +------ pkg/storage/iterator.go | 4 +- 6 files changed, 127 insertions(+), 21 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 8cfd0b44e8..309dedb0b9 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -474,9 +474,10 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi its := make([]iter.EntryIterator, 0, len(c.blocks)+1) for _, b := range c.blocks { - if maxt > b.mint && b.maxt > mint { - its = append(its, b.iterator(ctx, c.readers, filter)) + if maxt < b.mint || b.maxt < mint { + continue } + its = append(its, b.iterator(ctx, c.readers, filter)) } if !c.head.isEmpty() { diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 72b1961d41..9d6714507d 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -10,9 +10,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/dustin/go-humanize" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/chunkenc/testdata" @@ -644,3 +643,73 @@ func BenchmarkHeadBlockIterator(b *testing.B) { }) } } + +func TestMemChunk_IteratorBounds(t *testing.T) { + + var createChunk = func() *MemChunk { + t.Helper() + c := NewMemChunk(EncNone, 1e6, 1e6) + + if err := c.Append(&logproto.Entry{ + Timestamp: time.Unix(0, 1), + Line: "1", + }); err != nil { + t.Fatal(err) + } + if err := c.Append(&logproto.Entry{ + Timestamp: time.Unix(0, 2), + Line: "2", + }); err != nil { + t.Fatal(err) + } + return c + } + + for _, tt := range []struct { + mint, maxt time.Time + direction logproto.Direction + expect []bool // array of expected values for next call in sequence + }{ + {time.Unix(0, 0), time.Unix(0, 1), logproto.FORWARD, []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 2), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), logproto.FORWARD, []bool{true, true, false}}, + {time.Unix(0, 2), time.Unix(0, 3), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), logproto.FORWARD, []bool{false}}, + + {time.Unix(0, 0), time.Unix(0, 1), logproto.BACKWARD, []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 2), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), logproto.BACKWARD, []bool{true, true, false}}, + {time.Unix(0, 2), time.Unix(0, 3), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), logproto.BACKWARD, []bool{false}}, + } { + t.Run( + fmt.Sprintf("mint:%d,maxt:%d,direction:%s", tt.mint.UnixNano(), tt.maxt.UnixNano(), tt.direction), + func(t *testing.T) { + tt := tt + c := createChunk() + + // testing headchunk + it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil) + require.NoError(t, err) + for i := range tt.expect { + require.Equal(t, tt.expect[i], it.Next()) + } + require.NoError(t, it.Close()) + + // testing chunk blocks + require.NoError(t, c.cut()) + it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil) + require.NoError(t, err) + for i := range tt.expect { + require.Equal(t, tt.expect[i], it.Next()) + } + require.NoError(t, it.Close()) + }) + + } + +} diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 45ebff74d4..4c5669b20f 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -503,11 +503,18 @@ func (i *timeRangedIterator) Next() bool { ts := i.EntryIterator.Entry().Timestamp for ok && i.mint.After(ts) { ok = i.EntryIterator.Next() + if !ok { + continue + } ts = i.EntryIterator.Entry().Timestamp } - - if ok && (i.maxt.Before(ts) || i.maxt.Equal(ts)) { // The maxt is exclusive. - ok = false + if ok { + if ts.Equal(i.mint) { // The mint is inclusive + return true + } + if i.maxt.Before(ts) || i.maxt.Equal(ts) { // The maxt is exclusive. + ok = false + } } if !ok { i.EntryIterator.Close() diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index c02246fc0e..7a7f3cbadd 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -7,9 +7,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/stats" @@ -525,3 +524,40 @@ func Test_DuplicateCount(t *testing.T) { }) } } + +func Test_timeRangedIterator_Next(t *testing.T) { + + tests := []struct { + mint time.Time + maxt time.Time + expect []bool // array of expected values for next call in sequence + }{ + {time.Unix(0, 0), time.Unix(0, 0), []bool{false}}, + {time.Unix(0, 0), time.Unix(0, 1), []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), []bool{true, true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), []bool{true, false}}, + {time.Unix(0, 4), time.Unix(0, 10), []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 10), []bool{true, true, true, false}}, + {time.Unix(0, 0), time.Unix(0, 10), []bool{true, true, true, false}}, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("mint:%d maxt:%d", tt.mint.UnixNano(), tt.maxt.UnixNano()), func(t *testing.T) { + i := NewTimeRangedIterator( + NewStreamIterator( + logproto.Stream{Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1)}, + {Timestamp: time.Unix(0, 2)}, + {Timestamp: time.Unix(0, 3)}, + }}), + tt.mint, + tt.maxt, + ) + for _, b := range tt.expect { + require.Equal(t, b, i.Next()) + } + require.NoError(t, i.Close()) + }) + } +} diff --git a/pkg/promtail/promtail_test.go b/pkg/promtail/promtail_test.go index ecee6c6b94..d0b4a76b19 100644 --- a/pkg/promtail/promtail_test.go +++ b/pkg/promtail/promtail_test.go @@ -470,17 +470,8 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if _, ok := h.receivedMap[file]; ok { - h.receivedMap[file] = append(h.receivedMap[file], s.Entries...) - } else { - h.receivedMap[file] = s.Entries - } - - if _, ok := h.receivedLabels[file]; ok { - h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels) - } else { - h.receivedLabels[file] = []labels.Labels{parsedLabels} - } + h.receivedMap[file] = append(h.receivedMap[file], s.Entries...) + h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels) } diff --git a/pkg/storage/iterator.go b/pkg/storage/iterator.go index 1381550f1c..b3ce7e2459 100644 --- a/pkg/storage/iterator.go +++ b/pkg/storage/iterator.go @@ -248,7 +248,9 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) { } else { from = time.Unix(0, headChunk.Chunk.From.UnixNano()) - if from.Before(it.req.Start) { + // when clipping the from it should never be before the start or equal to the end. + // Doing so would include entries not requested. + if from.Before(it.req.Start) || from.Equal(it.req.End) { from = it.req.Start } }