Loki: fix validation error and metrics (#3307)

* Remove the helper functions for creating errors as they cause unintended value manipulation if the value contains any valid golang printf substitute characters, e.g. %s

* Improve error message for invalid labels, increment metrics for lines dropped for invalid labels.
pull/3329/head
Ed Welch 5 years ago committed by GitHub
parent 1eac0bfc28
commit 407afce0e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      pkg/distributor/distributor.go
  2. 18
      pkg/distributor/distributor_test.go
  3. 14
      pkg/distributor/validator.go
  4. 30
      pkg/distributor/validator_test.go
  5. 2
      pkg/ingester/instance.go
  6. 72
      pkg/util/validation/validate.go

@ -197,6 +197,12 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
validationErr = err
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, userID).Add(float64(len(stream.Entries)))
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, userID).Add(float64(bytes))
continue
}
n := 0
@ -230,7 +236,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// Return a 429 to indicate to the client they are being rate limited
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)
}
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
@ -345,7 +351,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
}
ls, err := logql.ParseLabels(key)
if err != nil {
return "", httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
return "", httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err)
}
// ensure labels are correctly sorted.
if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {

@ -59,19 +59,19 @@ func TestDistributor(t *testing.T) {
},
{
lines: 100,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(100, 100, 1000)),
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 100, 100, 1000),
},
{
lines: 100,
maxLineSize: 1,
expectedResponse: success,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(1, 10, "{foo=\"bar\"}")),
expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, 1, "{foo=\"bar\"}", 10),
},
{
lines: 100,
mangleLabels: true,
expectedResponse: success,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: 1:4: parse error: unterminated quoted string"),
expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, "{ab\"", "1:4: parse error: unterminated quoted string"),
},
} {
t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.lines), func(t *testing.T) {
@ -180,9 +180,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionBurstSizeMB: 10 * (1.0 / float64(bytesInMB)),
pushes: []testPush{
{bytes: 5, expectedError: nil},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(10, 1, 6))},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 10, 1, 6)},
{bytes: 5, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(10, 1, 1))},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 10, 1, 1)},
},
},
"global strategy: limit should be evenly shared across distributors": {
@ -192,9 +192,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionBurstSizeMB: 5 * (1.0 / float64(bytesInMB)),
pushes: []testPush{
{bytes: 3, expectedError: nil},
{bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 3))},
{bytes: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 3)},
{bytes: 2, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 1))},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 1)},
},
},
"global strategy: burst should set to each distributor": {
@ -204,9 +204,9 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionBurstSizeMB: 20 * (1.0 / float64(bytesInMB)),
pushes: []testPush{
{bytes: 15, expectedError: nil},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 6))},
{bytes: 6, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 6)},
{bytes: 5, expectedError: nil},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(5, 1, 1))},
{bytes: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, 5, 1, 1)},
},
},
}

@ -57,13 +57,13 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
if ctx.rejectOldSample && ts < ctx.rejectOldSampleMaxAge {
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(labels, entry.Timestamp))
return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, labels, entry.Timestamp)
}
if ts > ctx.creationGracePeriod {
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(labels, entry.Timestamp))
return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, labels, entry.Timestamp)
}
if maxSize := ctx.maxLineSize; maxSize != 0 && len(entry.Line) > maxSize {
@ -73,7 +73,7 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(maxSize, len(entry.Line), labels))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
}
return nil
@ -89,20 +89,20 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, ctx.userID).Add(float64(bytes))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
}
lastLabelName := ""
for _, l := range ls {
if len(l.Name) > ctx.maxLabelNameLength {
updateMetrics(validation.LabelNameTooLong, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg(stream.Labels, l.Name))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name)
} else if len(l.Value) > ctx.maxLabelValueLength {
updateMetrics(validation.LabelValueTooLong, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg(stream.Labels, l.Value))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value)
} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg(stream.Labels, l.Name))
return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name)
}
lastLabelName = l.Name
}

@ -43,14 +43,14 @@ func TestValidator_ValidateEntry(t *testing.T) {
}
},
logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"},
httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(testStreamLabels, testTime.Add(-time.Hour*5))),
httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, testStreamLabels, testTime.Add(-time.Hour*5)),
},
{
"test too new",
"test",
nil,
logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"},
httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(testStreamLabels, testTime.Add(time.Hour*5))),
httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5)),
},
{
"line too long",
@ -61,7 +61,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
}
},
logproto.Entry{Timestamp: testTime, Line: "12345678901"},
httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(10, 11, testStreamLabels)),
httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, 10, testStreamLabels, 11),
},
}
for _, tt := range tests {
@ -101,7 +101,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
return &validation.Limits{MaxLabelNamesPerSeries: 2}
},
"{foo=\"bar\",food=\"bars\",fed=\"bears\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg("{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2)),
httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, "{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2),
},
{
"label name too long",
@ -113,7 +113,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
}
},
"{fooooo=\"bar\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg("{fooooo=\"bar\"}", "fooooo")),
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, "{fooooo=\"bar\"}", "fooooo"),
},
{
"label value too long",
@ -126,7 +126,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
}
},
"{foo=\"barrrrrr\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg("{foo=\"barrrrrr\"}", "barrrrrr")),
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, "{foo=\"barrrrrr\"}", "barrrrrr"),
},
{
"duplicate label",
@ -139,7 +139,23 @@ func TestValidator_ValidateLabels(t *testing.T) {
}
},
"{foo=\"bar\", foo=\"barf\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg("{foo=\"bar\", foo=\"barf\"}", "foo")),
httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, "{foo=\"bar\", foo=\"barf\"}", "foo"),
},
{
"label value contains %",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
MaxLabelNamesPerSeries: 2,
MaxLabelNameLength: 5,
MaxLabelValueLength: 5,
}
},
"{foo=\"bar\", foo=\"barf%s\"}",
httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{
Code: int32(http.StatusBadRequest),
Body: []byte("stream '{foo=\"bar\", foo=\"barf%s\"}' has label value too long: 'barf%s'"), // Intentionally construct the string to make sure %s isn't substituted as (MISSING)
}),
},
}
for _, tt := range tests {

@ -216,7 +216,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg())
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg)
}
labels, err := logql.ParseLabels(pushReqStream.Labels)

@ -1,44 +1,43 @@
package validation
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
)
const (
discardReasonLabel = "reason"
// InvalidLabels is a reason for discarding log lines which have labels that cannot be parsed.
InvalidLabels = "invalid_labels"
InvalidLabelsErrorMsg = "Error parsing labels '%s' with error: %s"
// RateLimited is one of the values for the reason to discard samples.
// Declared here to avoid duplication in ingester and distributor.
RateLimited = "rate_limited"
rateLimitErrorMsg = "Ingestion rate limit exceeded (limit: %d bytes/sec) while attempting to ingest '%d' lines totaling '%d' bytes, reduce log volume or contact your Loki administrator to see if the limit can be increased"
RateLimited = "rate_limited"
RateLimitedErrorMsg = "Ingestion rate limit exceeded (limit: %d bytes/sec) while attempting to ingest '%d' lines totaling '%d' bytes, reduce log volume or contact your Loki administrator to see if the limit can be increased"
// LineTooLong is a reason for discarding too long log lines.
LineTooLong = "line_too_long"
lineTooLongErrorMsg = "Max entry size '%d' bytes exceeded for stream '%s' while adding an entry with length '%d' bytes"
LineTooLongErrorMsg = "Max entry size '%d' bytes exceeded for stream '%s' while adding an entry with length '%d' bytes"
// StreamLimit is a reason for discarding lines when we can't create a new stream
// because the limit of active streams has been reached.
StreamLimit = "stream_limit"
streamLimitErrorMsg = "Maximum active stream limit exceeded, reduce the number of active streams (reduce labels or reduce label values), or contact your Loki administrator to see if the limit can be increased"
StreamLimitErrorMsg = "Maximum active stream limit exceeded, reduce the number of active streams (reduce labels or reduce label values), or contact your Loki administrator to see if the limit can be increased"
// GreaterThanMaxSampleAge is a reason for discarding log lines which are older than the current time - `reject_old_samples_max_age`
GreaterThanMaxSampleAge = "greater_than_max_sample_age"
greaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v"
GreaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v"
// TooFarInFuture is a reason for discarding log lines which are newer than the current time + `creation_grace_period`
TooFarInFuture = "too_far_in_future"
tooFarInFutureErrorMsg = "entry for stream '%s' has timestamp too new: %v"
TooFarInFutureErrorMsg = "entry for stream '%s' has timestamp too new: %v"
// MaxLabelNamesPerSeries is a reason for discarding a log line which has too many label names
MaxLabelNamesPerSeries = "max_label_names_per_series"
maxLabelNamesPerSeriesErrorMsg = "entry for stream '%s' has %d label names; limit %d"
MaxLabelNamesPerSeriesErrorMsg = "entry for stream '%s' has %d label names; limit %d"
// LabelNameTooLong is a reason for discarding a log line which has a label name too long
LabelNameTooLong = "label_name_too_long"
labelNameTooLongErrorMsg = "stream '%s' has label name too long: '%s'"
LabelNameTooLongErrorMsg = "stream '%s' has label name too long: '%s'"
// LabelValueTooLong is a reason for discarding a log line which has a lable value too long
LabelValueTooLong = "label_value_too_long"
labelValueTooLongErrorMsg = "stream '%s' has label value too long: '%s'"
LabelValueTooLongErrorMsg = "stream '%s' has label value too long: '%s'"
// DuplicateLabelNames is a reason for discarding a log line which has duplicate label names
DuplicateLabelNames = "duplicate_label_names"
duplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'"
DuplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'"
)
// DiscardedBytes is a metric of the total discarded bytes, by reason.
@ -64,48 +63,3 @@ var DiscardedSamples = prometheus.NewCounterVec(
func init() {
prometheus.MustRegister(DiscardedSamples, DiscardedBytes)
}
// RateLimitedErrorMsg returns an error string for rate limited requests
func RateLimitedErrorMsg(limit, lines, bytes int) string {
return fmt.Sprintf(rateLimitErrorMsg, limit, lines, bytes)
}
// LineTooLongErrorMsg returns an error string for a line which is too long
func LineTooLongErrorMsg(maxLength, entryLength int, stream string) string {
return fmt.Sprintf(lineTooLongErrorMsg, maxLength, stream, entryLength)
}
// StreamLimitErrorMsg returns an error string for requests refused for exceeding active stream limits
func StreamLimitErrorMsg() string {
return fmt.Sprint(streamLimitErrorMsg)
}
// GreaterThanMaxSampleAgeErrorMsg returns an error string for a line with a timestamp too old
func GreaterThanMaxSampleAgeErrorMsg(stream string, timestamp time.Time) string {
return fmt.Sprintf(greaterThanMaxSampleAgeErrorMsg, stream, timestamp)
}
// TooFarInFutureErrorMsg returns an error string for a line with a timestamp too far in the future
func TooFarInFutureErrorMsg(stream string, timestamp time.Time) string {
return fmt.Sprintf(tooFarInFutureErrorMsg, stream, timestamp)
}
// MaxLabelNamesPerSeriesErrorMsg returns an error string for a stream with too many labels
func MaxLabelNamesPerSeriesErrorMsg(stream string, labelCount, labelLimit int) string {
return fmt.Sprintf(maxLabelNamesPerSeriesErrorMsg, stream, labelCount, labelLimit)
}
// LabelNameTooLongErrorMsg returns an error string for a stream with a label name too long
func LabelNameTooLongErrorMsg(stream, label string) string {
return fmt.Sprintf(labelNameTooLongErrorMsg, stream, label)
}
// LabelValueTooLongErrorMsg returns an error string for a stream with a label value too long
func LabelValueTooLongErrorMsg(stream, labelValue string) string {
return fmt.Sprintf(labelValueTooLongErrorMsg, stream, labelValue)
}
// DuplicateLabelNamesErrorMsg returns an error string for a stream which has duplicate labels
func DuplicateLabelNamesErrorMsg(stream, label string) string {
return fmt.Sprintf(duplicateLabelNamesErrorMsg, stream, label)
}

Loading…
Cancel
Save