Hide duplicates caused by stream sharding (#7005)

As streams are sharded and resharded across different ingesters, the likelihood of duplicate logs being ingested increases dramatically. This removes duplicates at query time by dynamically removing the `__shard_stream__` label  for both log and sample queries so the `stream hash` used by the `MergeEntryIterator` will match when doing comparisons for duplicates.

The `stream hash` is created in either the `store` or `ingester instance` at query time so the labels are removed there. Additionally, the `MergeEntryIterator` has been modified to remove duplicates in a stream rather than just across streams. 

For completeness, `__shard_stream__` has also been removed from Label and Series Queries.
pull/6842/head^2
Travis Patterson 3 years ago committed by GitHub
parent c23e541fe7
commit a98b7ebaf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      pkg/ingester/ingester.go
  2. 5
      pkg/ingester/ingester_test.go
  3. 18
      pkg/ingester/instance.go
  4. 81
      pkg/ingester/instance_test.go
  5. 17
      pkg/iter/entry_iterator.go
  6. 51
      pkg/iter/entry_iterator_test.go
  7. 16
      pkg/iter/sample_iterator.go
  8. 32
      pkg/iter/sample_iterator_test.go
  9. 4
      pkg/querier/querier.go
  10. 16
      pkg/querier/querier_mock_test.go
  11. 41
      pkg/querier/querier_test.go
  12. 2
      pkg/storage/async_store_test.go
  13. 13
      pkg/storage/batch.go
  14. 164
      pkg/storage/batch_test.go
  15. 4
      pkg/storage/lazy_chunk_test.go
  16. 4
      pkg/storage/store_test.go
  17. 6
      pkg/storage/util_test.go

@ -700,10 +700,12 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
if err != nil {
return err
}
var iters []iter.SampleIterator
it, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req})
if err != nil {
return err
}
iters = append(iters, it)
if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok {
storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{
@ -718,13 +720,12 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)
return err
}
it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr})
iters = append(iters, storeItr)
}
defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)
return sendSampleBatches(ctx, it, queryServer)
return sendSampleBatches(ctx, iter.NewMergeSampleIterator(ctx, iters), queryServer)
}
// asyncStoreMaxLookBack returns a max look back period only if active index type is one of async index stores like `boltdb-shipper` and `tsdb`.

@ -12,6 +12,9 @@ import (
"github.com/gogo/protobuf/types"
"github.com/gogo/status"
"github.com/grafana/loki/pkg/distributor"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/services"
"github.com/prometheus/common/model"
@ -849,7 +852,7 @@ func Test_DedupeIngesterParser(t *testing.T) {
defer closer()
for i := 0; i < streamCount; i++ {
streams = append(streams, labels.FromStrings("foo", "bar", "bar", fmt.Sprintf("baz%d", i)))
streams = append(streams, labels.FromStrings("foo", "bar", "bar", fmt.Sprintf("baz%d", i), distributor.ShardLbName, fmt.Sprint(i%2)))
}
for i := 0; i < requests; i++ {

@ -10,6 +10,8 @@ import (
"syscall"
"time"
"github.com/grafana/loki/pkg/distributor"
"github.com/go-kit/log/level"
spb "github.com/gogo/googleapis/google/rpc"
"github.com/gogo/protobuf/types"
@ -523,14 +525,20 @@ func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest, matche
Values: labels,
}, nil
}
names, err := i.index.LabelNames(*req.Start, nil)
if err != nil {
return nil, err
}
labels = make([]string, len(names))
for i := 0; i < len(names); i++ {
labels[i] = names[i]
labels = make([]string, 0, len(names))
for _, n := range names {
if n == distributor.ShardLbName {
continue
}
labels = append(labels, n)
}
return &logproto.LabelResponse{
Values: labels,
}, nil
@ -721,6 +729,10 @@ outer:
if chunkFilter != nil && chunkFilter.ShouldFilter(stream.labels) {
continue
}
// To Enable downstream deduplication of sharded streams, remove the shard label
stream.labels = labels.NewBuilder(stream.labels).Del(distributor.ShardLbName).Labels()
err := fn(stream)
if err != nil {
return err

@ -10,6 +10,8 @@ import (
"testing"
"time"
"github.com/grafana/loki/pkg/distributor"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/chunk"
@ -198,8 +200,8 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
currentTime := time.Now()
testStreams := []logproto.Stream{
{Labels: "{app=\"test\",job=\"varlogs\"}", Entries: entries(5, currentTime)},
{Labels: "{app=\"test2\",job=\"varlogs\"}", Entries: entries(5, currentTime.Add(6*time.Nanosecond))},
{Labels: fmt.Sprintf("{app=\"test\",job=\"varlogs\",%s=\"1\"}", distributor.ShardLbName), Entries: entries(5, currentTime)},
{Labels: fmt.Sprintf("{app=\"test2\",job=\"varlogs\",%s=\"1\"}", distributor.ShardLbName), Entries: entries(5, currentTime.Add(6*time.Nanosecond))},
}
for _, testStream := range testStreams {
@ -331,7 +333,7 @@ func Test_SeriesQuery(t *testing.T) {
},
[]logproto.SeriesIdentifier{
// Separated by shard number
{Labels: map[string]string{"app": "test2", "job": "varlogs"}},
{Labels: map[string]string{"app": "test", "job": "varlogs"}},
},
},
{
@ -528,6 +530,79 @@ func Test_Iterator(t *testing.T) {
require.Equal(t, int64(8), res.Streams[1].Entries[0].Timestamp.UnixNano())
}
func Test_IteratorFiltersStreamLabels(t *testing.T) {
instance := defaultInstance(t)
require.NoError(t,
instance.Push(context.TODO(), &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: fmt.Sprintf(`{host="agent", log_stream="worker",job="3",%s="1"}`, distributor.ShardLbName),
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, int64(1)), Line: fmt.Sprint(`msg="worker_1"`)},
},
},
},
}),
)
it, err := instance.Query(context.TODO(),
logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: `{job="3"} | logfmt`,
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Direction: logproto.BACKWARD,
},
},
)
require.NoError(t, err)
// All of the streams are returned but none have the shard label
var count int
for it.Next() {
count++
require.NotContains(t, it.Labels(), distributor.ShardLbName)
}
require.Equal(t, 11, count)
}
func Test_SamplesIteratorFiltersStreamLabels(t *testing.T) {
instance := defaultInstance(t)
require.NoError(t,
instance.Push(context.TODO(), &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: fmt.Sprintf(`{host="agent", log_stream="worker",job="3",%s="1"}`, distributor.ShardLbName),
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, int64(1)), Line: fmt.Sprint(`msg="worker_1"`)},
},
},
},
}),
)
it, err := instance.QuerySample(context.TODO(),
logql.SelectSampleParams{
SampleQueryRequest: &logproto.SampleQueryRequest{
Selector: `count_over_time({job="3"}[5m])`,
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
},
},
)
require.NoError(t, err)
// All of the streams are returned but none have the shard label
var count int
for it.Next() {
count++
require.NotContains(t, it.Labels(), distributor.ShardLbName)
}
require.Equal(t, 11, count)
}
type testFilter struct{}
func (t *testFilter) ForRequest(ctx context.Context) chunk.Filterer {

@ -193,18 +193,6 @@ func (i *mergeEntryIterator) Next() bool {
return false
}
// shortcut for the last iterator.
if i.heap.Len() == 1 {
i.currEntry.Entry = i.heap.Peek().Entry()
i.currEntry.labels = i.heap.Peek().Labels()
i.currEntry.streamHash = i.heap.Peek().StreamHash()
if !i.heap.Peek().Next() {
i.heap.Pop()
}
return true
}
// We support multiple entries with the same timestamp, and we want to
// preserve their original order. We look at all the top entries in the
// heap with the same timestamp, and pop the ones whose common value
@ -220,9 +208,8 @@ Outer:
}
heap.Pop(i.heap)
previous := i.buffer
var dupe bool
for _, t := range previous {
for _, t := range i.buffer {
if t.Entry.Line == entry.Line {
i.stats.AddDuplicates(1)
dupe = true
@ -246,7 +233,7 @@ Outer:
!entry.Timestamp.Equal(i.buffer[0].Entry.Timestamp) {
break
}
for _, t := range previous {
for _, t := range i.buffer {
if t.Entry.Line == entry.Line {
i.stats.AddDuplicates(1)
continue inner

@ -499,6 +499,23 @@ func Test_DuplicateCount(t *testing.T) {
},
}
dupeStream := logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 0),
Line: "foo",
},
{
Timestamp: time.Unix(0, 0),
Line: "foo",
},
{
Timestamp: time.Unix(0, 0),
Line: "foo",
},
},
}
for _, test := range []struct {
name string
iters []EntryIterator
@ -526,6 +543,14 @@ func Test_DuplicateCount(t *testing.T) {
logproto.BACKWARD,
3,
},
{
"stream with duplicates",
[]EntryIterator{
NewStreamIterator(dupeStream),
},
logproto.BACKWARD,
2,
},
{
"replication 2 f",
[]EntryIterator{
@ -898,3 +923,29 @@ func TestDedupeMergeEntryIterator(t *testing.T) {
require.Equal(t, "3", it.Entry().Line)
require.Equal(t, time.Unix(2, 0), it.Entry().Timestamp)
}
func TestSingleStreamDedupeMergeEntryIterator(t *testing.T) {
it := NewMergeEntryIterator(context.Background(),
[]EntryIterator{
NewStreamIterator(logproto.Stream{
Labels: ``,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
Line: "0",
},
{
Timestamp: time.Unix(1, 0),
Line: "0",
},
{
Timestamp: time.Unix(1, 0),
Line: "0",
},
},
}),
}, logproto.FORWARD)
require.True(t, it.Next())
require.Equal(t, "0", it.Entry().Line)
require.False(t, it.Next())
}

@ -228,17 +228,6 @@ func (i *mergeSampleIterator) Next() bool {
return false
}
// shortcut for the last iterator.
if i.heap.Len() == 1 {
i.curr.Sample = i.heap.Peek().Sample()
i.curr.labels = i.heap.Peek().Labels()
i.curr.streamHash = i.heap.Peek().StreamHash()
if !i.heap.Peek().Next() {
i.heap.Pop()
}
return true
}
// We support multiple entries with the same timestamp, and we want to
// preserve their original order. We look at all the top entries in the
// heap with the same timestamp, and pop the ones whose common value
@ -251,9 +240,8 @@ Outer:
break
}
heap.Pop(i.heap)
previous := i.buffer
var dupe bool
for _, t := range previous {
for _, t := range i.buffer {
if t.Sample.Hash == sample.Hash {
i.stats.AddDuplicates(1)
dupe = true
@ -277,7 +265,7 @@ Outer:
sample.Timestamp != i.buffer[0].Timestamp {
break
}
for _, t := range previous {
for _, t := range i.buffer {
if t.Hash == sample.Hash {
i.stats.AddDuplicates(1)
continue inner

@ -501,3 +501,35 @@ func TestDedupeMergeSampleIterator(t *testing.T) {
require.Equal(t, 1., it.Sample().Value)
require.Equal(t, xxhash.Sum64String("3"), it.Sample().Hash)
}
func TestSingleStreamDedupeMergeSampleIterator(t *testing.T) {
it := NewMergeSampleIterator(context.Background(),
[]SampleIterator{
NewSeriesIterator(logproto.Series{
Labels: ``,
Samples: []logproto.Sample{
{
Timestamp: time.Unix(1, 0).UnixNano(),
Value: 1.,
Hash: xxhash.Sum64String("1"),
},
{
Timestamp: time.Unix(1, 0).UnixNano(),
Value: 1.,
Hash: xxhash.Sum64String("1"),
},
{
Timestamp: time.Unix(1, 0).UnixNano(),
Value: 1.,
Hash: xxhash.Sum64String("1"),
},
},
StreamHash: 0,
}),
})
require.True(t, it.Next())
require.Equal(t, time.Unix(1, 0).UnixNano(), it.Sample().Timestamp)
require.Equal(t, 1., it.Sample().Value)
require.False(t, it.Next())
}

@ -161,9 +161,7 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
iters = append(iters, storeIter)
}
if len(iters) == 1 {
return iters[0], nil
}
return iter.NewMergeEntryIterator(ctx, iters, params.Direction), nil
}

@ -456,6 +456,22 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream
}
}
func mockStreamWithDupes() logproto.Stream {
entries := make([]logproto.Entry, 0, 5)
for i := 0; i < 5; i++ {
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: "line",
})
}
return logproto.Stream{
Entries: entries,
Labels: `{type="test"}`,
}
}
type querierMock struct {
util.ExtendedMock
}

@ -8,6 +8,8 @@ import (
"testing"
"time"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
@ -1110,6 +1112,45 @@ func TestQuerier_SelectLogWithDeletes(t *testing.T) {
require.Equal(t, "test", delGetter.user)
}
func TestQuerier_SelectLogWithDupesInStream(t *testing.T) {
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).Return(iter.NewStreamIterator(mockStreamWithDupes()), nil)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
cfg := mockQuerierConfig()
cfg.QueryStoreOnly = true
q, err := newQuerier(
cfg,
mockIngesterClientConfig(),
newIngesterClientMockFactory(nil),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store,
limits,
)
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "test")
request := logproto.QueryRequest{
Selector: `{type="test"}`,
Limit: 10,
Start: time.Unix(0, 300000000),
End: time.Unix(0, 600000000),
Direction: logproto.FORWARD,
}
itr, err := q.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: &request})
require.NoError(t, err)
require.True(t, itr.Next())
require.Equal(t, itr.Entry().Line, "line")
require.False(t, itr.Next())
}
func TestQuerier_SelectSamplesWithDeletes(t *testing.T) {
queryClient := newQuerySampleClientMock()
queryClient.On("Recv").Return(mockQueryResponse([]logproto.Stream{mockStream(1, 2)}), nil)

@ -66,7 +66,7 @@ func buildMockChunkRef(t *testing.T, num int) []chunk.Chunk {
}
for i := 0; i < num; i++ {
chk := newChunk(buildTestStreams(fooLabelsWithName, timeRange{
chk := newChunk(buildTestStreams(fooLabelsWithNameAndShard, timeRange{
from: now.Add(time.Duration(i) * time.Minute),
to: now.Add(time.Duration(i+1) * time.Minute),
}))

@ -5,6 +5,8 @@ import (
"sort"
"time"
"github.com/grafana/loki/pkg/distributor"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -412,7 +414,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC
result := make([]iter.EntryIterator, 0, len(chks))
for _, chunks := range chks {
if len(chunks) != 0 && len(chunks[0]) != 0 {
streamPipeline := it.pipeline.ForStream(labels.NewBuilder(chunks[0][0].Chunk.Metric).Del(labels.MetricName).Labels())
streamPipeline := it.pipeline.ForStream(filteredLabels(chunks[0][0].Chunk.Metric))
iterator, err := it.buildHeapIterator(chunks, from, through, streamPipeline, nextChunk)
if err != nil {
return nil, err
@ -556,7 +558,7 @@ func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*La
result := make([]iter.SampleIterator, 0, len(chks))
for _, chunks := range chks {
if len(chunks) != 0 && len(chunks[0]) != 0 {
streamExtractor := it.extractor.ForStream(labels.NewBuilder(chunks[0][0].Chunk.Metric).Del(labels.MetricName).Labels())
streamExtractor := it.extractor.ForStream(filteredLabels(chunks[0][0].Chunk.Metric))
iterator, err := it.buildHeapIterator(chunks, from, through, streamExtractor, nextChunk)
if err != nil {
return nil, err
@ -568,6 +570,13 @@ func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*La
return result, nil
}
func filteredLabels(ls labels.Labels) labels.Labels {
return labels.NewBuilder(ls).
Del(labels.MetricName).
Del(distributor.ShardLbName).
Labels()
}
func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamExtractor log.StreamSampleExtractor, nextChunk *LazyChunk) (iter.SampleIterator, error) {
result := make([]iter.SampleIterator, 0, len(chks))

@ -26,7 +26,7 @@ var NilMetrics = NewChunkMetrics(nil, 0)
func Test_batchIterSafeStart(t *testing.T) {
stream := logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -76,7 +76,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
"forward with overlap": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -89,7 +89,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -102,7 +102,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -115,7 +115,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -128,7 +128,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -141,7 +141,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
@ -177,7 +177,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from, from.Add(4 * time.Millisecond),
logproto.FORWARD,
2,
@ -185,7 +185,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
"forward all overlap and all chunks have a from time less than query from time": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -198,7 +198,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -215,7 +215,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -232,7 +232,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -249,7 +249,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -266,7 +266,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -306,7 +306,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from.Add(1 * time.Millisecond), from.Add(5 * time.Millisecond),
logproto.FORWARD,
2,
@ -314,7 +314,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
"forward with overlapping non-continuous entries": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -331,7 +331,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -344,7 +344,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -357,7 +357,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -389,7 +389,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from, from.Add(3 * time.Millisecond),
logproto.FORWARD,
2,
@ -397,7 +397,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
"backward with overlap": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -410,7 +410,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -423,7 +423,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -436,7 +436,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -449,7 +449,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -462,7 +462,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
@ -498,7 +498,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from, from.Add(4 * time.Millisecond),
logproto.BACKWARD,
2,
@ -506,7 +506,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
"backward all overlap and all chunks have a through time greater than query through time": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -523,7 +523,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -540,7 +540,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -557,7 +557,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -574,7 +574,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -591,7 +591,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
@ -627,7 +627,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from, from.Add(4 * time.Millisecond),
logproto.BACKWARD,
2,
@ -635,7 +635,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
"backward with overlapping non-continuous entries": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(0 * time.Millisecond),
@ -648,7 +648,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(1 * time.Millisecond),
@ -661,7 +661,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -674,7 +674,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(4 * time.Millisecond),
@ -726,7 +726,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from, from.Add(8 * time.Millisecond),
logproto.BACKWARD,
2,
@ -734,7 +734,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
"forward without overlap": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -747,7 +747,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -756,7 +756,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
@ -784,7 +784,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from, from.Add(3 * time.Millisecond),
logproto.FORWARD,
2,
@ -792,7 +792,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
"backward without overlap": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -805,7 +805,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -814,7 +814,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
@ -842,7 +842,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from, from.Add(3 * time.Millisecond),
logproto.BACKWARD,
2,
@ -858,7 +858,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
"forward identicals": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -867,7 +867,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -876,7 +876,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -889,7 +889,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -898,7 +898,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -907,7 +907,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -916,7 +916,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
@ -944,7 +944,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from, from.Add(4 * time.Millisecond),
logproto.FORWARD,
1,
@ -988,7 +988,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
"forward with overlap": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -1001,7 +1001,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -1014,7 +1014,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -1027,7 +1027,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -1040,7 +1040,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -1053,7 +1053,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
@ -1093,14 +1093,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from, from.Add(4 * time.Millisecond),
2,
},
"forward with overlapping non-continuous entries": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -1117,7 +1117,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -1130,7 +1130,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(time.Millisecond),
@ -1143,7 +1143,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -1178,14 +1178,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from, from.Add(3 * time.Millisecond),
2,
},
"forward last chunk boundaries equal to end": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
@ -1198,7 +1198,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(2, 0),
@ -1211,7 +1211,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(3, 0),
@ -1241,14 +1241,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
time.Unix(1, 0), time.Unix(3, 0),
2,
},
"forward last chunk boundaries equal to end and start": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
@ -1261,7 +1261,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
@ -1291,14 +1291,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
time.Unix(1, 0), time.Unix(1, 0),
2,
},
"forward without overlap": {
[]*LazyChunk{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -1311,7 +1311,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -1320,7 +1320,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
}),
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
@ -1351,7 +1351,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
},
},
},
fooLabelsWithName.String(),
fooLabelsWithNameAndShard.String(),
from, from.Add(3 * time.Millisecond),
2,
},
@ -1389,7 +1389,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
func TestPartitionOverlappingchunks(t *testing.T) {
var (
oneThroughFour = newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,
@ -1402,7 +1402,7 @@ func TestPartitionOverlappingchunks(t *testing.T) {
},
})
two = newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(1 * time.Millisecond),
@ -1411,7 +1411,7 @@ func TestPartitionOverlappingchunks(t *testing.T) {
},
})
three = newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from.Add(2 * time.Millisecond),
@ -1634,7 +1634,7 @@ func Test_IsInvalidChunkError(t *testing.T) {
func TestBatchCancel(t *testing.T) {
createChunk := func(from time.Time) *LazyChunk {
return newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Labels: fooLabelsWithNameAndShard.String(),
Entries: []logproto.Entry{
{
Timestamp: from,

@ -24,8 +24,8 @@ func TestLazyChunkIterator(t *testing.T) {
}{
{
newLazyChunk(logproto.Stream{
Labels: fooLabelsWithName.String(),
Hash: fooLabelsWithName.Hash(),
Labels: fooLabelsWithNameAndShard.String(),
Hash: fooLabelsWithNameAndShard.Hash(),
Entries: []logproto.Entry{
{
Timestamp: from,

@ -1064,7 +1064,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
// build and add chunks to the store
addedChunkIDs := map[string]struct{}{}
for _, tr := range chunksToBuildForTimeRanges {
chk := newChunk(buildTestStreams(fooLabelsWithName, tr))
chk := newChunk(buildTestStreams(fooLabelsWithNameAndShard, tr))
err := store.PutOne(ctx, chk.From, chk.Through, chk)
require.NoError(t, err)
@ -1081,7 +1081,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
defer store.Stop()
// get all the chunks from both the stores
chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...)
chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithNameAndShard.String())...)
require.NoError(t, err)
var totalChunks int
for _, chks := range chunks {

@ -6,6 +6,8 @@ import (
"testing"
"time"
"github.com/grafana/loki/pkg/distributor"
"github.com/davecgh/go-spew/spew"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@ -29,8 +31,8 @@ import (
)
var (
fooLabelsWithName = labels.Labels{{Name: "foo", Value: "bar"}, {Name: "__name__", Value: "logs"}}
fooLabels = labels.Labels{{Name: "foo", Value: "bar"}}
fooLabelsWithNameAndShard = labels.Labels{{Name: "foo", Value: "bar"}, {Name: "__name__", Value: "logs"}, {Name: distributor.ShardLbName, Value: "3"}}
fooLabels = labels.Labels{{Name: "foo", Value: "bar"}}
)
var from = time.Unix(0, time.Millisecond.Nanoseconds())

Loading…
Cancel
Save