From a98b7ebaf494301ea0afe29317eaf554b900f7fb Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Wed, 31 Aug 2022 15:20:08 -0600 Subject: [PATCH] Hide duplicates caused by stream sharding (#7005) As streams are sharded and resharded across different ingesters, the likelihood of duplicate logs being ingested increases dramatically. This removes duplicates at query time by dynamically removing the `__shard_stream__` label for both log and sample queries so the `stream hash` used by the `MergeEntryIterator` will match when doing comparisons for duplicates. The `stream hash` is created in either the `store` or `ingester instance` at query time so the labels are removed there. Additionally, the `MergeEntryIterator` has been modified to remove duplicates in a stream rather than just across streams. For completeness, `__shard_stream__` has also been removed from Label and Series Queries. --- pkg/ingester/ingester.go | 7 +- pkg/ingester/ingester_test.go | 5 +- pkg/ingester/instance.go | 18 +++- pkg/ingester/instance_test.go | 81 ++++++++++++++- pkg/iter/entry_iterator.go | 17 +--- pkg/iter/entry_iterator_test.go | 51 ++++++++++ pkg/iter/sample_iterator.go | 16 +-- pkg/iter/sample_iterator_test.go | 32 ++++++ pkg/querier/querier.go | 4 +- pkg/querier/querier_mock_test.go | 16 +++ pkg/querier/querier_test.go | 41 ++++++++ pkg/storage/async_store_test.go | 2 +- pkg/storage/batch.go | 13 ++- pkg/storage/batch_test.go | 164 +++++++++++++++---------------- pkg/storage/lazy_chunk_test.go | 4 +- pkg/storage/store_test.go | 4 +- pkg/storage/util_test.go | 6 +- 17 files changed, 348 insertions(+), 133 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 192dc4a77a..b1fa5f3363 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -700,10 +700,12 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log if err != nil { return err } + var iters []iter.SampleIterator it, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req}) if err != nil { return err } + iters = append(iters, it) if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok { storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{ @@ -718,13 +720,12 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close) return err } - - it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr}) + iters = append(iters, storeItr) } defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close) - return sendSampleBatches(ctx, it, queryServer) + return sendSampleBatches(ctx, iter.NewMergeSampleIterator(ctx, iters), queryServer) } // asyncStoreMaxLookBack returns a max look back period only if active index type is one of async index stores like `boltdb-shipper` and `tsdb`. diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 92a16f35da..9fec08385d 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -12,6 +12,9 @@ import ( "github.com/gogo/protobuf/types" "github.com/gogo/status" + + "github.com/grafana/loki/pkg/distributor" + "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/services" "github.com/prometheus/common/model" @@ -849,7 +852,7 @@ func Test_DedupeIngesterParser(t *testing.T) { defer closer() for i := 0; i < streamCount; i++ { - streams = append(streams, labels.FromStrings("foo", "bar", "bar", fmt.Sprintf("baz%d", i))) + streams = append(streams, labels.FromStrings("foo", "bar", "bar", fmt.Sprintf("baz%d", i), distributor.ShardLbName, fmt.Sprint(i%2))) } for i := 0; i < requests; i++ { diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 05bd3d6ec3..0bc3c8b89b 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -10,6 +10,8 @@ import ( "syscall" "time" + "github.com/grafana/loki/pkg/distributor" + "github.com/go-kit/log/level" spb "github.com/gogo/googleapis/google/rpc" "github.com/gogo/protobuf/types" @@ -523,14 +525,20 @@ func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest, matche Values: labels, }, nil } + names, err := i.index.LabelNames(*req.Start, nil) if err != nil { return nil, err } - labels = make([]string, len(names)) - for i := 0; i < len(names); i++ { - labels[i] = names[i] + + labels = make([]string, 0, len(names)) + for _, n := range names { + if n == distributor.ShardLbName { + continue + } + labels = append(labels, n) } + return &logproto.LabelResponse{ Values: labels, }, nil @@ -721,6 +729,10 @@ outer: if chunkFilter != nil && chunkFilter.ShouldFilter(stream.labels) { continue } + + // To Enable downstream deduplication of sharded streams, remove the shard label + stream.labels = labels.NewBuilder(stream.labels).Del(distributor.ShardLbName).Labels() + err := fn(stream) if err != nil { return err diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 99571c77cb..1006109990 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/grafana/loki/pkg/distributor" + "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/storage/chunk" @@ -198,8 +200,8 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) { currentTime := time.Now() testStreams := []logproto.Stream{ - {Labels: "{app=\"test\",job=\"varlogs\"}", Entries: entries(5, currentTime)}, - {Labels: "{app=\"test2\",job=\"varlogs\"}", Entries: entries(5, currentTime.Add(6*time.Nanosecond))}, + {Labels: fmt.Sprintf("{app=\"test\",job=\"varlogs\",%s=\"1\"}", distributor.ShardLbName), Entries: entries(5, currentTime)}, + {Labels: fmt.Sprintf("{app=\"test2\",job=\"varlogs\",%s=\"1\"}", distributor.ShardLbName), Entries: entries(5, currentTime.Add(6*time.Nanosecond))}, } for _, testStream := range testStreams { @@ -331,7 +333,7 @@ func Test_SeriesQuery(t *testing.T) { }, []logproto.SeriesIdentifier{ // Separated by shard number - {Labels: map[string]string{"app": "test2", "job": "varlogs"}}, + {Labels: map[string]string{"app": "test", "job": "varlogs"}}, }, }, { @@ -528,6 +530,79 @@ func Test_Iterator(t *testing.T) { require.Equal(t, int64(8), res.Streams[1].Entries[0].Timestamp.UnixNano()) } +func Test_IteratorFiltersStreamLabels(t *testing.T) { + instance := defaultInstance(t) + + require.NoError(t, + instance.Push(context.TODO(), &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: fmt.Sprintf(`{host="agent", log_stream="worker",job="3",%s="1"}`, distributor.ShardLbName), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, int64(1)), Line: fmt.Sprint(`msg="worker_1"`)}, + }, + }, + }, + }), + ) + + it, err := instance.Query(context.TODO(), + logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: `{job="3"} | logfmt`, + Start: time.Unix(0, 0), + End: time.Unix(0, 100000000), + Direction: logproto.BACKWARD, + }, + }, + ) + require.NoError(t, err) + + // All of the streams are returned but none have the shard label + var count int + for it.Next() { + count++ + require.NotContains(t, it.Labels(), distributor.ShardLbName) + } + require.Equal(t, 11, count) +} + +func Test_SamplesIteratorFiltersStreamLabels(t *testing.T) { + instance := defaultInstance(t) + + require.NoError(t, + instance.Push(context.TODO(), &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: fmt.Sprintf(`{host="agent", log_stream="worker",job="3",%s="1"}`, distributor.ShardLbName), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, int64(1)), Line: fmt.Sprint(`msg="worker_1"`)}, + }, + }, + }, + }), + ) + + it, err := instance.QuerySample(context.TODO(), + logql.SelectSampleParams{ + SampleQueryRequest: &logproto.SampleQueryRequest{ + Selector: `count_over_time({job="3"}[5m])`, + Start: time.Unix(0, 0), + End: time.Unix(0, 100000000), + }, + }, + ) + require.NoError(t, err) + + // All of the streams are returned but none have the shard label + var count int + for it.Next() { + count++ + require.NotContains(t, it.Labels(), distributor.ShardLbName) + } + require.Equal(t, 11, count) +} + type testFilter struct{} func (t *testFilter) ForRequest(ctx context.Context) chunk.Filterer { diff --git a/pkg/iter/entry_iterator.go b/pkg/iter/entry_iterator.go index ee26548e66..105ed1264a 100644 --- a/pkg/iter/entry_iterator.go +++ b/pkg/iter/entry_iterator.go @@ -193,18 +193,6 @@ func (i *mergeEntryIterator) Next() bool { return false } - // shortcut for the last iterator. - if i.heap.Len() == 1 { - i.currEntry.Entry = i.heap.Peek().Entry() - i.currEntry.labels = i.heap.Peek().Labels() - i.currEntry.streamHash = i.heap.Peek().StreamHash() - - if !i.heap.Peek().Next() { - i.heap.Pop() - } - return true - } - // We support multiple entries with the same timestamp, and we want to // preserve their original order. We look at all the top entries in the // heap with the same timestamp, and pop the ones whose common value @@ -220,9 +208,8 @@ Outer: } heap.Pop(i.heap) - previous := i.buffer var dupe bool - for _, t := range previous { + for _, t := range i.buffer { if t.Entry.Line == entry.Line { i.stats.AddDuplicates(1) dupe = true @@ -246,7 +233,7 @@ Outer: !entry.Timestamp.Equal(i.buffer[0].Entry.Timestamp) { break } - for _, t := range previous { + for _, t := range i.buffer { if t.Entry.Line == entry.Line { i.stats.AddDuplicates(1) continue inner diff --git a/pkg/iter/entry_iterator_test.go b/pkg/iter/entry_iterator_test.go index 16d7ad9a69..fce2e568ee 100644 --- a/pkg/iter/entry_iterator_test.go +++ b/pkg/iter/entry_iterator_test.go @@ -499,6 +499,23 @@ func Test_DuplicateCount(t *testing.T) { }, } + dupeStream := logproto.Stream{ + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 0), + Line: "foo", + }, + { + Timestamp: time.Unix(0, 0), + Line: "foo", + }, + { + Timestamp: time.Unix(0, 0), + Line: "foo", + }, + }, + } + for _, test := range []struct { name string iters []EntryIterator @@ -526,6 +543,14 @@ func Test_DuplicateCount(t *testing.T) { logproto.BACKWARD, 3, }, + { + "stream with duplicates", + []EntryIterator{ + NewStreamIterator(dupeStream), + }, + logproto.BACKWARD, + 2, + }, { "replication 2 f", []EntryIterator{ @@ -898,3 +923,29 @@ func TestDedupeMergeEntryIterator(t *testing.T) { require.Equal(t, "3", it.Entry().Line) require.Equal(t, time.Unix(2, 0), it.Entry().Timestamp) } + +func TestSingleStreamDedupeMergeEntryIterator(t *testing.T) { + it := NewMergeEntryIterator(context.Background(), + []EntryIterator{ + NewStreamIterator(logproto.Stream{ + Labels: ``, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1, 0), + Line: "0", + }, + { + Timestamp: time.Unix(1, 0), + Line: "0", + }, + { + Timestamp: time.Unix(1, 0), + Line: "0", + }, + }, + }), + }, logproto.FORWARD) + require.True(t, it.Next()) + require.Equal(t, "0", it.Entry().Line) + require.False(t, it.Next()) +} diff --git a/pkg/iter/sample_iterator.go b/pkg/iter/sample_iterator.go index d84bd7f3d2..37e1175287 100644 --- a/pkg/iter/sample_iterator.go +++ b/pkg/iter/sample_iterator.go @@ -228,17 +228,6 @@ func (i *mergeSampleIterator) Next() bool { return false } - // shortcut for the last iterator. - if i.heap.Len() == 1 { - i.curr.Sample = i.heap.Peek().Sample() - i.curr.labels = i.heap.Peek().Labels() - i.curr.streamHash = i.heap.Peek().StreamHash() - if !i.heap.Peek().Next() { - i.heap.Pop() - } - return true - } - // We support multiple entries with the same timestamp, and we want to // preserve their original order. We look at all the top entries in the // heap with the same timestamp, and pop the ones whose common value @@ -251,9 +240,8 @@ Outer: break } heap.Pop(i.heap) - previous := i.buffer var dupe bool - for _, t := range previous { + for _, t := range i.buffer { if t.Sample.Hash == sample.Hash { i.stats.AddDuplicates(1) dupe = true @@ -277,7 +265,7 @@ Outer: sample.Timestamp != i.buffer[0].Timestamp { break } - for _, t := range previous { + for _, t := range i.buffer { if t.Hash == sample.Hash { i.stats.AddDuplicates(1) continue inner diff --git a/pkg/iter/sample_iterator_test.go b/pkg/iter/sample_iterator_test.go index ec739e4d5a..9f32abcc3a 100644 --- a/pkg/iter/sample_iterator_test.go +++ b/pkg/iter/sample_iterator_test.go @@ -501,3 +501,35 @@ func TestDedupeMergeSampleIterator(t *testing.T) { require.Equal(t, 1., it.Sample().Value) require.Equal(t, xxhash.Sum64String("3"), it.Sample().Hash) } + +func TestSingleStreamDedupeMergeSampleIterator(t *testing.T) { + it := NewMergeSampleIterator(context.Background(), + []SampleIterator{ + NewSeriesIterator(logproto.Series{ + Labels: ``, + Samples: []logproto.Sample{ + { + Timestamp: time.Unix(1, 0).UnixNano(), + Value: 1., + Hash: xxhash.Sum64String("1"), + }, + { + Timestamp: time.Unix(1, 0).UnixNano(), + Value: 1., + Hash: xxhash.Sum64String("1"), + }, + { + Timestamp: time.Unix(1, 0).UnixNano(), + Value: 1., + Hash: xxhash.Sum64String("1"), + }, + }, + StreamHash: 0, + }), + }) + + require.True(t, it.Next()) + require.Equal(t, time.Unix(1, 0).UnixNano(), it.Sample().Timestamp) + require.Equal(t, 1., it.Sample().Value) + require.False(t, it.Next()) +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 326083bd53..34e70b5d18 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -161,9 +161,7 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec iters = append(iters, storeIter) } - if len(iters) == 1 { - return iters[0], nil - } + return iter.NewMergeEntryIterator(ctx, iters, params.Direction), nil } diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 38f88d542b..a73a9bdd67 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -456,6 +456,22 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream } } +func mockStreamWithDupes() logproto.Stream { + entries := make([]logproto.Entry, 0, 5) + + for i := 0; i < 5; i++ { + entries = append(entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: "line", + }) + } + + return logproto.Stream{ + Entries: entries, + Labels: `{type="test"}`, + } +} + type querierMock struct { util.ExtendedMock } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index b61650466a..41c60171b0 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" @@ -1110,6 +1112,45 @@ func TestQuerier_SelectLogWithDeletes(t *testing.T) { require.Equal(t, "test", delGetter.user) } +func TestQuerier_SelectLogWithDupesInStream(t *testing.T) { + store := newStoreMock() + store.On("SelectLogs", mock.Anything, mock.Anything).Return(iter.NewStreamIterator(mockStreamWithDupes()), nil) + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + cfg := mockQuerierConfig() + cfg.QueryStoreOnly = true + + q, err := newQuerier( + cfg, + mockIngesterClientConfig(), + newIngesterClientMockFactory(nil), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + store, + limits, + ) + require.NoError(t, err) + + ctx := user.InjectOrgID(context.Background(), "test") + + request := logproto.QueryRequest{ + Selector: `{type="test"}`, + Limit: 10, + Start: time.Unix(0, 300000000), + End: time.Unix(0, 600000000), + Direction: logproto.FORWARD, + } + + itr, err := q.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: &request}) + require.NoError(t, err) + + require.True(t, itr.Next()) + require.Equal(t, itr.Entry().Line, "line") + require.False(t, itr.Next()) +} + func TestQuerier_SelectSamplesWithDeletes(t *testing.T) { queryClient := newQuerySampleClientMock() queryClient.On("Recv").Return(mockQueryResponse([]logproto.Stream{mockStream(1, 2)}), nil) diff --git a/pkg/storage/async_store_test.go b/pkg/storage/async_store_test.go index 30e96381cc..04d7b3e2e7 100644 --- a/pkg/storage/async_store_test.go +++ b/pkg/storage/async_store_test.go @@ -66,7 +66,7 @@ func buildMockChunkRef(t *testing.T, num int) []chunk.Chunk { } for i := 0; i < num; i++ { - chk := newChunk(buildTestStreams(fooLabelsWithName, timeRange{ + chk := newChunk(buildTestStreams(fooLabelsWithNameAndShard, timeRange{ from: now.Add(time.Duration(i) * time.Minute), to: now.Add(time.Duration(i+1) * time.Minute), })) diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index c03ee89359..7b20bea5ec 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -5,6 +5,8 @@ import ( "sort" "time" + "github.com/grafana/loki/pkg/distributor" + "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -412,7 +414,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC result := make([]iter.EntryIterator, 0, len(chks)) for _, chunks := range chks { if len(chunks) != 0 && len(chunks[0]) != 0 { - streamPipeline := it.pipeline.ForStream(labels.NewBuilder(chunks[0][0].Chunk.Metric).Del(labels.MetricName).Labels()) + streamPipeline := it.pipeline.ForStream(filteredLabels(chunks[0][0].Chunk.Metric)) iterator, err := it.buildHeapIterator(chunks, from, through, streamPipeline, nextChunk) if err != nil { return nil, err @@ -556,7 +558,7 @@ func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*La result := make([]iter.SampleIterator, 0, len(chks)) for _, chunks := range chks { if len(chunks) != 0 && len(chunks[0]) != 0 { - streamExtractor := it.extractor.ForStream(labels.NewBuilder(chunks[0][0].Chunk.Metric).Del(labels.MetricName).Labels()) + streamExtractor := it.extractor.ForStream(filteredLabels(chunks[0][0].Chunk.Metric)) iterator, err := it.buildHeapIterator(chunks, from, through, streamExtractor, nextChunk) if err != nil { return nil, err @@ -568,6 +570,13 @@ func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*La return result, nil } +func filteredLabels(ls labels.Labels) labels.Labels { + return labels.NewBuilder(ls). + Del(labels.MetricName). + Del(distributor.ShardLbName). + Labels() +} + func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamExtractor log.StreamSampleExtractor, nextChunk *LazyChunk) (iter.SampleIterator, error) { result := make([]iter.SampleIterator, 0, len(chks)) diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index cc9a7ba13c..b1c519a501 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -26,7 +26,7 @@ var NilMetrics = NewChunkMetrics(nil, 0) func Test_batchIterSafeStart(t *testing.T) { stream := logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -76,7 +76,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "forward with overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -89,7 +89,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -102,7 +102,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -115,7 +115,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -128,7 +128,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -141,7 +141,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -177,7 +177,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from, from.Add(4 * time.Millisecond), logproto.FORWARD, 2, @@ -185,7 +185,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "forward all overlap and all chunks have a from time less than query from time": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -198,7 +198,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -215,7 +215,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -232,7 +232,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -249,7 +249,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -266,7 +266,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -306,7 +306,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from.Add(1 * time.Millisecond), from.Add(5 * time.Millisecond), logproto.FORWARD, 2, @@ -314,7 +314,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "forward with overlapping non-continuous entries": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -331,7 +331,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -344,7 +344,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -357,7 +357,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -389,7 +389,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from, from.Add(3 * time.Millisecond), logproto.FORWARD, 2, @@ -397,7 +397,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "backward with overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -410,7 +410,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -423,7 +423,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -436,7 +436,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -449,7 +449,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -462,7 +462,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -498,7 +498,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from, from.Add(4 * time.Millisecond), logproto.BACKWARD, 2, @@ -506,7 +506,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "backward all overlap and all chunks have a through time greater than query through time": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -523,7 +523,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -540,7 +540,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -557,7 +557,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -574,7 +574,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -591,7 +591,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -627,7 +627,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from, from.Add(4 * time.Millisecond), logproto.BACKWARD, 2, @@ -635,7 +635,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "backward with overlapping non-continuous entries": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(0 * time.Millisecond), @@ -648,7 +648,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(1 * time.Millisecond), @@ -661,7 +661,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -674,7 +674,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(4 * time.Millisecond), @@ -726,7 +726,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from, from.Add(8 * time.Millisecond), logproto.BACKWARD, 2, @@ -734,7 +734,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "forward without overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -747,7 +747,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -756,7 +756,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -784,7 +784,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from, from.Add(3 * time.Millisecond), logproto.FORWARD, 2, @@ -792,7 +792,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "backward without overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -805,7 +805,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -814,7 +814,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -842,7 +842,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from, from.Add(3 * time.Millisecond), logproto.BACKWARD, 2, @@ -858,7 +858,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "forward identicals": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -867,7 +867,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -876,7 +876,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -889,7 +889,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -898,7 +898,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -907,7 +907,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -916,7 +916,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -944,7 +944,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from, from.Add(4 * time.Millisecond), logproto.FORWARD, 1, @@ -988,7 +988,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { "forward with overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -1001,7 +1001,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -1014,7 +1014,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -1027,7 +1027,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -1040,7 +1040,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -1053,7 +1053,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -1093,14 +1093,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from, from.Add(4 * time.Millisecond), 2, }, "forward with overlapping non-continuous entries": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -1117,7 +1117,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -1130,7 +1130,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -1143,7 +1143,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -1178,14 +1178,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from, from.Add(3 * time.Millisecond), 2, }, "forward last chunk boundaries equal to end": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: time.Unix(1, 0), @@ -1198,7 +1198,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: time.Unix(2, 0), @@ -1211,7 +1211,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: time.Unix(3, 0), @@ -1241,14 +1241,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), time.Unix(1, 0), time.Unix(3, 0), 2, }, "forward last chunk boundaries equal to end and start": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: time.Unix(1, 0), @@ -1261,7 +1261,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: time.Unix(1, 0), @@ -1291,14 +1291,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), time.Unix(1, 0), time.Unix(1, 0), 2, }, "forward without overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -1311,7 +1311,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -1320,7 +1320,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -1351,7 +1351,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName.String(), + fooLabelsWithNameAndShard.String(), from, from.Add(3 * time.Millisecond), 2, }, @@ -1389,7 +1389,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { func TestPartitionOverlappingchunks(t *testing.T) { var ( oneThroughFour = newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -1402,7 +1402,7 @@ func TestPartitionOverlappingchunks(t *testing.T) { }, }) two = newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(1 * time.Millisecond), @@ -1411,7 +1411,7 @@ func TestPartitionOverlappingchunks(t *testing.T) { }, }) three = newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -1634,7 +1634,7 @@ func Test_IsInvalidChunkError(t *testing.T) { func TestBatchCancel(t *testing.T) { createChunk := func(from time.Time) *LazyChunk { return newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), + Labels: fooLabelsWithNameAndShard.String(), Entries: []logproto.Entry{ { Timestamp: from, diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go index 29b81fc0ba..f98c3f3bd6 100644 --- a/pkg/storage/lazy_chunk_test.go +++ b/pkg/storage/lazy_chunk_test.go @@ -24,8 +24,8 @@ func TestLazyChunkIterator(t *testing.T) { }{ { newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName.String(), - Hash: fooLabelsWithName.Hash(), + Labels: fooLabelsWithNameAndShard.String(), + Hash: fooLabelsWithNameAndShard.Hash(), Entries: []logproto.Entry{ { Timestamp: from, diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 21652feb59..e3e1417217 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -1064,7 +1064,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { // build and add chunks to the store addedChunkIDs := map[string]struct{}{} for _, tr := range chunksToBuildForTimeRanges { - chk := newChunk(buildTestStreams(fooLabelsWithName, tr)) + chk := newChunk(buildTestStreams(fooLabelsWithNameAndShard, tr)) err := store.PutOne(ctx, chk.From, chk.Through, chk) require.NoError(t, err) @@ -1081,7 +1081,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { defer store.Stop() // get all the chunks from both the stores - chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...) + chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithNameAndShard.String())...) require.NoError(t, err) var totalChunks int for _, chks := range chunks { diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index fbda277e9a..673a0b88e9 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/grafana/loki/pkg/distributor" + "github.com/davecgh/go-spew/spew" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -29,8 +31,8 @@ import ( ) var ( - fooLabelsWithName = labels.Labels{{Name: "foo", Value: "bar"}, {Name: "__name__", Value: "logs"}} - fooLabels = labels.Labels{{Name: "foo", Value: "bar"}} + fooLabelsWithNameAndShard = labels.Labels{{Name: "foo", Value: "bar"}, {Name: "__name__", Value: "logs"}, {Name: distributor.ShardLbName, Value: "3"}} + fooLabels = labels.Labels{{Name: "foo", Value: "bar"}} ) var from = time.Unix(0, time.Millisecond.Nanoseconds())