pkg/ingester: handle labels mapping to the same fast fingerprint. (#1247)

* pkg/ingester: handle labels mapping to the same fast fingerprint.

Uses slightly adapted fpMapper code from Cortex.

Fixes issue #898

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* empty commit to force new build

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Removed empty lines in imports.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Added test that pushes entries concurrently.

To be run with -race.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Stream now keeps both original and mapped fingerprint

Mapped fingerprint is used in streams map.
Original ("rawFP") is preserved when saving chunks.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Vendoring

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Reverted previous commit (keep the test, with updated checks).

Preserving raw fingerprints could lead to data loss when
doing chunk deduplication while storing chunks. We don't want
that.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* go mod vendor

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
pull/1376/head
Peter Štibraný 6 years ago committed by Cyril Tovena
parent 2b7f375a21
commit e0ec61f986
  1. 9
      pkg/ingester/flush.go
  2. 108
      pkg/ingester/flush_test.go
  3. 34
      pkg/ingester/instance.go
  4. 111
      pkg/ingester/instance_test.go
  5. 187
      pkg/ingester/mapper.go
  6. 137
      pkg/ingester/mapper_test.go
  7. 17
      pkg/ingester/stream.go

@ -15,7 +15,6 @@ import (
"github.com/weaveworks/common/user" "github.com/weaveworks/common/user"
"github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/chunkenc"
loki_util "github.com/grafana/loki/pkg/util" loki_util "github.com/grafana/loki/pkg/util"
@ -215,7 +214,7 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
return nil return nil
} }
func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, []client.LabelAdapter) { func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels) {
instance.streamsMtx.Lock() instance.streamsMtx.Lock()
defer instance.streamsMtx.Unlock() defer instance.streamsMtx.Unlock()
@ -270,19 +269,19 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
if len(stream.chunks) == 0 { if len(stream.chunks) == 0 {
delete(instance.streams, stream.fp) delete(instance.streams, stream.fp)
instance.index.Delete(client.FromLabelAdaptersToLabels(stream.labels), stream.fp) instance.index.Delete(stream.labels, stream.fp)
instance.streamsRemovedTotal.Inc() instance.streamsRemovedTotal.Inc()
memoryStreams.Dec() memoryStreams.Dec()
} }
} }
func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs []client.LabelAdapter, cs []*chunkDesc) error { func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc) error {
userID, err := user.ExtractOrgID(ctx) userID, err := user.ExtractOrgID(ctx)
if err != nil { if err != nil {
return err return err
} }
labelsBuilder := labels.NewBuilder(client.FromLabelAdaptersToLabels(labelPairs)) labelsBuilder := labels.NewBuilder(labelPairs)
labelsBuilder.Set(nameLabel, logsValue) labelsBuilder.Set(nameLabel, logsValue)
metric := labelsBuilder.Labels() metric := labelsBuilder.Labels()

@ -42,18 +42,64 @@ func TestChunkFlushingIdle(t *testing.T) {
cfg.RetainPeriod = 500 * time.Millisecond cfg.RetainPeriod = 500 * time.Millisecond
store, ing := newTestStore(t, cfg) store, ing := newTestStore(t, cfg)
userIDs, testData := pushTestSamples(t, ing) testData := pushTestSamples(t, ing)
// wait beyond idle time so samples flush // wait beyond idle time so samples flush
time.Sleep(cfg.MaxChunkIdle * 2) time.Sleep(cfg.MaxChunkIdle * 2)
store.checkData(t, userIDs, testData) store.checkData(t, testData)
} }
func TestChunkFlushingShutdown(t *testing.T) { func TestChunkFlushingShutdown(t *testing.T) {
store, ing := newTestStore(t, defaultIngesterTestConfig(t)) store, ing := newTestStore(t, defaultIngesterTestConfig(t))
userIDs, testData := pushTestSamples(t, ing) testData := pushTestSamples(t, ing)
ing.Shutdown() ing.Shutdown()
store.checkData(t, userIDs, testData) store.checkData(t, testData)
}
func TestFlushingCollidingLabels(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushCheckPeriod = 20 * time.Millisecond
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond
store, ing := newTestStore(t, cfg)
defer store.Stop()
const userID = "testUser"
ctx := user.InjectOrgID(context.Background(), userID)
// checkData only iterates between unix seconds 0 and 1000
now := time.Unix(0, 0)
req := &logproto.PushRequest{Streams: []*logproto.Stream{
// some colliding label sets
{Labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}.String(), Entries: entries(5, now.Add(time.Minute))},
{Labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}.String(), Entries: entries(5, now)},
{Labels: model.LabelSet{"app": "l", "uniq0": "1", "uniq1": "0"}.String(), Entries: entries(5, now.Add(time.Minute))},
{Labels: model.LabelSet{"app": "m", "uniq0": "0", "uniq1": "0"}.String(), Entries: entries(5, now)},
{Labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "0"}.String(), Entries: entries(5, now.Add(time.Minute))},
{Labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "0"}.String(), Entries: entries(5, now)},
}}
sort.Slice(req.Streams, func(i, j int) bool {
return req.Streams[i].Labels < req.Streams[j].Labels
})
_, err := ing.Push(ctx, req)
require.NoError(t, err)
// force flush
ing.Shutdown()
// verify that we get all the data back
store.checkData(t, map[string][]*logproto.Stream{userID: req.Streams})
// make sure all chunks have different fingerprint, even colliding ones.
chunkFingerprints := map[model.Fingerprint]bool{}
for _, c := range store.getChunksForUser(userID) {
require.False(t, chunkFingerprints[c.Fingerprint])
chunkFingerprints[c.Fingerprint] = true
}
} }
type testStore struct { type testStore struct {
@ -103,12 +149,19 @@ func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
if err != nil { if err != nil {
return err return err
} }
for _, chunk := range chunks { for ix, chunk := range chunks {
for _, label := range chunk.Metric { for _, label := range chunk.Metric {
if label.Value == "" { if label.Value == "" {
return fmt.Errorf("Chunk has blank label %q", label.Name) return fmt.Errorf("Chunk has blank label %q", label.Name)
} }
} }
// remove __name__ label
if chunk.Metric.Has("__name__") {
labelsBuilder := labels.NewBuilder(chunk.Metric)
labelsBuilder.Del("__name__")
chunks[ix].Metric = labelsBuilder.Labels()
}
} }
s.chunks[userID] = append(s.chunks[userID], chunks...) s.chunks[userID] = append(s.chunks[userID], chunks...)
return nil return nil
@ -124,7 +177,7 @@ func (s *testStore) LazyQuery(ctx context.Context, req *logproto.QueryRequest) (
func (s *testStore) Stop() {} func (s *testStore) Stop() {}
func pushTestSamples(t *testing.T, ing logproto.PusherServer) ([]string, map[string][]*logproto.Stream) { func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]*logproto.Stream {
userIDs := []string{"1", "2", "3"} userIDs := []string{"1", "2", "3"}
// Create test samples. // Create test samples.
@ -141,7 +194,7 @@ func pushTestSamples(t *testing.T, ing logproto.PusherServer) ([]string, map[str
}) })
require.NoError(t, err) require.NoError(t, err)
} }
return userIDs, testData return testData
} }
func buildTestStreams(offset int) []*logproto.Stream { func buildTestStreams(offset int) []*logproto.Stream {
@ -170,27 +223,30 @@ func buildTestStreams(offset int) []*logproto.Stream {
} }
// check that the store is holding data equivalent to what we expect // check that the store is holding data equivalent to what we expect
func (s *testStore) checkData(t *testing.T, userIDs []string, testData map[string][]*logproto.Stream) { func (s *testStore) checkData(t *testing.T, testData map[string][]*logproto.Stream) {
for userID, expected := range testData {
streams := s.getStreamsForUser(t, userID)
require.Equal(t, expected, streams)
}
}
// getStreamsForUser rebuilds one logproto.Stream per chunk stored for userID
// and returns them ordered by their label string. The result is nil when the
// user has no chunks.
func (s *testStore) getStreamsForUser(t *testing.T, userID string) []*logproto.Stream {
	var result []*logproto.Stream

	stored := s.getChunksForUser(userID)
	for idx := range stored {
		// The chunk payload is expected to be a *chunkenc.Facade; the type
		// assertion panics otherwise.
		facade := stored[idx].Data.(*chunkenc.Facade)
		lokiChunk := facade.LokiChunk()
		result = append(result, buildStreamsFromChunk(t, stored[idx].Metric.String(), lokiChunk))
	}

	byLabels := func(a, b int) bool {
		return result[a].Labels < result[b].Labels
	}
	sort.Slice(result, byLabels)

	return result
}
func (s *testStore) getChunksForUser(userID string) []chunk.Chunk {
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
for _, userID := range userIDs {
chunks := s.chunks[userID] return s.chunks[userID]
streams := []*logproto.Stream{}
for _, chunk := range chunks {
lokiChunk := chunk.Data.(*chunkenc.Facade).LokiChunk()
if chunk.Metric.Has("__name__") {
labelsBuilder := labels.NewBuilder(chunk.Metric)
labelsBuilder.Del("__name__")
chunk.Metric = labelsBuilder.Labels()
}
labels := chunk.Metric.String()
streams = append(streams, buildStreamsFromChunk(t, labels, lokiChunk))
}
sort.Slice(streams, func(i, j int) bool {
return streams[i].Labels < streams[j].Labels
})
require.Equal(t, testData[userID], streams)
}
} }
func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream { func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream {

@ -51,8 +51,9 @@ var (
type instance struct { type instance struct {
streamsMtx sync.RWMutex streamsMtx sync.RWMutex
streams map[model.Fingerprint]*stream streams map[model.Fingerprint]*stream // we use 'mapped' fingerprints here.
index *index.InvertedIndex index *index.InvertedIndex
mapper *fpMapper // using of mapper needs streamsMtx because it calls back
instanceID string instanceID string
@ -67,7 +68,7 @@ type instance struct {
} }
func newInstance(instanceID string, blockSize int, limits *validation.Overrides) *instance { func newInstance(instanceID string, blockSize int, limits *validation.Overrides) *instance {
return &instance{ i := &instance{
streams: map[model.Fingerprint]*stream{}, streams: map[model.Fingerprint]*stream{},
index: index.New(), index: index.New(),
instanceID: instanceID, instanceID: instanceID,
@ -79,6 +80,8 @@ func newInstance(instanceID string, blockSize int, limits *validation.Overrides)
tailers: map[uint32]*tailer{}, tailers: map[uint32]*tailer{},
limits: limits, limits: limits,
} }
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
} }
// consumeChunk manually adds a chunk that was received during ingester chunk // consumeChunk manually adds a chunk that was received during ingester chunk
@ -87,11 +90,13 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte
i.streamsMtx.Lock() i.streamsMtx.Lock()
defer i.streamsMtx.Unlock() defer i.streamsMtx.Unlock()
fp := client.FastFingerprint(labels) rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)
stream, ok := i.streams[fp] stream, ok := i.streams[fp]
if !ok { if !ok {
stream = newStream(fp, labels, i.blockSize) sortedLabels := i.index.Add(labels, fp)
i.index.Add(labels, fp) stream = newStream(fp, sortedLabels, i.blockSize)
i.streams[fp] = stream i.streams[fp] = stream
i.streamsCreatedTotal.Inc() i.streamsCreatedTotal.Inc()
memoryStreams.Inc() memoryStreams.Inc()
@ -137,7 +142,8 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
} }
func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, error) { func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, error) {
fp := client.FastFingerprint(labels) rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)
stream, ok := i.streams[fp] stream, ok := i.streams[fp]
if ok { if ok {
@ -147,8 +153,8 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
if len(i.streams) >= i.limits.MaxStreamsPerUser(i.instanceID) { if len(i.streams) >= i.limits.MaxStreamsPerUser(i.instanceID) {
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID)) return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID))
} }
stream = newStream(fp, labels, i.blockSize) sortedLabels := i.index.Add(labels, fp)
i.index.Add(labels, fp) stream = newStream(fp, sortedLabels, i.blockSize)
i.streams[fp] = stream i.streams[fp] = stream
memoryStreams.Inc() memoryStreams.Inc()
i.streamsCreatedTotal.Inc() i.streamsCreatedTotal.Inc()
@ -157,6 +163,15 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
return stream, nil return stream, nil
} }
// getLabelsFromFingerprint returns the labels of the stream stored under fp in
// i.streams, or nil when there is no such stream. It is the lookup callback
// handed to the fingerprint mapper. The caller must hold streamsMtx.
func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels {
	if found := i.streams[fp]; found != nil {
		return found.labels
	}
	return nil
}
func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error { func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector() expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector()
if err != nil { if err != nil {
@ -211,9 +226,8 @@ outer:
if !ok { if !ok {
return nil, ErrStreamMissing return nil, ErrStreamMissing
} }
lbs := client.FromLabelAdaptersToLabels(stream.labels)
for _, filter := range filters { for _, filter := range filters {
if !filter.Matches(lbs.Get(filter.Name)) { if !filter.Matches(stream.labels.Get(filter.Name)) {
continue outer continue outer
} }
} }

@ -0,0 +1,111 @@
package ingester
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/util/validation"
"github.com/stretchr/testify/require"
)
// TestLabelsCollisions pushes pairs of streams whose label sets are noted
// below as sharing a fast fingerprint, and checks that Push accepts all of
// them without error.
func TestLabelsCollisions(t *testing.T) {
	overrides, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
	require.NoError(t, err)

	inst := newInstance("test", 512, overrides)

	// Start five minutes in the past so that no entry is from the future.
	base := time.Now().Add(-5 * time.Minute)

	// Note that the label names are not sorted in every selector.
	req := &logproto.PushRequest{Streams: []*logproto.Stream{
		// both label sets have FastFingerprint=e002a3a451262627
		{Labels: "{app=\"l\",uniq0=\"0\",uniq1=\"1\"}", Entries: entries(5, base.Add(time.Minute))},
		{Labels: "{uniq0=\"1\",app=\"m\",uniq1=\"1\"}", Entries: entries(5, base)},

		// e002a3a451262247
		{Labels: "{app=\"l\",uniq0=\"1\",uniq1=\"0\"}", Entries: entries(5, base.Add(time.Minute))},
		{Labels: "{uniq1=\"0\",app=\"m\",uniq0=\"0\"}", Entries: entries(5, base)},

		// e002a2a4512624f4
		{Labels: "{app=\"l\",uniq0=\"0\",uniq1=\"0\"}", Entries: entries(5, base.Add(time.Minute))},
		{Labels: "{uniq0=\"1\",uniq1=\"0\",app=\"m\"}", Entries: entries(5, base)},
	}}

	err = inst.Push(context.Background(), req)
	require.NoError(t, err)
}
// TestConcurrentPushes starts several goroutines that each push entries to
// their own, uniquely labelled stream of a single instance at the same time.
// It is meant to be run with -race; the test passes if no push returns an
// error.
func TestConcurrentPushes(t *testing.T) {
	o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
	require.NoError(t, err)

	inst := newInstance("test", 512, o)

	const (
		concurrent          = 10
		iterations          = 100
		entriesPerIteration = 100
	)

	uniqueLabels := map[string]bool{}
	startChannel := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < concurrent; i++ {
		// Draw random label sets until we get one no other pusher uses.
		l := makeRandomLabels()
		for uniqueLabels[l] {
			l = makeRandomLabels()
		}
		uniqueLabels[l] = true

		wg.Add(1)
		go func(streamLabels string) {
			defer wg.Done()

			// Block until all pushers have been started.
			<-startChannel

			tt := time.Now().Add(-5 * time.Minute)

			for j := 0; j < iterations; j++ {
				err := inst.Push(context.Background(), &logproto.PushRequest{Streams: []*logproto.Stream{
					{Labels: streamLabels, Entries: entries(entriesPerIteration, tt)},
				}})
				if err != nil {
					// require.NoError would call t.FailNow, which must only be
					// called from the goroutine running the test. Record the
					// failure with t.Errorf and stop this pusher instead.
					t.Errorf("push %d for stream %s failed: %v", j, streamLabels, err)
					return
				}

				// Advance past the timestamps used by this batch.
				tt = tt.Add(entriesPerIteration * time.Nanosecond)
			}
		}(l)
	}

	time.Sleep(100 * time.Millisecond) // ready
	close(startChannel)                // go!

	wg.Wait()
	// test passes if no goroutine reports error
}
// entries builds n log entries with lines "hello 0", "hello 1", ... The first
// entry is stamped t and every following entry one nanosecond after its
// predecessor. The result is nil when n is not positive.
func entries(n int, t time.Time) []logproto.Entry {
	var result []logproto.Entry
	for idx, ts := 0, t; idx < n; idx, ts = idx+1, ts.Add(time.Nanosecond) {
		entry := logproto.Entry{
			Timestamp: ts,
			Line:      fmt.Sprintf("hello %d", idx),
		}
		result = append(result, entry)
	}
	return result
}
// labelNames lists the label names that makeRandomLabels fills in.
var labelNames = []string{"app", "instance", "namespace", "user", "cluster"}

// makeRandomLabels returns the string form of a label set that assigns a
// random non-negative 31-bit integer, formatted in decimal, to every name in
// labelNames.
func makeRandomLabels() string {
	builder := labels.NewBuilder(nil)
	for idx := range labelNames {
		value := fmt.Sprintf("%d", rand.Int31())
		builder.Set(labelNames[idx], value)
	}
	return builder.Labels().String()
}

@ -0,0 +1,187 @@
package ingester
import (
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
)
// maxMappedFP is the upper bound of the fingerprint range reserved for
// collision mappings: mapFP treats any raw fingerprint at or below it as a
// collision, and nextMappedFP panics once it would hand out a value above it.
const maxMappedFP = 1 << 20 // About 1M fingerprints reserved for mapping.

// separatorString is model.SeparatorByte as a one-byte string. It is used by
// metricToUniqueString to delimit label names, values and pairs.
var separatorString = string([]byte{model.SeparatorByte})
// fpMapper is used to map fingerprints in order to work around fingerprint
// collisions.
type fpMapper struct {
	// highestMappedFP is the most recently assigned mapped fingerprint;
	// nextMappedFP increments it with atomic.AddUint64.
	// highestMappedFP has to be aligned for atomic operations.
	// NOTE(review): presumably this is why it is the first field - confirm
	// before reordering.
	highestMappedFP model.Fingerprint

	mtx sync.RWMutex // Protects mappings.
	// maps original fingerprints to a map of string representations of
	// metrics to the truly unique fingerprint.
	// Only access to the outer map is guarded by mtx; the inner maps are read
	// and written without it, relying on the caller's own locking.
	mappings map[model.Fingerprint]map[string]model.Fingerprint

	// Returns existing labels for given fingerprint, if any.
	// Equality check relies on labels.Labels being sorted.
	fpToLabels func(fingerprint model.Fingerprint) labels.Labels
}
// newFPMapper builds an fpMapper with an empty mapping table that uses
// fpToLabels to look up the labels currently stored under a fingerprint.
// It panics when fpToLabels is nil.
func newFPMapper(fpToLabels func(fingerprint model.Fingerprint) labels.Labels) *fpMapper {
	if fpToLabels == nil {
		panic("nil fpToLabels")
	}

	mapper := new(fpMapper)
	mapper.fpToLabels = fpToLabels
	mapper.mappings = make(map[model.Fingerprint]map[string]model.Fingerprint)
	return mapper
}
// mapFP resolves the raw fast fingerprint fp of metric to the fingerprint that
// should be used for it:
//
//   - a raw fingerprint inside the reserved range (<= maxMappedFP) is always
//     treated as a collision and mapped;
//   - if fpToLabels reports labels for fp, fp is returned when they equal
//     metric, otherwise the collision is mapped;
//   - if nothing is stored under fp, a previously recorded mapping for metric
//     is reused when there is one, and fp itself is returned otherwise.
//
// The caller must have locked the raw fingerprint: the per-fingerprint map is
// consulted here without holding m.mtx.
func (m *fpMapper) mapFP(fp model.Fingerprint, metric []client.LabelAdapter) model.Fingerprint {
	// Reserved fingerprint space: automatically a collision.
	if fp <= maxMappedFP {
		return m.maybeAddMapping(fp, metric)
	}

	// Most likely case: something is already stored under fp.
	if existing := m.fpToLabels(fp); existing != nil {
		if !equalLabels(metric, existing) {
			// Same fingerprint, different labels.
			return m.maybeAddMapping(fp, metric)
		}
		return fp
	}

	// Nothing stored under fp; see whether fp has been mapped before.
	m.mtx.RLock()
	byMetric, known := m.mappings[fp]
	m.mtx.RUnlock()
	if !known {
		return fp
	}

	if mapped, found := byMetric[metricToUniqueString(metric)]; found {
		// Reuse the mapping recorded earlier for this metric.
		return mapped
	}
	return fp
}
// valueForName binary-searches s, which must be sorted by label name, for the
// label called name. It returns the label's value and true when found, and
// "" and false otherwise.
func valueForName(s labels.Labels, name string) (string, bool) {
	idx := sort.Search(len(s), func(i int) bool {
		return s[i].Name >= name
	})
	if idx < len(s) && s[idx].Name == name {
		return s[idx].Value, true
	}
	return "", false
}
// equalLabels reports whether a and b contain the same name/value pairs.
// a may be in any order; b is looked up with a binary search and therefore
// has to be sorted by name.
func equalLabels(a []client.LabelAdapter, b labels.Labels) bool {
	if len(a) != len(b) {
		return false
	}

	// Walk the common prefix in which both sides list the same names in the
	// same positions; there only the values need comparing.
	pos := 0
	for pos < len(a) && a[pos].Name == b[pos].Name {
		if a[pos].Value != b[pos].Value {
			return false
		}
		pos++
	}

	// From the first position where the order differs, look each remaining
	// pair of a up in b by name.
	for _, pair := range a[pos:] {
		if value, ok := valueForName(b, pair.Name); !ok || value != pair.Value {
			return false
		}
	}
	return true
}
// maybeAddMapping is only used internally. Given a detected collision on fp,
// it returns the mapped fingerprint for collidingMetric, allocating and
// recording a new one (and logging that) if none has been recorded yet.
//
// m.mtx only guards the outer mappings table. The per-fingerprint map is read
// and updated without it, which relies on the caller having locked fp.
func (m *fpMapper) maybeAddMapping(fp model.Fingerprint, collidingMetric []client.LabelAdapter) model.Fingerprint {
	key := metricToUniqueString(collidingMetric)

	m.mtx.RLock()
	perFP, seen := m.mappings[fp]
	m.mtx.RUnlock()

	if !seen {
		// This is the first collision for fp: create its mapping table.
		fresh := m.nextMappedFP()
		perFP = map[string]model.Fingerprint{key: fresh}

		m.mtx.Lock()
		m.mappings[fp] = perFP
		m.mtx.Unlock()

		level.Info(util.Logger).Log(
			"msg", "fingerprint collision detected, mapping to new fingerprint",
			"old_fp", fp,
			"new_fp", fresh,
			"metric", collidingMetric,
		)
		return fresh
	}

	if existing, found := perFP[key]; found {
		return existing // Existing mapping.
	}

	// fp has collided before, but not for this metric: add a new mapping.
	fresh := m.nextMappedFP()
	perFP[key] = fresh
	level.Info(util.Logger).Log(
		"msg", "fingerprint collision detected, mapping to new fingerprint",
		"old_fp", fp,
		"new_fp", fresh,
		"metric", key,
	)
	return fresh
}
// nextMappedFP atomically increments highestMappedFP and returns the new
// value as the next mapped fingerprint. It panics once the result exceeds
// maxMappedFP, i.e. when the reserved range is used up.
func (m *fpMapper) nextMappedFP() model.Fingerprint {
	counter := (*uint64)(&m.highestMappedFP)
	next := model.Fingerprint(atomic.AddUint64(counter, 1))
	if next <= maxMappedFP {
		return next
	}
	panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP))
}
// metricToUniqueString renders a metric as a string that does not depend on
// the order of its labels: every pair becomes name+separator+value, the pairs
// are sorted, and the sorted pairs are joined with the separator again. The
// same metric therefore always yields the same string.
//
// The result is intended to be unique per metric, like an "ideal" fingerprint.
// It is more expensive to compute than a fast fingerprint and can grow large,
// so it is unsuitable as a general map or index key.
// NOTE(review): uniqueness presumably assumes that names and values never
// contain the separator byte - confirm.
func metricToUniqueString(m []client.LabelAdapter) string {
	pairs := make([]string, len(m))
	for idx := range m {
		pairs[idx] = m[idx].Name + separatorString + m[idx].Value
	}
	sort.Strings(pairs)
	return strings.Join(pairs, separatorString)
}

@ -0,0 +1,137 @@
package ingester
import (
"sort"
"testing"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
// Fixtures for TestFPMapper: three raw fingerprints and the label sets that
// the test maps under each of them.
var (
	// cm11, cm12, cm13 are colliding with fp1.
	// cm21, cm22 are colliding with fp2.
	// cm31, cm32 are colliding with fp3, which is below maxMappedFP.
	// Note that fingerprints are set and not actually calculated.
	// The collision detection is independent from the actually used
	// fingerprinting algorithm.
	fp1 = model.Fingerprint(maxMappedFP + 1)
	fp2 = model.Fingerprint(maxMappedFP + 2)
	fp3 = model.Fingerprint(1)
	// Label sets used with fp1. cm11 is deliberately not sorted by name.
	cm11 = []client.LabelAdapter{
		{Name: "foo", Value: "bar"},
		{Name: "dings", Value: "bumms"},
	}
	cm12 = []client.LabelAdapter{
		{Name: "bar", Value: "foo"},
	}
	cm13 = []client.LabelAdapter{
		{Name: "foo", Value: "bar"},
	}
	// Label sets used with fp2.
	cm21 = []client.LabelAdapter{
		{Name: "foo", Value: "bumms"},
		{Name: "dings", Value: "bar"},
	}
	cm22 = []client.LabelAdapter{
		{Name: "dings", Value: "foo"},
		{Name: "bar", Value: "bumms"},
	}
	// Label sets used with fp3.
	cm31 = []client.LabelAdapter{
		{Name: "bumms", Value: "dings"},
	}
	cm32 = []client.LabelAdapter{
		{Name: "bumms", Value: "dings"},
		{Name: "bar", Value: "foo"},
	}
)
// copyValuesAndSort converts a into a new labels.Labels holding the same
// name/value pairs, sorted with sort.Sort. The input slice is left untouched.
func copyValuesAndSort(a []client.LabelAdapter) labels.Labels {
	sorted := make(labels.Labels, 0, len(a))
	for idx := range a {
		sorted = append(sorted, labels.Label{Name: a[idx].Name, Value: a[idx].Value})
	}
	sort.Sort(sorted)
	return sorted
}
// TestFPMapper drives an fpMapper whose label lookup is backed by a plain map
// (sm) and checks, step by step, which fingerprint mapFP returns as label sets
// are added to and removed from sm.
func TestFPMapper(t *testing.T) {
	sm := map[model.Fingerprint]labels.Labels{}
	lookup := func(fp model.Fingerprint) labels.Labels {
		return sm[fp]
	}
	mapper := newFPMapper(lookup)

	// expect maps metric under raw and compares the result with want.
	expect := func(raw model.Fingerprint, metric []client.LabelAdapter, want model.Fingerprint) {
		assertFingerprintEqual(t, mapper.mapFP(raw, metric), want)
	}
	// expectAll checks the final mapping of every fixture.
	expectAll := func() {
		expect(fp1, cm11, fp1)
		expect(fp1, cm12, model.Fingerprint(1))
		expect(fp1, cm13, model.Fingerprint(2))
		expect(fp2, cm21, fp2)
		expect(fp2, cm22, model.Fingerprint(3))
		expect(fp3, cm31, model.Fingerprint(4))
		expect(fp3, cm32, model.Fingerprint(5))
	}

	// Nothing is stored yet, so resolving a fingerprint leaves it unchanged.
	expect(fp1, cm11, fp1)
	expect(fp1, cm12, fp1)

	// With cm11 stored under fp1, cm11 keeps fp1 while cm12 collides and gets
	// the first mapped fingerprint.
	sm[fp1] = copyValuesAndSort(cm11)
	expect(fp1, cm11, fp1)
	expect(fp1, cm12, model.Fingerprint(1))

	// Storing the mapped cm12 as well must not change the outcome.
	sm[model.Fingerprint(1)] = copyValuesAndSort(cm12)
	expect(fp1, cm11, fp1)
	expect(fp1, cm12, model.Fingerprint(1))

	// cm13 gets the next mapped fingerprint, and gets it reproducibly.
	expect(fp1, cm13, model.Fingerprint(2))
	expect(fp1, cm13, model.Fingerprint(2))

	// Storing cm13 must not change anything either.
	sm[model.Fingerprint(2)] = copyValuesAndSort(cm13)
	expect(fp1, cm11, fp1)
	expect(fp1, cm12, model.Fingerprint(1))
	expect(fp1, cm13, model.Fingerprint(2))

	// Same procedure for cm21 and cm22 under fp2.
	expect(fp2, cm21, fp2)
	sm[fp2] = copyValuesAndSort(cm21)
	expect(fp2, cm21, fp2)
	expect(fp2, cm22, model.Fingerprint(3))
	sm[model.Fingerprint(3)] = copyValuesAndSort(cm22)
	expect(fp2, cm21, fp2)
	expect(fp2, cm22, model.Fingerprint(3))

	// fp3 lies in the reserved range, so cm31 is mapped straight away.
	expect(fp3, cm31, model.Fingerprint(4))
	sm[model.Fingerprint(4)] = copyValuesAndSort(cm31)

	// cm32 is mapped for two reasons now.
	expect(fp3, cm32, model.Fingerprint(5))
	sm[model.Fingerprint(5)] = copyValuesAndSort(cm32)

	// Check all the mappings, just to be sure.
	expectAll()

	// Drop the sm entries keyed by fp1, fp2 and fp3. The recorded mappings
	// stay in place, so every result must remain the same.
	delete(sm, fp1)
	delete(sm, fp2)
	delete(sm, fp3)
	expectAll()
}
// assertFingerprintEqual reports a test error, without stopping the test, when
// gotFP differs from wantFP.
func assertFingerprintEqual(t *testing.T, gotFP, wantFP model.Fingerprint) {
	// Mark this function as a helper so a failure is reported at the calling
	// line rather than at the t.Errorf below.
	t.Helper()
	if gotFP != wantFP {
		t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
	}
}

@ -8,9 +8,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc"
"github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/chunkenc"
@ -44,8 +45,8 @@ type stream struct {
// Newest chunk at chunks[n-1]. // Newest chunk at chunks[n-1].
// Not thread-safe; assume accesses to this are locked by caller. // Not thread-safe; assume accesses to this are locked by caller.
chunks []chunkDesc chunks []chunkDesc
fp model.Fingerprint fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
labels []client.LabelAdapter labels labels.Labels
blockSize int blockSize int
tailers map[uint32]*tailer tailers map[uint32]*tailer
@ -65,7 +66,7 @@ type entryWithError struct {
e error e error
} }
func newStream(fp model.Fingerprint, labels []client.LabelAdapter, blockSize int) *stream { func newStream(fp model.Fingerprint, labels labels.Labels, blockSize int) *stream {
return &stream{ return &stream{
fp: fp, fp: fp,
labels: labels, labels: labels,
@ -126,7 +127,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
if len(storedEntries) != 0 { if len(storedEntries) != 0 {
go func() { go func() {
stream := logproto.Stream{Labels: client.FromLabelAdaptersToLabels(s.labels).String(), Entries: storedEntries} stream := logproto.Stream{Labels: s.labels.String(), Entries: storedEntries}
closedTailers := []uint32{} closedTailers := []uint32{}
@ -156,7 +157,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
if lastEntryWithErr.e == chunkenc.ErrOutOfOrder { if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
// return bad http status request response with all failed entries // return bad http status request response with all failed entries
buf := bytes.Buffer{} buf := bytes.Buffer{}
streamName := client.FromLabelAdaptersToLabels(s.labels).String() streamName := s.labels.String()
for _, entryWithError := range failedEntriesWithError { for _, entryWithError := range failedEntriesWithError {
_, _ = fmt.Fprintf(&buf, _, _ = fmt.Fprintf(&buf,
@ -193,7 +194,7 @@ func (s *stream) Iterator(from, through time.Time, direction logproto.Direction,
} }
} }
return iter.NewNonOverlappingIterator(iterators, client.FromLabelAdaptersToLabels(s.labels).String()), nil return iter.NewNonOverlappingIterator(iterators, s.labels.String()), nil
} }
func (s *stream) addTailer(t *tailer) { func (s *stream) addTailer(t *tailer) {
@ -204,6 +205,6 @@ func (s *stream) addTailer(t *tailer) {
} }
func (s *stream) matchesTailer(t *tailer) bool { func (s *stream) matchesTailer(t *tailer) bool {
metric := client.FromLabelAdaptersToMetric(s.labels) metric := util.LabelsToMetric(s.labels)
return t.isWatchingLabels(metric) return t.isWatchingLabels(metric)
} }

Loading…
Cancel
Save