diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 8cfd0b44e8..309dedb0b9 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -474,9 +474,10 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi its := make([]iter.EntryIterator, 0, len(c.blocks)+1) for _, b := range c.blocks { - if maxt > b.mint && b.maxt > mint { - its = append(its, b.iterator(ctx, c.readers, filter)) + if maxt < b.mint || b.maxt < mint { + continue } + its = append(its, b.iterator(ctx, c.readers, filter)) } if !c.head.isEmpty() { diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 72b1961d41..9d6714507d 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -10,9 +10,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/dustin/go-humanize" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/chunkenc/testdata" @@ -644,3 +643,73 @@ func BenchmarkHeadBlockIterator(b *testing.B) { }) } } + +func TestMemChunk_IteratorBounds(t *testing.T) { + + var createChunk = func() *MemChunk { + t.Helper() + c := NewMemChunk(EncNone, 1e6, 1e6) + + if err := c.Append(&logproto.Entry{ + Timestamp: time.Unix(0, 1), + Line: "1", + }); err != nil { + t.Fatal(err) + } + if err := c.Append(&logproto.Entry{ + Timestamp: time.Unix(0, 2), + Line: "2", + }); err != nil { + t.Fatal(err) + } + return c + } + + for _, tt := range []struct { + mint, maxt time.Time + direction logproto.Direction + expect []bool // array of expected values for next call in sequence + }{ + {time.Unix(0, 0), time.Unix(0, 1), logproto.FORWARD, []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 2), time.Unix(0, 2), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), logproto.FORWARD, []bool{true, true, false}}, + {time.Unix(0, 2), time.Unix(0, 3), logproto.FORWARD, []bool{true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), logproto.FORWARD, []bool{false}}, + + {time.Unix(0, 0), time.Unix(0, 1), logproto.BACKWARD, []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 2), time.Unix(0, 2), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), logproto.BACKWARD, []bool{true, true, false}}, + {time.Unix(0, 2), time.Unix(0, 3), logproto.BACKWARD, []bool{true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), logproto.BACKWARD, []bool{false}}, + } { + t.Run( + fmt.Sprintf("mint:%d,maxt:%d,direction:%s", tt.mint.UnixNano(), tt.maxt.UnixNano(), tt.direction), + func(t *testing.T) { + tt := tt + c := createChunk() + + // testing headchunk + it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil) + require.NoError(t, err) + for i := range tt.expect { + require.Equal(t, tt.expect[i], it.Next()) + } + require.NoError(t, it.Close()) + + // testing chunk blocks + require.NoError(t, c.cut()) + it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil) + require.NoError(t, err) + for i := range tt.expect { + require.Equal(t, tt.expect[i], it.Next()) + } + require.NoError(t, it.Close()) + }) + + } + +} diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 45ebff74d4..4c5669b20f 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -503,11 +503,18 @@ func (i *timeRangedIterator) Next() bool { ts := i.EntryIterator.Entry().Timestamp for ok && i.mint.After(ts) { ok = i.EntryIterator.Next() + if !ok { + continue + } ts = i.EntryIterator.Entry().Timestamp } - - if ok && (i.maxt.Before(ts) || i.maxt.Equal(ts)) { // The maxt is exclusive. - ok = false + if ok { + if ts.Equal(i.mint) { // The mint is inclusive + return true + } + if i.maxt.Before(ts) || i.maxt.Equal(ts) { // The maxt is exclusive. + ok = false + } } if !ok { i.EntryIterator.Close() diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index c02246fc0e..7a7f3cbadd 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -7,9 +7,8 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/stats" @@ -525,3 +524,40 @@ func Test_DuplicateCount(t *testing.T) { }) } } + +func Test_timeRangedIterator_Next(t *testing.T) { + + tests := []struct { + mint time.Time + maxt time.Time + expect []bool // array of expected values for next call in sequence + }{ + {time.Unix(0, 0), time.Unix(0, 0), []bool{false}}, + {time.Unix(0, 0), time.Unix(0, 1), []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 1), []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 2), []bool{true, false}}, + {time.Unix(0, 1), time.Unix(0, 3), []bool{true, true, false}}, + {time.Unix(0, 3), time.Unix(0, 3), []bool{true, false}}, + {time.Unix(0, 4), time.Unix(0, 10), []bool{false}}, + {time.Unix(0, 1), time.Unix(0, 10), []bool{true, true, true, false}}, + {time.Unix(0, 0), time.Unix(0, 10), []bool{true, true, true, false}}, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("mint:%d maxt:%d", tt.mint.UnixNano(), tt.maxt.UnixNano()), func(t *testing.T) { + i := NewTimeRangedIterator( + NewStreamIterator( + logproto.Stream{Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1)}, + {Timestamp: time.Unix(0, 2)}, + {Timestamp: time.Unix(0, 3)}, + }}), + tt.mint, + tt.maxt, + ) + for _, b := range tt.expect { + require.Equal(t, b, i.Next()) + } + require.NoError(t, i.Close()) + }) + } +} diff --git a/pkg/promtail/promtail_test.go b/pkg/promtail/promtail_test.go index ecee6c6b94..d0b4a76b19 100644 --- a/pkg/promtail/promtail_test.go +++ b/pkg/promtail/promtail_test.go @@ -470,17 +470,8 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if _, ok := h.receivedMap[file]; ok { - h.receivedMap[file] = append(h.receivedMap[file], s.Entries...) - } else { - h.receivedMap[file] = s.Entries - } - - if _, ok := h.receivedLabels[file]; ok { - h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels) - } else { - h.receivedLabels[file] = []labels.Labels{parsedLabels} - } + h.receivedMap[file] = append(h.receivedMap[file], s.Entries...) + h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels) } diff --git a/pkg/storage/iterator.go b/pkg/storage/iterator.go index 1381550f1c..b3ce7e2459 100644 --- a/pkg/storage/iterator.go +++ b/pkg/storage/iterator.go @@ -248,7 +248,9 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) { } else { from = time.Unix(0, headChunk.Chunk.From.UnixNano()) - if from.Before(it.req.Start) { + // when clipping the from it should never be before the start or equal to the end. + // Doing so would include entries not requested. + if from.Before(it.req.Start) || from.Equal(it.req.End) { from = it.req.Start } }