Fixes deduping when multiple timestamps are equal (#5799)

* Fixes deduping when multiple timestamps are equal

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* update changelog

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Review feedback

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes int64

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* that nit was not worth it

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Cyril Tovena committed 3 years ago (via GitHub)
parent c0cc004333
commit fcb9812f5d
  1. CHANGELOG.md (55 lines changed)
  2. pkg/ingester/ingester_test.go (163 lines changed)
  3. pkg/iter/cache.go (4 lines changed)
  4. pkg/iter/entry_iterator.go (147 lines changed)
  5. pkg/iter/entry_iterator_test.go (53 lines changed)
  6. pkg/iter/sample_iterator.go (113 lines changed)
  7. pkg/iter/sample_iterator_test.go (52 lines changed)
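Before the diffs, a minimal self-contained sketch of the dedupe rule this change enforces (illustrative names only, not the actual Loki implementation): within a group of entries sharing a timestamp and stream hash, an identical line is emitted only once, while distinct lines at the same timestamp are all kept.

package main

import "fmt"

type entry struct {
	ts   int64  // timestamp in nanoseconds
	hash uint64 // stream hash
	line string
}

// dedupe assumes entries are sorted by (ts, hash) and drops an entry only if
// an identical line was already seen within the same (ts, hash) group.
func dedupe(entries []entry) []entry {
	var out []entry
	seen := map[string]struct{}{}
	var groupTS int64
	var groupHash uint64
	for _, e := range entries {
		if e.ts != groupTS || e.hash != groupHash {
			groupTS, groupHash = e.ts, e.hash
			seen = map[string]struct{}{} // new group: forget previous lines
		}
		if _, dup := seen[e.line]; dup {
			continue // same line from another replica: skip it
		}
		seen[e.line] = struct{}{}
		out = append(out, e)
	}
	return out
}

func main() {
	in := []entry{
		{1, 7, "a"}, {1, 7, "a"}, {1, 7, "b"}, // equal timestamp: second "a" deduped, "b" kept
		{2, 7, "a"}, // new timestamp: "a" is kept again
	}
	fmt.Println(dedupe(in)) // [{1 7 a} {1 7 b} {2 7 a}]
}

The merge iterators in the diffs below implement the same idea with a buffer of entries at the current (timestamp, stream hash) and a per-group line (or line-hash) comparison.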

@@ -1,3 +1,58 @@
## Main
* [5799](https://github.com/grafana/loki/pull/5799) **cyriltovena** Fix deduping issues when multiple entries with the same timestamp exist.
* [5780](https://github.com/grafana/loki/pull/5780) **simonswine**: Update alpine image to 3.15.4.
* [5715](https://github.com/grafana/loki/pull/5715) **chaudum** Add option to push RFC5424 syslog messages from Promtail in syslog scrape target.
* [5696](https://github.com/grafana/loki/pull/5696) **paullryan** Don't block scraping of new logs from Cloudflare within Promtail if Cloudflare returns an error about logs being requested too early.
* [5685](https://github.com/grafana/loki/pull/5685) **chaudum** Fix bug in push request parser that allowed users to send arbitrary non-string data as "log line".
* [5707](https://github.com/grafana/loki/pull/5707) **franzwong** Promtail: Rename config name limit_config to limits_config.
* [5626](https://github.com/grafana/loki/pull/5626) **jeschkies** Support multi-tenant select logs and samples queries.
* [5622](https://github.com/grafana/loki/pull/5622) **chaudum**: Fix bug in query splitter that caused `interval` query parameter to be ignored and therefore returning more logs than expected.
* [5521](https://github.com/grafana/loki/pull/5521) **cstyan**: Move stream lag configuration to the top-level clients config struct and refactor the stream lag metric; this resolves a bug with duplicate metric collection when a single Promtail binary runs multiple Promtail clients.
* [5568](https://github.com/grafana/loki/pull/5568) **afayngelerindbx**: Fix canary panics due to concurrent execution of `confirmMissing`
* [5552](https://github.com/grafana/loki/pull/5552) **jiachengxu**: Loki mixin: add `DiskSpaceUtilizationPanel`
* [5541](https://github.com/grafana/loki/pull/5541) **bboreham**: Queries: reject very deeply nested regexps which could crash Loki.
* [5536](https://github.com/grafana/loki/pull/5536) **jiachengxu**: Loki mixin: make labelsSelector in loki chunks dashboards configurable
* [5535](https://github.com/grafana/loki/pull/5535) **jiachengxu**: Loki mixins: use labels selector for loki chunks dashboard
* [5507](https://github.com/grafana/loki/pull/5507) **MichelHollands**: Remove extra param in call for inflightRequests metric.
* [5481](https://github.com/grafana/loki/pull/5481) **MichelHollands**: Add a DeletionMode config variable to specify the delete mode and validate match parameters.
* [5356](https://github.com/grafana/loki/pull/5356) **jbschami**: Enhance lambda-promtail to support adding extra labels from an environment variable value
* [5409](https://github.com/grafana/loki/pull/5409) **ldb**: Enable best effort parsing for Syslog messages
* [5392](https://github.com/grafana/loki/pull/5392) **MichelHollands**: Etcd credentials are parsed as secrets instead of plain text now.
* [5361](https://github.com/grafana/loki/pull/5361) **ctovena**: Add usage report to grafana.com.
* [5289](https://github.com/grafana/loki/pull/5289) **ctovena**: Fix deduplication bug in queries when mutating labels.
* [5302](https://github.com/grafana/loki/pull/5302) **MasslessParticle** Update azure blobstore client to use new sdk.
* [5290](https://github.com/grafana/loki/pull/5290) **ssncferreira**: Update Promtail to support duration string formats.
* [5266](https://github.com/grafana/loki/pull/5266) **jeschkies**: Write Promtail position file atomically on Unix.
* [5280](https://github.com/grafana/loki/pull/5280) **jeschkies**: Fix Docker target connection loss.
* [5243](https://github.com/grafana/loki/pull/5243) **owen-d**: moves `querier.split-queries-by-interval` to limits code only.
* [5139](https://github.com/grafana/loki/pull/5139) **DylanGuedes**: Drop support for legacy configuration rules format.
* [5262](https://github.com/grafana/loki/pull/5262) **MichelHollands**: Remove the labelFilter field
* [4911](https://github.com/grafana/loki/pull/4911) **jeschkies**: Support Docker service discovery in Promtail.
* [5107](https://github.com/grafana/loki/pull/5107) **chaudum** Fix bug in fluentd plugin that caused log lines containing non-UTF-8 characters to be dropped.
* [5148](https://github.com/grafana/loki/pull/5148) **chaudum** Add periodic task to prune old expired items from the FIFO cache to free up memory.
* [5187](https://github.com/grafana/loki/pull/5187) **aknuds1** Rename metric `cortex_experimental_features_in_use_total` to `loki_experimental_features_in_use_total` and metric `log_messages_total` to `loki_log_messages_total`.
* [5170](https://github.com/grafana/loki/pull/5170) **chaudum** Fix deadlock in Promtail caused when targets got removed from a target group by the discovery manager.
* [5163](https://github.com/grafana/loki/pull/5163) **chaudum** Fix regression in fluentd plugin introduced with #5107 that caused `NoMethodError` when parsing non-string values of log lines.
* [5144](https://github.com/grafana/loki/pull/5144) **dannykopping** Ruler: fix remote write basic auth credentials.
* [5091](https://github.com/grafana/loki/pull/5091) **owen-d**: Changes `ingester.concurrent-flushes` default to 32
* [5031](https://github.com/grafana/loki/pull/5031) **liguozhong**: Promtail: Add global read rate limiting.
* [4879](https://github.com/grafana/loki/pull/4879) **cyriltovena**: LogQL: add __line__ function to | line_format template.
* [5081](https://github.com/grafana/loki/pull/5081) **SasSwart**: Add the option to configure memory ballast for Loki
* [5085](https://github.com/grafana/loki/pull/5085) **aknuds1**: Upgrade Cortex to [e0807c4eb487](https://github.com/cortexproject/cortex/compare/4e9fc3a2b5ab..e0807c4eb487) and Prometheus to [692a54649ed7](https://github.com/prometheus/prometheus/compare/2a3d62ac8456..692a54649ed7)
* [5067](https://github.com/grafana/loki/pull/5067) **cstyan**: Add a metric to Azure Blob Storage client to track total egress bytes
* [5065](https://github.com/grafana/loki/pull/5065) **AndreZiviani**: lambda-promtail: Add ability to ingest logs from S3
* [4950](https://github.com/grafana/loki/pull/4950) **DylanGuedes**: Implement common instance addr/net interface
* [4949](https://github.com/grafana/loki/pull/4949) **ssncferreira**: Add query `queueTime` metric to statistics and metrics.go
* [4938](https://github.com/grafana/loki/pull/4938) **DylanGuedes**: Implement ring status page for the distributor
* [5023](https://github.com/grafana/loki/pull/5023) **ssncferreira**: Move `querier.split-queries-by-interval` to a per-tenant configuration
* [4993](https://github.com/grafana/loki/pull/4993) **thejosephstevens**: Fix parent of wal and wal_cleaner in loki ruler config docs
* [4933](https://github.com/grafana/loki/pull/4933) **jeschkies**: Support matchers in series label values query.
* [4926](https://github.com/grafana/loki/pull/4926) **thejosephstevens**: Fix comment in Loki module loading for accuracy
* [4920](https://github.com/grafana/loki/pull/4920) **chaudum**: Add `-list-targets` command line flag to list all available run targets
* [4860](https://github.com/grafana/loki/pull/4860) **cyriltovena**: Add rate limiting and metrics to hedging
* [4865](https://github.com/grafana/loki/pull/4865) **taisho6339**: Fix duplicate registry.MustRegister call in Promtail Kafka
* [4845](https://github.com/grafana/loki/pull/4845) **chaudum** Return error responses consistently as JSON
## Unreleased
### All Changes

@@ -747,6 +747,140 @@ func Test_DedupeIngester(t *testing.T) {
})
}
func Test_DedupeIngesterParser(t *testing.T) {
var (
requests = 100
streamCount = 10
streams []labels.Labels
ingesterCount = 30
ingesterConfig = defaultIngesterTestConfig(t)
ctx, _ = user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), "foo"))
)
// make sure we will cut blocks and chunks and use head chunks
ingesterConfig.TargetChunkSize = 800
ingesterConfig.BlockSize = 300
// create many different ingesters
ingesterSet, closer := createIngesterSets(t, ingesterConfig, ingesterCount)
defer closer()
for i := 0; i < streamCount; i++ {
streams = append(streams, labels.FromStrings("foo", "bar", "bar", fmt.Sprintf("baz%d", i)))
}
for i := 0; i < requests; i++ {
for _, ing := range ingesterSet {
_, err := ing.Push(ctx, buildPushJSONRequest(int64(i), streams))
require.NoError(t, err)
}
}
t.Run("backward log", func(t *testing.T) {
iterators := make([]iter.EntryIterator, 0, len(ingesterSet))
for _, client := range ingesterSet {
stream, err := client.Query(ctx, &logproto.QueryRequest{
Selector: `{foo="bar"} | json`,
Start: time.Unix(0, 0),
End: time.Unix(0, int64(requests+1)),
Limit: uint32(requests * streamCount * 2),
Direction: logproto.BACKWARD,
})
require.NoError(t, err)
iterators = append(iterators, iter.NewQueryClientIterator(stream, logproto.BACKWARD))
}
it := iter.NewMergeEntryIterator(ctx, iterators, logproto.BACKWARD)
for i := requests - 1; i >= 0; i-- {
for j := 0; j < streamCount; j++ {
for k := 0; k < 2; k++ { // 2 lines per entry
require.True(t, it.Next())
require.Equal(t, int64(i), it.Entry().Timestamp.UnixNano())
}
}
}
require.False(t, it.Next())
require.NoError(t, it.Error())
})
t.Run("forward log", func(t *testing.T) {
iterators := make([]iter.EntryIterator, 0, len(ingesterSet))
for _, client := range ingesterSet {
stream, err := client.Query(ctx, &logproto.QueryRequest{
Selector: `{foo="bar"} | json`, // the json stage adds uncommon labels, making deduping harder.
Start: time.Unix(0, 0),
End: time.Unix(0, int64(requests+1)),
Limit: uint32(requests * streamCount * 2),
Direction: logproto.FORWARD,
})
require.NoError(t, err)
iterators = append(iterators, iter.NewQueryClientIterator(stream, logproto.FORWARD))
}
it := iter.NewMergeEntryIterator(ctx, iterators, logproto.FORWARD)
for i := 0; i < requests; i++ {
for j := 0; j < streamCount; j++ {
for k := 0; k < 2; k++ { // 2 lines per entry
require.True(t, it.Next())
require.Equal(t, int64(i), it.Entry().Timestamp.UnixNano())
}
}
}
require.False(t, it.Next())
require.NoError(t, it.Error())
})
t.Run("no sum metrics", func(t *testing.T) {
iterators := make([]iter.SampleIterator, 0, len(ingesterSet))
for _, client := range ingesterSet {
stream, err := client.QuerySample(ctx, &logproto.SampleQueryRequest{
Selector: `rate({foo="bar"} | json [1m])`,
Start: time.Unix(0, 0),
End: time.Unix(0, int64(requests+1)),
})
require.NoError(t, err)
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream))
}
it := iter.NewMergeSampleIterator(ctx, iterators)
for i := 0; i < requests; i++ {
for j := 0; j < streamCount; j++ {
for k := 0; k < 2; k++ { // 2 lines per entry
require.True(t, it.Next())
require.Equal(t, float64(1), it.Sample().Value)
require.Equal(t, int64(i), it.Sample().Timestamp)
}
}
}
require.False(t, it.Next())
require.NoError(t, it.Error())
})
t.Run("sum metrics", func(t *testing.T) {
iterators := make([]iter.SampleIterator, 0, len(ingesterSet))
for _, client := range ingesterSet {
stream, err := client.QuerySample(ctx, &logproto.SampleQueryRequest{
Selector: `sum by (c,d,e,foo) (rate({foo="bar"} | json [1m]))`,
Start: time.Unix(0, 0),
End: time.Unix(0, int64(requests+1)),
})
require.NoError(t, err)
iterators = append(iterators, iter.NewSampleQueryClientIterator(stream))
}
it := iter.NewMergeSampleIterator(ctx, iterators)
for i := 0; i < requests; i++ {
for j := 0; j < streamCount; j++ {
for k := 0; k < 2; k++ { // 2 lines per entry
require.True(t, it.Next())
require.Equal(t, float64(1), it.Sample().Value)
require.Equal(t, int64(i), it.Sample().Timestamp)
}
}
}
require.False(t, it.Next())
require.NoError(t, it.Error())
})
}
type ingesterClient struct {
logproto.PusherClient
logproto.QuerierClient
@@ -822,3 +956,32 @@ func buildPushRequest(ts int64, streams []labels.Labels) *logproto.PushRequest {
return req
}
func buildPushJSONRequest(ts int64, streams []labels.Labels) *logproto.PushRequest {
req := &logproto.PushRequest{}
for _, stream := range streams {
req.Streams = append(req.Streams, logproto.Stream{
Labels: stream.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, ts),
Line: jsonLine(ts, 0),
},
{
Timestamp: time.Unix(0, ts),
Line: jsonLine(ts, 1),
},
},
})
}
return req
}
func jsonLine(ts int64, i int) string {
if i%2 == 0 {
return fmt.Sprintf(`{"a":"b", "c":"d", "e":"f", "g":"h", "ts":"%d"}`, ts)
}
return fmt.Sprintf(`{"e":"f", "h":"i", "j":"k", "g":"h", "ts":"%d"}`, ts)
}

@@ -53,7 +53,7 @@ func (it *cachedIterator) consumeWrapped() bool {
return false
}
// we're caching entries
it.cache = append(it.cache, entryWithLabels{entry: it.Wrapped().Entry(), labels: it.Wrapped().Labels(), streamHash: it.Wrapped().StreamHash()})
it.cache = append(it.cache, entryWithLabels{Entry: it.Wrapped().Entry(), labels: it.Wrapped().Labels(), streamHash: it.Wrapped().StreamHash()})
it.curr++
return true
}
@@ -77,7 +77,7 @@ func (it *cachedIterator) Entry() logproto.Entry {
return logproto.Entry{}
}
return it.cache[it.curr].entry
return it.cache[it.curr].Entry
}
func (it *cachedIterator) Labels() string {

@@ -104,11 +104,16 @@ type mergeEntryIterator struct {
heap.Interface
Peek() EntryIterator
}
is []EntryIterator
// pushBuffer contains the list of iterators that need to be pushed to the heap
// This is to avoid allocations.
pushBuffer []EntryIterator
prefetched bool
stats *stats.Context
tuples []tuple
// buffer of entries to be returned by Next()
// We buffer entries with the same timestamp to correctly dedupe them.
buffer []entryWithLabels
currEntry entryWithLabels
errs []error
}
@@ -128,7 +133,8 @@ func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction lo
panic("bad direction")
}
result.tuples = make([]tuple, 0, len(is))
result.buffer = make([]entryWithLabels, 0, len(is))
result.pushBuffer = make([]EntryIterator, 0, len(is))
return result
}
@@ -172,21 +178,24 @@ func (i *mergeEntryIterator) Push(ei EntryIterator) {
i.requeue(ei, false)
}
type tuple struct {
logproto.Entry
EntryIterator
}
// Next pops iterators from the heap until it finds an entry with a different timestamp or stream hash.
// For each iterator we also buffer entries with the current timestamp and stream hash, deduping as we loop.
// If an iterator is not fully exhausted, it is pushed back to the heap.
func (i *mergeEntryIterator) Next() bool {
i.prefetch()
if len(i.buffer) != 0 {
i.nextFromBuffer()
return true
}
if i.heap.Len() == 0 {
return false
}
// shortcut for the last iterator.
if i.heap.Len() == 1 {
i.currEntry.entry = i.heap.Peek().Entry()
i.currEntry.Entry = i.heap.Peek().Entry()
i.currEntry.labels = i.heap.Peek().Labels()
i.currEntry.streamHash = i.heap.Peek().StreamHash()
@@ -200,55 +209,81 @@ func (i *mergeEntryIterator) Next() bool {
// preserve their original order. We look at all the top entries in the
// heap with the same timestamp, and pop the ones whose common value
// occurs most often.
Outer:
for i.heap.Len() > 0 {
next := i.heap.Peek()
entry := next.Entry()
if len(i.tuples) > 0 && (i.tuples[0].StreamHash() != next.StreamHash() || !i.tuples[0].Timestamp.Equal(entry.Timestamp)) {
if len(i.buffer) > 0 &&
(i.buffer[0].streamHash != next.StreamHash() ||
!i.buffer[0].Entry.Timestamp.Equal(entry.Timestamp)) {
break
}
heap.Pop(i.heap)
i.tuples = append(i.tuples, tuple{
Entry: entry,
EntryIterator: next,
})
}
// shortcut if we have a single tuple.
if len(i.tuples) == 1 {
i.currEntry.entry = i.tuples[0].Entry
i.currEntry.labels = i.tuples[0].Labels()
i.currEntry.streamHash = i.tuples[0].StreamHash()
i.requeue(i.tuples[0].EntryIterator, false)
i.tuples = i.tuples[:0]
return true
}
// Find in tuples which entry occurs most often which, due to quorum based
// replication, is guaranteed to be the correct next entry.
t := i.tuples[0]
i.currEntry.entry = t.Entry
i.currEntry.labels = t.Labels()
i.currEntry.streamHash = i.tuples[0].StreamHash()
// Requeue the iterators, advancing them if they were consumed.
for j := range i.tuples {
if i.tuples[j].Line != i.currEntry.entry.Line {
i.requeue(i.tuples[j].EntryIterator, true)
continue
previous := i.buffer
var dupe bool
for _, t := range previous {
if t.Entry.Line == entry.Line {
i.stats.AddDuplicates(1)
dupe = true
break
}
}
// we count as duplicates only if the tuple is not the one (t) used to fill the current entry
if i.tuples[j] != t {
i.stats.AddDuplicates(1)
if !dupe {
i.buffer = append(i.buffer, entryWithLabels{
Entry: entry,
labels: next.Labels(),
streamHash: next.StreamHash(),
})
}
i.requeue(i.tuples[j].EntryIterator, false)
inner:
for {
if !next.Next() {
continue Outer
}
entry := next.Entry()
if next.StreamHash() != i.buffer[0].streamHash ||
!entry.Timestamp.Equal(i.buffer[0].Entry.Timestamp) {
break
}
for _, t := range previous {
if t.Entry.Line == entry.Line {
i.stats.AddDuplicates(1)
continue inner
}
}
i.buffer = append(i.buffer, entryWithLabels{
Entry: entry,
labels: next.Labels(),
streamHash: next.StreamHash(),
})
}
i.pushBuffer = append(i.pushBuffer, next)
}
i.tuples = i.tuples[:0]
for _, ei := range i.pushBuffer {
heap.Push(i.heap, ei)
}
i.pushBuffer = i.pushBuffer[:0]
i.nextFromBuffer()
return true
}
func (i *mergeEntryIterator) nextFromBuffer() {
i.currEntry.Entry = i.buffer[0].Entry
i.currEntry.labels = i.buffer[0].labels
i.currEntry.streamHash = i.buffer[0].streamHash
if len(i.buffer) == 1 {
i.buffer = i.buffer[:0]
return
}
i.buffer = i.buffer[1:]
}
func (i *mergeEntryIterator) Entry() logproto.Entry {
return i.currEntry.entry
return i.currEntry.Entry
}
func (i *mergeEntryIterator) Labels() string {
@@ -274,7 +309,7 @@ func (i *mergeEntryIterator) Close() error {
return err
}
}
i.tuples = nil
i.buffer = nil
return nil
}
@@ -407,7 +442,7 @@ func (i *entrySortIterator) Next() bool {
}
next := i.is[0]
i.currEntry.entry = next.Entry()
i.currEntry.Entry = next.Entry()
i.currEntry.labels = next.Labels()
i.currEntry.streamHash = next.StreamHash()
// if the top iterator is empty, we remove it.
@@ -428,7 +463,7 @@ func (i *entrySortIterator) Next() bool {
}
func (i *entrySortIterator) Entry() logproto.Entry {
return i.currEntry.entry
return i.currEntry.Entry
}
func (i *entrySortIterator) Labels() string {
@@ -632,7 +667,7 @@ func (i *timeRangedIterator) Next() bool {
}
type entryWithLabels struct {
entry logproto.Entry
logproto.Entry
labels string
streamHash uint64
}
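The struct above now embeds logproto.Entry instead of carrying a named entry field, so Entry's fields are promoted onto entryWithLabels. A tiny standalone illustration of that promotion (simplified types, not the Loki ones):

package main

import "fmt"

type Entry struct {
	Line string
}

type entryWithLabels struct {
	Entry         // embedded: Entry's fields are promoted
	labels string
}

func main() {
	e := entryWithLabels{Entry: Entry{Line: "hello"}, labels: `{app="x"}`}
	// Both spellings reach the same field thanks to promotion.
	fmt.Println(e.Line, e.Entry.Line, e.labels)
}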
@@ -687,7 +722,7 @@ func (i *reverseIterator) Next() bool {
}
func (i *reverseIterator) Entry() logproto.Entry {
return i.cur.entry
return i.cur.Entry
}
func (i *reverseIterator) Labels() string {
@@ -762,7 +797,7 @@ func (i *reverseEntryIterator) Next() bool {
}
func (i *reverseEntryIterator) Entry() logproto.Entry {
return i.cur.entry
return i.cur.Entry
}
func (i *reverseEntryIterator) Labels() string {
@@ -854,11 +889,11 @@ func NewPeekingIterator(iter EntryIterator) PeekingEntryIterator {
next := &entryWithLabels{}
if iter.Next() {
cache = &entryWithLabels{
entry: iter.Entry(),
Entry: iter.Entry(),
labels: iter.Labels(),
streamHash: iter.StreamHash(),
}
next.entry = cache.entry
next.Entry = cache.Entry
next.labels = cache.labels
}
return &peekingEntryIterator{
@@ -871,7 +906,7 @@ func NewPeekingIterator(iter EntryIterator) PeekingEntryIterator {
// Next implements `EntryIterator`
func (it *peekingEntryIterator) Next() bool {
if it.cache != nil {
it.next.entry = it.cache.entry
it.next.Entry = it.cache.Entry
it.next.labels = it.cache.labels
it.next.streamHash = it.cache.streamHash
it.cacheNext()
@@ -883,7 +918,7 @@ func (it *peekingEntryIterator) Next() bool {
// cacheNext caches the next element if it exists.
func (it *peekingEntryIterator) cacheNext() {
if it.iter.Next() {
it.cache.entry = it.iter.Entry()
it.cache.Entry = it.iter.Entry()
it.cache.labels = it.iter.Labels()
it.cache.streamHash = it.iter.StreamHash()
return
@@ -895,7 +930,7 @@ func (it *peekingEntryIterator) cacheNext() {
// Peek implements `PeekingEntryIterator`
func (it *peekingEntryIterator) Peek() (string, logproto.Entry, bool) {
if it.cache != nil {
return it.cache.labels, it.cache.entry, true
return it.cache.labels, it.cache.Entry, true
}
return "", logproto.Entry{}, false
}
@@ -918,7 +953,7 @@ func (it *peekingEntryIterator) StreamHash() uint64 {
// Entry implements `EntryIterator`
func (it *peekingEntryIterator) Entry() logproto.Entry {
if it.next != nil {
return it.next.entry
return it.next.Entry
}
return logproto.Entry{}
}
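A side note on nextFromBuffer above: it pops the buffered head by re-slicing and, when only one element remains, resets the slice length to zero so the backing array can be reused by the next prefetch. The pattern in isolation (a sketch, not the production code):

package main

import "fmt"

func main() {
	buf := []string{"a", "b", "c"}
	for len(buf) > 0 {
		head := buf[0]
		if len(buf) == 1 {
			buf = buf[:0] // keep the backing array; the next append reuses it
		} else {
			buf = buf[1:] // advance the window to drop the head
		}
		fmt.Println(head)
	}
}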

@@ -718,6 +718,7 @@ func BenchmarkSortIterator(b *testing.B) {
})
b.Run("merge sort", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
@@ -845,3 +846,55 @@ func Test_EntrySortIterator(t *testing.T) {
}
})
}
func TestDedupeMergeEntryIterator(t *testing.T) {
it := NewMergeEntryIterator(context.Background(),
[]EntryIterator{
NewStreamIterator(logproto.Stream{
Labels: ``,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
Line: "0",
},
{
Timestamp: time.Unix(1, 0),
Line: "2",
},
{
Timestamp: time.Unix(2, 0),
Line: "3",
},
},
}),
NewStreamIterator(logproto.Stream{
Labels: ``,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
Line: "1",
},
{
Timestamp: time.Unix(1, 0),
Line: "0",
},
{
Timestamp: time.Unix(1, 0),
Line: "2",
},
},
}),
}, logproto.FORWARD)
require.True(t, it.Next())
require.Equal(t, "0", it.Entry().Line)
require.Equal(t, time.Unix(1, 0), it.Entry().Timestamp)
require.True(t, it.Next())
require.Equal(t, "2", it.Entry().Line)
require.Equal(t, time.Unix(1, 0), it.Entry().Timestamp)
require.True(t, it.Next())
require.Equal(t, "1", it.Entry().Line)
require.Equal(t, time.Unix(1, 0), it.Entry().Timestamp)
require.True(t, it.Next())
require.Equal(t, "3", it.Entry().Line)
require.Equal(t, time.Unix(2, 0), it.Entry().Timestamp)
}

@@ -152,8 +152,13 @@ type mergeSampleIterator struct {
is []SampleIterator
prefetched bool
stats *stats.Context
// pushBuffer contains the list of iterators that need to be pushed to the heap
// This is to avoid allocations.
pushBuffer []SampleIterator
tuples []sampletuple
// buffer of samples to be returned by Next()
// We buffer samples with the same timestamp to correctly dedupe them.
buffer []sampleWithLabels
curr sampleWithLabels
errs []error
}
@@ -167,10 +172,11 @@ func NewMergeSampleIterator(ctx context.Context, is []SampleIterator) SampleIter
its: make([]SampleIterator, 0, len(is)),
}
return &mergeSampleIterator{
stats: stats.FromContext(ctx),
is: is,
heap: &h,
tuples: make([]sampletuple, 0, len(is)),
stats: stats.FromContext(ctx),
is: is,
heap: &h,
buffer: make([]sampleWithLabels, 0, len(is)),
pushBuffer: make([]SampleIterator, 0, len(is)),
}
}
@@ -210,14 +216,14 @@ func (i *mergeSampleIterator) requeue(ei SampleIterator, advanced bool) {
util.LogError("closing iterator", ei.Close)
}
type sampletuple struct {
logproto.Sample
SampleIterator
}
func (i *mergeSampleIterator) Next() bool {
i.prefetch()
if len(i.buffer) != 0 {
i.nextFromBuffer()
return true
}
if i.heap.Len() == 0 {
return false
}
@@ -237,45 +243,76 @@ func (i *mergeSampleIterator) Next() bool {
// preserve their original order. We look at all the top entries in the
// heap with the same timestamp, and pop the ones whose common value
// occurs most often.
Outer:
for i.heap.Len() > 0 {
next := i.heap.Peek()
sample := next.Sample()
if len(i.tuples) > 0 && (i.tuples[0].StreamHash() != next.StreamHash() || i.tuples[0].Timestamp != sample.Timestamp) {
if len(i.buffer) > 0 && (i.buffer[0].streamHash != next.StreamHash() || i.buffer[0].Timestamp != sample.Timestamp) {
break
}
heap.Pop(i.heap)
i.tuples = append(i.tuples, sampletuple{
Sample: sample,
SampleIterator: next,
})
}
i.curr.Sample = i.tuples[0].Sample
i.curr.labels = i.tuples[0].Labels()
i.curr.streamHash = i.tuples[0].StreamHash()
t := i.tuples[0]
if len(i.tuples) == 1 {
i.requeue(i.tuples[0].SampleIterator, false)
i.tuples = i.tuples[:0]
return true
}
// Requeue the iterators, advancing them if they were consumed.
for j := range i.tuples {
if i.tuples[j].Hash != i.curr.Hash {
i.requeue(i.tuples[j].SampleIterator, true)
continue
previous := i.buffer
var dupe bool
for _, t := range previous {
if t.Sample.Hash == sample.Hash {
i.stats.AddDuplicates(1)
dupe = true
break
}
}
// we count as duplicates only if the tuple is not the one (t) used to fill the current entry
if i.tuples[j] != t {
i.stats.AddDuplicates(1)
if !dupe {
i.buffer = append(i.buffer, sampleWithLabels{
Sample: sample,
labels: next.Labels(),
streamHash: next.StreamHash(),
})
}
i.requeue(i.tuples[j].SampleIterator, false)
inner:
for {
if !next.Next() {
continue Outer
}
sample := next.Sample()
if next.StreamHash() != i.buffer[0].streamHash ||
sample.Timestamp != i.buffer[0].Timestamp {
break
}
for _, t := range previous {
if t.Hash == sample.Hash {
i.stats.AddDuplicates(1)
continue inner
}
}
i.buffer = append(i.buffer, sampleWithLabels{
Sample: sample,
labels: next.Labels(),
streamHash: next.StreamHash(),
})
}
i.pushBuffer = append(i.pushBuffer, next)
}
for _, ei := range i.pushBuffer {
heap.Push(i.heap, ei)
}
i.tuples = i.tuples[:0]
i.pushBuffer = i.pushBuffer[:0]
i.nextFromBuffer()
return true
}
func (i *mergeSampleIterator) nextFromBuffer() {
i.curr.Sample = i.buffer[0].Sample
i.curr.labels = i.buffer[0].labels
i.curr.streamHash = i.buffer[0].streamHash
if len(i.buffer) == 1 {
i.buffer = i.buffer[:0]
return
}
i.buffer = i.buffer[1:]
}
func (i *mergeSampleIterator) Sample() logproto.Sample {
return i.curr.Sample
}
@@ -305,7 +342,7 @@ func (i *mergeSampleIterator) Close() error {
return err
}
}
i.tuples = nil
i.buffer = nil
return nil
}
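The sample path mirrors the entry path, except duplicates are detected by the sample's precomputed line hash (see xxhash.Sum64String in the test below) rather than by comparing raw lines. A hedged sketch of that variant (illustrative types, not the Loki ones):

package main

import "fmt"

type sample struct {
	ts         int64
	streamHash uint64
	hash       uint64 // hash of the originating log line
}

// dedupeSamples assumes input sorted by (ts, streamHash) and drops a sample
// only when its hash was already seen in the same (ts, streamHash) group.
func dedupeSamples(in []sample) []sample {
	var out []sample
	seen := map[uint64]struct{}{}
	var ts int64
	var sh uint64
	for _, s := range in {
		if s.ts != ts || s.streamHash != sh {
			ts, sh = s.ts, s.streamHash
			seen = map[uint64]struct{}{} // new group: forget previous hashes
		}
		if _, dup := seen[s.hash]; dup {
			continue // same line hash from another replica: skip it
		}
		seen[s.hash] = struct{}{}
		out = append(out, s)
	}
	return out
}

func main() {
	in := []sample{{1, 0, 11}, {1, 0, 11}, {1, 0, 22}, {2, 0, 11}}
	fmt.Println(dedupeSamples(in)) // [{1 0 11} {1 0 22} {2 0 11}]
}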

@@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/cespare/xxhash"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -449,3 +450,54 @@ func Test_SampleSortIterator(t *testing.T) {
}
})
}
func TestDedupeMergeSampleIterator(t *testing.T) {
it := NewMergeSampleIterator(context.Background(),
[]SampleIterator{
NewSeriesIterator(logproto.Series{
Labels: ``,
Samples: []logproto.Sample{
{
Timestamp: time.Unix(1, 0).UnixNano(),
Value: 1.,
Hash: xxhash.Sum64String("1"),
},
{
Timestamp: time.Unix(1, 0).UnixNano(),
Value: 1.,
Hash: xxhash.Sum64String("2"),
},
},
StreamHash: 0,
}),
NewSeriesIterator(logproto.Series{
Labels: ``,
Samples: []logproto.Sample{
{
Timestamp: time.Unix(1, 0).UnixNano(),
Value: 1.,
Hash: xxhash.Sum64String("2"),
},
{
Timestamp: time.Unix(2, 0).UnixNano(),
Value: 1.,
Hash: xxhash.Sum64String("3"),
},
},
StreamHash: 0,
}),
})
require.True(t, it.Next())
require.Equal(t, time.Unix(1, 0).UnixNano(), it.Sample().Timestamp)
require.Equal(t, 1., it.Sample().Value)
require.Equal(t, xxhash.Sum64String("1"), it.Sample().Hash)
require.True(t, it.Next())
require.Equal(t, time.Unix(1, 0).UnixNano(), it.Sample().Timestamp)
require.Equal(t, 1., it.Sample().Value)
require.Equal(t, xxhash.Sum64String("2"), it.Sample().Hash)
require.True(t, it.Next())
require.Equal(t, time.Unix(2, 0).UnixNano(), it.Sample().Timestamp)
require.Equal(t, 1., it.Sample().Value)
require.Equal(t, xxhash.Sum64String("3"), it.Sample().Hash)
}
