pkg/promtail/client: Handle fingerprint hash collisions (#1254)

* Handle fingerprint hash collisions

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Instead of using fingerprints, use label strings as map keys.

Label strings are properly sorted key=value pairs. This solution
produces more garbage, but doesn't reinvent its own hashing, and keeps
code simple.
This seems to be a good tradeoff for promtail.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
pull/1326/head
Peter Štibraný 6 years ago committed by Cyril Tovena
parent c4234e0bba
commit 125cfbfd5b
  1. 8
      pkg/logentry/stages/timestamp.go
  2. 25
      pkg/logentry/stages/timestamp_test.go
  3. 32
      pkg/promtail/client/batch.go
  4. 33
      pkg/promtail/client/batch_test.go

@ -145,7 +145,7 @@ func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]in
// The timestamp has been correctly parsed, so we should store it in the map
// containing the last known timestamp used by the "fudge" action on failure.
if *ts.cfg.ActionOnFailure == TimestampActionOnFailureFudge {
ts.lastKnownTimestamps.Add(labels.FastFingerprint(), *t)
ts.lastKnownTimestamps.Add(labels.String(), *t)
}
}
@ -193,8 +193,8 @@ func (ts *timestampStage) processActionOnFailure(labels model.LabelSet, t *time.
}
func (ts *timestampStage) processActionOnFailureFudge(labels model.LabelSet, t *time.Time) {
labelsFingerprint := labels.FastFingerprint()
lastTimestamp, ok := ts.lastKnownTimestamps.Get(labelsFingerprint)
labelsStr := labels.String()
lastTimestamp, ok := ts.lastKnownTimestamps.Get(labelsStr)
// If the last known timestamp is unknown (ie. has not been successfully parsed yet)
// there's nothing we can do, so we're going to keep the current timestamp
@ -206,5 +206,5 @@ func (ts *timestampStage) processActionOnFailureFudge(labels model.LabelSet, t *
*t = lastTimestamp.(time.Time).Add(1 * time.Nanosecond)
// Store the fudged timestamp, so that a subsequent fudged timestamp will be 1ns after it
ts.lastKnownTimestamps.Add(labelsFingerprint, *t)
ts.lastKnownTimestamps.Add(labelsStr, *t)
}

@ -340,6 +340,29 @@ func TestTimestampStage_ProcessActionOnFailure(t *testing.T) {
time.Unix(1, 0),
},
},
"labels with colliding fingerprints should have independent timestamps when fudging": {
config: TimestampConfig{
Source: "time",
Format: time.RFC3339Nano,
ActionOnFailure: lokiutil.StringRef(TimestampActionOnFailureFudge),
},
inputEntries: []inputEntry{
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}, extracted: map[string]interface{}{"time": "2019-10-01T01:02:03.400000000Z"}},
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}, extracted: map[string]interface{}{"time": "2019-10-01T01:02:03.800000000Z"}},
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}, extracted: map[string]interface{}{}},
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}, extracted: map[string]interface{}{}},
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}, extracted: map[string]interface{}{}},
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}, extracted: map[string]interface{}{}},
},
expectedTimestamps: []time.Time{
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.400000000Z"),
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.800000000Z"),
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.400000001Z"),
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.800000001Z"),
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.400000002Z"),
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.800000002Z"),
},
},
}
for testName, testData := range tests {
@ -360,7 +383,7 @@ func TestTimestampStage_ProcessActionOnFailure(t *testing.T) {
entry := ""
s.Process(inputEntry.labels, extracted, &timestamp, &entry)
assert.Equal(t, testData.expectedTimestamps[i], timestamp)
assert.Equal(t, testData.expectedTimestamps[i], timestamp, "entry: %d", i)
}
})
}

@ -6,7 +6,6 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
)
// batch holds pending log streams waiting to be sent to Loki, and it's used
@ -14,14 +13,14 @@ import (
// and entries in a single batch request. In case of multi-tenant Promtail, log
// streams for each tenant are stored in a dedicated batch.
type batch struct {
streams map[model.Fingerprint]*logproto.Stream
streams map[string]*logproto.Stream
bytes int
createdAt time.Time
}
func newBatch(entries ...entry) *batch {
b := &batch{
streams: map[model.Fingerprint]*logproto.Stream{},
streams: map[string]*logproto.Stream{},
bytes: 0,
createdAt: time.Now(),
}
@ -39,15 +38,15 @@ func (b *batch) add(entry entry) {
b.bytes += len(entry.Line)
// Append the entry to an already existing stream (if any)
fp := entry.labels.FastFingerprint()
if stream, ok := b.streams[fp]; ok {
labels := entry.labels.String()
if stream, ok := b.streams[labels]; ok {
stream.Entries = append(stream.Entries, entry.Entry)
return
}
// Add the entry as a new stream
b.streams[fp] = &logproto.Stream{
Labels: entry.labels.String(),
b.streams[labels] = &logproto.Stream{
Labels: labels,
Entries: []logproto.Entry{entry.Entry},
}
}
@ -71,6 +70,17 @@ func (b *batch) age() time.Duration {
// encode the batch as snappy-compressed push request, and returns
// the encoded bytes and the number of encoded entries
func (b *batch) encode() ([]byte, int, error) {
req, entriesCount := b.createPushRequest()
buf, err := proto.Marshal(req)
if err != nil {
return nil, 0, err
}
buf = snappy.Encode(nil, buf)
return buf, entriesCount, nil
}
// creates push request and returns it, together with number of entries
func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
req := logproto.PushRequest{
Streams: make([]*logproto.Stream, 0, len(b.streams)),
}
@ -80,11 +90,5 @@ func (b *batch) encode() ([]byte, int, error) {
req.Streams = append(req.Streams, stream)
entriesCount += len(stream.Entries)
}
buf, err := proto.Marshal(&req)
if err != nil {
return nil, 0, err
}
buf = snappy.Encode(nil, buf)
return buf, entriesCount, nil
return &req, entriesCount
}

@ -1,8 +1,11 @@
package client
import (
"fmt"
"testing"
"time"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -103,3 +106,33 @@ func TestBatch_encode(t *testing.T) {
})
}
}
func TestHashCollisions(t *testing.T) {
b := newBatch()
ls1 := model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}
ls2 := model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}
require.False(t, ls1.Equal(ls2))
require.Equal(t, ls1.FastFingerprint(), ls2.FastFingerprint())
const entriesPerLabel = 10
for i := 0; i < entriesPerLabel; i++ {
b.add(entry{labels: ls1, Entry: logproto.Entry{time.Now(), fmt.Sprintf("line %d", i)}})
b.add(entry{labels: ls2, Entry: logproto.Entry{time.Now(), fmt.Sprintf("line %d", i)}})
}
// make sure that colliding labels are stored properly as independent streams
req, entries := b.createPushRequest()
assert.Len(t, req.Streams, 2)
assert.Equal(t, 2*entriesPerLabel, entries)
if req.Streams[0].Labels == ls1.String() {
assert.Equal(t, ls1.String(), req.Streams[0].Labels)
assert.Equal(t, ls2.String(), req.Streams[1].Labels)
} else {
assert.Equal(t, ls2.String(), req.Streams[0].Labels)
assert.Equal(t, ls1.String(), req.Streams[1].Labels)
}
}

Loading…
Cancel
Save