From d7345c1c4c81dc483f33dd423e7ff57adb2cd67a Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Mon, 18 Jul 2022 15:05:05 -0400 Subject: [PATCH] Loki: Do not store exact log line duplicates (now including unordered inserts) (#6642) * do not insert log lines which are an exact duplicate in both timestamp and log content Signed-off-by: Edward Welch * update test to now ignore dupes pushed out of order Signed-off-by: Edward Welch * make the test unordered to be more thorough Signed-off-by: Edward Welch --- pkg/chunkenc/unordered.go | 9 +++++++++ pkg/chunkenc/unordered_test.go | 20 ++++++++++++++++++++ pkg/ingester/stream_test.go | 4 +--- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index ed7dc608b4..913cdb6898 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -116,6 +116,15 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string) error { } displaced := hb.rt.Add(e) if displaced[0] != nil { + // While we support multiple entries at the same timestamp, we _do_ de-duplicate + // entries at the same time with the same content, iterate through any existing + // entries and ignore the line if we already have an entry with the same content + for _, et := range displaced[0].(*nsEntries).entries { + if et == line { + e.entries = displaced[0].(*nsEntries).entries + return nil + } + } e.entries = append(displaced[0].(*nsEntries).entries, line) } else { e.entries = []string{line} diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 34b866ea13..d36e02e72a 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -141,6 +141,26 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { }, dir: logproto.BACKWARD, }, + { + desc: "ts remove exact dupe forward", + input: []entry{ + {0, "a"}, {0, "b"}, {1, "c"}, {0, "b"}, + }, + exp: []entry{ + {0, "a"}, {0, "b"}, {1, "c"}, + }, + dir: logproto.FORWARD, + }, + { + desc: "ts remove exact dupe backward", + input: []entry{ + {0, "a"}, {0, "b"}, {1, "c"}, {0, "b"}, + }, + exp: []entry{ + {1, "c"}, {0, "b"}, {0, "a"}, + }, + dir: logproto.BACKWARD, + }, } { t.Run(tc.desc, func(t *testing.T) { hb := newUnorderedHeadBlock() diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 19a9499646..99016454fd 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -242,7 +242,7 @@ func TestUnorderedPush(t *testing.T) { entries: []logproto.Entry{ {Timestamp: time.Unix(2, 0), Line: "x"}, {Timestamp: time.Unix(1, 0), Line: "x"}, - {Timestamp: time.Unix(2, 0), Line: "x"}, + {Timestamp: time.Unix(2, 0), Line: "x"}, // duplicate ts/line is ignored {Timestamp: time.Unix(2, 0), Line: "x"}, // duplicate ts/line is ignored {Timestamp: time.Unix(10, 0), Line: "x"}, }, @@ -286,8 +286,6 @@ func TestUnorderedPush(t *testing.T) { exp := []logproto.Entry{ {Timestamp: time.Unix(1, 0), Line: "x"}, {Timestamp: time.Unix(2, 0), Line: "x"}, - // duplicate was allowed here b/c it wasnt written sequentially - {Timestamp: time.Unix(2, 0), Line: "x"}, {Timestamp: time.Unix(7, 0), Line: "x"}, {Timestamp: time.Unix(8, 0), Line: "x"}, {Timestamp: time.Unix(9, 0), Line: "x"},