Loki: Do not store exact log line duplicates (now including unordered inserts) (#6642)

* do not insert log lines which are an exact duplicate in both timestamp and log content

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* update test to now ignore dupes pushed out of order

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* make the test unordered to be more thorough

Signed-off-by: Edward Welch <edward.welch@grafana.com>
pull/6710/head
Ed Welch 3 years ago committed by GitHub
parent 1a75bb8e22
commit d7345c1c4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      pkg/chunkenc/unordered.go
  2. 20
      pkg/chunkenc/unordered_test.go
  3. 4
      pkg/ingester/stream_test.go

@ -116,6 +116,15 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string) error {
}
displaced := hb.rt.Add(e)
if displaced[0] != nil {
// While we support multiple entries at the same timestamp, we _do_ de-duplicate
// entries at the same time with the same content, iterate through any existing
// entries and ignore the line if we already have an entry with the same content
for _, et := range displaced[0].(*nsEntries).entries {
if et == line {
e.entries = displaced[0].(*nsEntries).entries
return nil
}
}
e.entries = append(displaced[0].(*nsEntries).entries, line)
} else {
e.entries = []string{line}

@ -141,6 +141,26 @@ func Test_Unordered_InsertRetrieval(t *testing.T) {
},
dir: logproto.BACKWARD,
},
{
desc: "ts remove exact dupe forward",
input: []entry{
{0, "a"}, {0, "b"}, {1, "c"}, {0, "b"},
},
exp: []entry{
{0, "a"}, {0, "b"}, {1, "c"},
},
dir: logproto.FORWARD,
},
{
desc: "ts remove exact dupe backward",
input: []entry{
{0, "a"}, {0, "b"}, {1, "c"}, {0, "b"},
},
exp: []entry{
{1, "c"}, {0, "b"}, {0, "a"},
},
dir: logproto.BACKWARD,
},
} {
t.Run(tc.desc, func(t *testing.T) {
hb := newUnorderedHeadBlock()

@ -242,7 +242,7 @@ func TestUnorderedPush(t *testing.T) {
entries: []logproto.Entry{
{Timestamp: time.Unix(2, 0), Line: "x"},
{Timestamp: time.Unix(1, 0), Line: "x"},
{Timestamp: time.Unix(2, 0), Line: "x"},
{Timestamp: time.Unix(2, 0), Line: "x"}, // duplicate ts/line is ignored
{Timestamp: time.Unix(2, 0), Line: "x"}, // duplicate ts/line is ignored
{Timestamp: time.Unix(10, 0), Line: "x"},
},
@ -286,8 +286,6 @@ func TestUnorderedPush(t *testing.T) {
exp := []logproto.Entry{
{Timestamp: time.Unix(1, 0), Line: "x"},
{Timestamp: time.Unix(2, 0), Line: "x"},
// duplicate was allowed here b/c it wasnt written sequentially
{Timestamp: time.Unix(2, 0), Line: "x"},
{Timestamp: time.Unix(7, 0), Line: "x"},
{Timestamp: time.Unix(8, 0), Line: "x"},
{Timestamp: time.Unix(9, 0), Line: "x"},

Loading…
Cancel
Save