storage: fix missing logs with batched chunk iterator (#1299)

* fix batched chunk iterator

* fix expected series labels in iterator_test.go

* add query range boundary check

* add __name__ label matcher to test cases

* trim __name__ without mutating chunks
pull/1326/head
Putra Sattvika, I Gusti Ngurah 6 years ago committed by Cyril Tovena
parent 8eced44bbc
commit c89df1a795
  1. 75
      pkg/storage/iterator.go
  2. 410
      pkg/storage/iterator_test.go
  3. 1
      pkg/storage/util_test.go

@ -77,6 +77,16 @@ type batchChunkIterator struct {
// newBatchChunkIterator creates a new batch iterator with the given batchSize.
func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, batchSize int, matchers []*labels.Matcher, filter logql.Filter, req *logproto.QueryRequest) *batchChunkIterator {
// __name__ is not something we filter by because it's a constant in loki and only used for upstream compatibility.
// Therefore remove it
for i := range matchers {
if matchers[i].Name == labels.MetricName {
matchers = append(matchers[:i], matchers[i+1:]...)
break
}
}
res := &batchChunkIterator{
batchSize: batchSize,
matchers: matchers,
@ -112,6 +122,9 @@ func (it *batchChunkIterator) Next() bool {
}
func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// the first chunk of the batch
headChunk := it.chunks.Peek()
// pop the next batch of chunks and append/preprend previous overlapping chunks
// so we can merge/de-dupe overlapping entries.
batch := make([]*chunkenc.LazyChunk, 0, it.batchSize+len(it.lastOverlapping))
@ -130,6 +143,14 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// so that overlapping chunks are together
if it.req.Direction == logproto.BACKWARD {
from = time.Unix(0, nextChunk.Chunk.Through.UnixNano())
// we have to reverse the inclusivity of the chunk iterator from
// [from, through) to (from, through] for backward queries, except when
// the batch's `from` is equal to the query's Start. This can be achieved
// by shifting `from` by one nanosecond.
if !from.Equal(it.req.Start) {
from = from.Add(time.Nanosecond)
}
} else {
through = time.Unix(0, nextChunk.Chunk.From.UnixNano())
}
@ -149,7 +170,7 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// └────────────────────┘
//
// And nextChunk is # 49, we need to keep references to #47 and #48 as they won't be
// iterated over completely (we're clipping through to #49's from) and then add them to the next batch.
// iterated over completely (we're clipping through to #49's from) and then add them to the next batch.
it.lastOverlapping = it.lastOverlapping[:0]
for _, c := range batch {
if it.req.Direction == logproto.BACKWARD {
@ -162,13 +183,27 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
}
}
}
}
if it.req.Direction == logproto.BACKWARD {
through = time.Unix(0, headChunk.Chunk.Through.UnixNano())
if through.After(it.req.End) {
through = it.req.End
}
// we have to reverse the inclusivity of the chunk iterator from
// [from, through) to (from, through] for backward queries, except when
// the batch's `through` is equal to the query's End. This can be achieved
// by shifting `through` by one nanosecond.
if !through.Equal(it.req.End) {
through = through.Add(time.Nanosecond)
}
} else {
if len(it.lastOverlapping) > 0 {
if it.req.Direction == logproto.BACKWARD {
through = time.Unix(0, it.lastOverlapping[0].Chunk.From.UnixNano())
} else {
from = time.Unix(0, it.lastOverlapping[0].Chunk.Through.UnixNano())
}
from = time.Unix(0, headChunk.Chunk.From.UnixNano())
if from.Before(it.req.Start) {
from = it.req.Start
}
}
@ -250,12 +285,9 @@ func buildIterators(ctx context.Context, chks map[model.Fingerprint][][]*chunken
func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter logql.Filter, direction logproto.Direction, from, through time.Time) (iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))
if chks[0][0].Chunk.Metric.Has("__name__") {
labelsBuilder := labels.NewBuilder(chks[0][0].Chunk.Metric)
labelsBuilder.Del("__name__")
chks[0][0].Chunk.Metric = labelsBuilder.Labels()
}
labels := chks[0][0].Chunk.Metric.String()
// __name__ is only used for upstream compatibility and is hardcoded within loki. Strip it from the return label set.
labels := dropLabels(chks[0][0].Chunk.Metric, labels.MetricName).String()
for i := range chks {
iterators := make([]iter.EntryIterator, 0, len(chks[i]))
@ -400,3 +432,20 @@ outer:
return css
}
// dropLabels returns a new label set with certain labels dropped
func dropLabels(ls labels.Labels, removals ...string) (dst labels.Labels) {
toDel := make(map[string]struct{})
for _, r := range removals {
toDel[r] = struct{}{}
}
for _, l := range ls {
_, remove := toDel[l.Name]
if !remove {
dst = append(dst, l)
}
}
return dst
}

@ -2,12 +2,15 @@ package storage
import (
"context"
"fmt"
"testing"
"time"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"
)
func Test_newBatchChunkIterator(t *testing.T) {
@ -18,11 +21,12 @@ func Test_newBatchChunkIterator(t *testing.T) {
matchers string
start, end time.Time
direction logproto.Direction
batchSize int
}{
"forward with overlap": {
[]*chunkenc.LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from,
@ -35,7 +39,7 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -48,7 +52,7 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -61,7 +65,120 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
{
Timestamp: from.Add(4 * time.Millisecond),
Line: "5",
},
},
}),
},
[]*logproto.Stream{
{
Labels: fooLabels,
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
},
},
},
fooLabelsWithName,
from, from.Add(4 * time.Millisecond),
logproto.FORWARD,
2,
},
"forward with overlapping non-continuous entries": {
[]*chunkenc.LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -93,14 +210,15 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
},
},
fooLabels,
fooLabelsWithName,
from, from.Add(3 * time.Millisecond),
logproto.FORWARD,
2,
},
"backward with overlap": {
[]*chunkenc.LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from,
@ -113,7 +231,7 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -126,7 +244,7 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -139,7 +257,7 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -151,11 +269,41 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
{
Timestamp: from.Add(4 * time.Millisecond),
Line: "5",
},
},
}),
},
[]*logproto.Stream{
{
Labels: fooLabels,
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
@ -171,14 +319,114 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
},
},
fooLabels,
from, from.Add(3 * time.Millisecond),
fooLabelsWithName,
from, from.Add(4 * time.Millisecond),
logproto.BACKWARD,
2,
},
"forward without overlap": {
"backward with overlapping non-continuous entries": {
[]*chunkenc.LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(0 * time.Millisecond),
Line: "0",
},
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "3",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(1 * time.Millisecond),
Line: "1",
},
{
Timestamp: from.Add(6 * time.Millisecond),
Line: "6",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(5 * time.Millisecond),
Line: "5",
},
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(4 * time.Millisecond),
Line: "4",
},
{
Timestamp: from.Add(7 * time.Millisecond),
Line: "7",
},
},
}),
},
[]*logproto.Stream{
{
Labels: fooLabels,
Entries: []logproto.Entry{
{
Timestamp: from.Add(7 * time.Millisecond),
Line: "7",
},
{
Timestamp: from.Add(6 * time.Millisecond),
Line: "6",
},
{
Timestamp: from.Add(5 * time.Millisecond),
Line: "5",
},
{
Timestamp: from.Add(4 * time.Millisecond),
Line: "4",
},
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "3",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(1 * time.Millisecond),
Line: "1",
},
{
Timestamp: from.Add(0 * time.Millisecond),
Line: "0",
},
},
},
},
fooLabelsWithName,
from, from.Add(8 * time.Millisecond),
logproto.BACKWARD,
2,
},
"forward without overlap": {
[]*chunkenc.LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from,
@ -191,7 +439,7 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -200,7 +448,7 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
@ -228,14 +476,15 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
},
},
fooLabels,
fooLabelsWithName,
from, from.Add(3 * time.Millisecond),
logproto.FORWARD,
2,
},
"backward without overlap": {
[]*chunkenc.LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from,
@ -248,7 +497,7 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -257,7 +506,7 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabels,
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
@ -285,16 +534,17 @@ func Test_newBatchChunkIterator(t *testing.T) {
},
},
},
fooLabels,
fooLabelsWithName,
from, from.Add(3 * time.Millisecond),
logproto.BACKWARD,
2,
},
}
for name, tt := range tests {
tt := tt
t.Run(name, func(t *testing.T) {
it := newBatchChunkIterator(context.Background(), tt.chunks, 2, newMatchers(tt.matchers), nil, newQuery("", tt.start, tt.end, tt.direction))
it := newBatchChunkIterator(context.Background(), tt.chunks, tt.batchSize, newMatchers(tt.matchers), nil, newQuery("", tt.start, tt.end, tt.direction))
streams, _, err := iter.ReadBatch(it, 1000)
_ = it.Close()
if err != nil {
@ -305,5 +555,127 @@ func Test_newBatchChunkIterator(t *testing.T) {
})
}
}
func TestPartitionOverlappingchunks(t *testing.T) {
var (
oneThroughFour = newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
},
})
two = newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(1 * time.Millisecond),
Line: "2",
},
},
})
three = newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName,
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
},
})
)
for i, tc := range []struct {
input []*chunkenc.LazyChunk
expected [][]*chunkenc.LazyChunk
}{
{
input: []*chunkenc.LazyChunk{
oneThroughFour,
two,
three,
},
expected: [][]*chunkenc.LazyChunk{
[]*chunkenc.LazyChunk{oneThroughFour},
[]*chunkenc.LazyChunk{two, three},
},
},
{
input: []*chunkenc.LazyChunk{
two,
oneThroughFour,
three,
},
expected: [][]*chunkenc.LazyChunk{
[]*chunkenc.LazyChunk{oneThroughFour},
[]*chunkenc.LazyChunk{two, three},
},
},
{
input: []*chunkenc.LazyChunk{
two,
two,
three,
three,
},
expected: [][]*chunkenc.LazyChunk{
[]*chunkenc.LazyChunk{two, three},
[]*chunkenc.LazyChunk{two, three},
},
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
out := partitionOverlappingChunks(tc.input)
require.Equal(t, tc.expected, out)
})
}
}
func TestDropLabels(t *testing.T) {
for i, tc := range []struct {
ls labels.Labels
drop []string
expected labels.Labels
}{
{
ls: labels.Labels{
labels.Label{
Name: "a",
Value: "1",
},
labels.Label{
Name: "b",
Value: "2",
},
labels.Label{
Name: "c",
Value: "3",
},
},
drop: []string{"b"},
expected: labels.Labels{
labels.Label{
Name: "a",
Value: "1",
},
labels.Label{
Name: "c",
Value: "3",
},
},
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
dropped := dropLabels(tc.ls, tc.drop...)
require.Equal(t, tc.expected, dropped)
})
}
}

@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/assert"
)
var fooLabelsWithName = "{foo=\"bar\", __name__=\"log\"}"
var fooLabels = "{foo=\"bar\"}"
var from = time.Unix(0, time.Millisecond.Nanoseconds())

Loading…
Cancel
Save