Cleanup labels parsing. (#2929)

* Cleanup labels parsing.

Make sure we only use one type of labels parsing.This just reduce the amount of code we have to maintain.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes a test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/2933/head
Cyril Tovena 6 years ago committed by GitHub
parent 18e29236f4
commit e1ab2cc509
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      pkg/distributor/distributor.go
  2. 6
      pkg/distributor/validator.go
  3. 12
      pkg/distributor/validator_test.go
  4. 20
      pkg/ingester/instance.go
  5. 43
      pkg/ingester/mapper.go
  6. 27
      pkg/ingester/mapper_test.go
  7. 6
      pkg/ingester/transfer.go
  8. 13
      pkg/util/conv.go

@ -7,7 +7,6 @@ import (
"time"
cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
cortex_util "github.com/cortexproject/cortex/pkg/util"
@ -25,6 +24,7 @@ import (
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
@ -206,14 +206,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedSamplesCount := 0
for _, stream := range req.Streams {
ls, err := util.ToClientLabels(stream.Labels)
ls, err := logql.ParseLabels(stream.Labels)
if err != nil {
validationErr = httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
continue
}
// ensure labels are correctly sorted.
// todo(ctovena) we should lru cache this
stream.Labels = cortex_client.FromLabelAdaptersToLabels(ls).String()
stream.Labels = ls.String()
if err := d.validator.ValidateLabels(userID, ls, stream); err != nil {
validationErr = err
continue

@ -6,7 +6,7 @@ import (
"strings"
"time"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/loki/pkg/logproto"
@ -52,7 +52,7 @@ func (v Validator) ValidateEntry(userID string, labels string, entry logproto.En
}
// Validate labels returns an error if the labels are invalid
func (v Validator) ValidateLabels(userID string, ls []cortex_client.LabelAdapter, stream logproto.Stream) error {
func (v Validator) ValidateLabels(userID string, ls labels.Labels, stream logproto.Stream) error {
numLabelNames := len(ls)
if numLabelNames > v.MaxLabelNamesPerSeries(userID) {
validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Inc()
@ -61,7 +61,7 @@ func (v Validator) ValidateLabels(userID string, ls []cortex_client.LabelAdapter
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Add(float64(bytes))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(cortex_client.FromLabelAdaptersToMetric(ls).String(), numLabelNames, v.MaxLabelNamesPerSeries(userID)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(stream.Labels, numLabelNames, v.MaxLabelNamesPerSeries(userID)))
}
maxLabelNameLength := v.MaxLabelNameLength(userID)

@ -5,13 +5,13 @@ import (
"testing"
"time"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util/validation"
)
@ -101,7 +101,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
return &validation.Limits{MaxLabelNamesPerSeries: 2}
},
"{foo=\"bar\",food=\"bars\",fed=\"bears\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg("{fed=\"bears\", foo=\"bar\", food=\"bars\"}", 3, 2)),
httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg("{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2)),
},
{
"label name too long",
@ -157,10 +157,10 @@ func TestValidator_ValidateLabels(t *testing.T) {
}
}
func mustParseLabels(s string) []client.LabelAdapter {
labels, err := util.ToClientLabels(s)
func mustParseLabels(s string) labels.Labels {
ls, err := logql.ParseLabels(s)
if err != nil {
panic(err)
}
return labels
return ls
}

@ -24,7 +24,6 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
@ -108,15 +107,15 @@ func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk,
// consumeChunk manually adds a chunk that was received during ingester chunk
// transfer.
func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapter, chunk *logproto.Chunk) error {
func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *logproto.Chunk) error {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()
fp := i.getHashForLabels(labels)
fp := i.getHashForLabels(ls)
stream, ok := i.streamsByFP[fp]
if !ok {
sortedLabels := i.index.Add(labels, fp)
sortedLabels := i.index.Add(client.FromLabelsToLabelAdapters(ls), fp)
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
i.streamsByFP[fp] = stream
i.streams[stream.labelsString] = stream
@ -175,13 +174,13 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, er
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg())
}
labels, err := util.ToClientLabels(pushReqStream.Labels)
labels, err := logql.ParseLabels(pushReqStream.Labels)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(labels, fp)
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
_ = i.index.Add(client.FromLabelsToLabelAdapters(labels), fp)
stream = newStream(i.cfg, fp, labels, i.factory)
i.streams[pushReqStream.Labels] = stream
i.streamsByFP[fp] = stream
@ -192,11 +191,10 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, er
return stream, nil
}
func (i *instance) getHashForLabels(labels []client.LabelAdapter) model.Fingerprint {
func (i *instance) getHashForLabels(ls labels.Labels) model.Fingerprint {
var fp uint64
lbsModel := client.FromLabelAdaptersToLabels(labels)
fp, i.buf = lbsModel.HashWithoutLabels(i.buf, []string(nil)...)
return i.mapper.mapFP(model.Fingerprint(fp), labels)
fp, i.buf = ls.HashWithoutLabels(i.buf, []string(nil)...)
return i.mapper.mapFP(model.Fingerprint(fp), ls)
}
// Return labels associated with given fingerprint. Used by fingerprint mapper. Must hold streamsMtx.

@ -8,8 +8,6 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
@ -51,7 +49,7 @@ func newFPMapper(fpToLabels func(fingerprint model.Fingerprint) labels.Labels) *
// mapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and
// returns a truly unique fingerprint. The caller must have locked the raw
// fingerprint.
func (m *fpMapper) mapFP(fp model.Fingerprint, metric []client.LabelAdapter) model.Fingerprint {
func (m *fpMapper) mapFP(fp model.Fingerprint, metric labels.Labels) model.Fingerprint {
// First check if we are in the reserved FP space, in which case this is
// automatically a collision that has to be mapped.
if fp <= maxMappedFP {
@ -63,7 +61,7 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric []client.LabelAdapter) mod
s := m.fpToLabels(fp)
if s != nil {
// FP exists in memory, but is it for the same metric?
if equalLabels(metric, s) {
if labels.Equal(metric, s) {
// Yupp. We are done.
return fp
}
@ -89,43 +87,10 @@ func (m *fpMapper) mapFP(fp model.Fingerprint, metric []client.LabelAdapter) mod
return fp
}
func valueForName(s labels.Labels, name string) (string, bool) {
pos := sort.Search(len(s), func(i int) bool { return s[i].Name >= name })
if pos == len(s) || s[pos].Name != name {
return "", false
}
return s[pos].Value, true
}
// Check if a and b contain the same name/value pairs
func equalLabels(a []client.LabelAdapter, b labels.Labels) bool {
if len(a) != len(b) {
return false
}
// Check as many as we can where the two sets are in the same order
i := 0
for ; i < len(a); i++ {
if b[i].Name != a[i].Name {
break
}
if b[i].Value != a[i].Value {
return false
}
}
// Now check remaining values using binary search
for ; i < len(a); i++ {
v, found := valueForName(b, a[i].Name)
if !found || v != a[i].Value {
return false
}
}
return true
}
// maybeAddMapping is only used internally. It takes a detected collision and
// adds it to the collisions map if not yet there. In any case, it returns the
// truly unique fingerprint for the colliding metric.
func (m *fpMapper) maybeAddMapping(fp model.Fingerprint, collidingMetric []client.LabelAdapter) model.Fingerprint {
func (m *fpMapper) maybeAddMapping(fp model.Fingerprint, collidingMetric labels.Labels) model.Fingerprint {
ms := metricToUniqueString(collidingMetric)
m.mtx.RLock()
mappedFPs, ok := m.mappings[fp]
@ -177,7 +142,7 @@ func (m *fpMapper) nextMappedFP() model.Fingerprint {
// FastFingerprint function, and its result is not suitable as a key for maps
// and indexes as it might become really large, causing a lot of hashing effort
// in maps and a lot of storage overhead in indexes.
func metricToUniqueString(m []client.LabelAdapter) string {
func metricToUniqueString(m labels.Labels) string {
parts := make([]string, 0, len(m))
for _, pair := range m {
parts = append(parts, pair.Name+separatorString+pair.Value)

@ -4,8 +4,6 @@ import (
"sort"
"testing"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
@ -20,34 +18,34 @@ var (
fp1 = model.Fingerprint(maxMappedFP + 1)
fp2 = model.Fingerprint(maxMappedFP + 2)
fp3 = model.Fingerprint(1)
cm11 = []client.LabelAdapter{
{Name: "foo", Value: "bar"},
cm11 = []labels.Label{
{Name: "dings", Value: "bumms"},
{Name: "foo", Value: "bar"},
}
cm12 = []client.LabelAdapter{
cm12 = []labels.Label{
{Name: "bar", Value: "foo"},
}
cm13 = []client.LabelAdapter{
cm13 = []labels.Label{
{Name: "foo", Value: "bar"},
}
cm21 = []client.LabelAdapter{
{Name: "foo", Value: "bumms"},
cm21 = []labels.Label{
{Name: "dings", Value: "bar"},
{Name: "foo", Value: "bumms"},
}
cm22 = []client.LabelAdapter{
{Name: "dings", Value: "foo"},
cm22 = []labels.Label{
{Name: "bar", Value: "bumms"},
{Name: "dings", Value: "foo"},
}
cm31 = []client.LabelAdapter{
cm31 = []labels.Label{
{Name: "bumms", Value: "dings"},
}
cm32 = []client.LabelAdapter{
{Name: "bumms", Value: "dings"},
cm32 = []labels.Label{
{Name: "bar", Value: "foo"},
{Name: "bumms", Value: "dings"},
}
)
func copyValuesAndSort(a []client.LabelAdapter) labels.Labels {
func copyValuesAndSort(a []labels.Label) labels.Labels {
c := make(labels.Labels, len(a))
for i, pair := range a {
c[i].Name = pair.Name
@ -131,6 +129,7 @@ func TestFPMapper(t *testing.T) {
// assertFingerprintEqual asserts that two fingerprints are equal.
func assertFingerprintEqual(t *testing.T, gotFP, wantFP model.Fingerprint) {
t.Helper()
if gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}

@ -6,13 +6,13 @@ import (
"os"
"time"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
"golang.org/x/net/context"
@ -101,9 +101,9 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId)
lbls := []client.LabelAdapter{}
lbls := make([]labels.Label, 0, len(chunkSet.Labels))
for _, lbl := range chunkSet.Labels {
lbls = append(lbls, client.LabelAdapter{Name: lbl.Name, Value: lbl.Value})
lbls = append(lbls, labels.Label{Name: lbl.Name, Value: lbl.Value})
}
instance := i.getOrCreateInstance(chunkSet.UserId)

@ -2,26 +2,13 @@ package util
import (
"math"
"sort"
"time"
"unsafe"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql/parser"
)
// ToClientLabels parses the labels and converts them to the Cortex type.
func ToClientLabels(labels string) ([]client.LabelAdapter, error) {
ls, err := parser.ParseMetric(labels)
if err != nil {
return nil, err
}
sort.Sort(ls)
return client.FromLabelsToLabelAdapters(ls), nil
}
// ModelLabelSetToMap convert a model.LabelSet to a map[string]string
func ModelLabelSetToMap(m model.LabelSet) map[string]string {
if len(m) == 0 {

Loading…
Cancel
Save