Loki: Add a configurable ability to fudge incoming timestamps to guarantee query-time sort order when entries for the same stream have duplicate timestamps. (#6042)

* Add a per-tenant ability to fudge duplicate timestamps in the distributor

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* update docs

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* lint

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* update changelog

Signed-off-by: Edward Welch <edward.welch@grafana.com>
Ed Welch committed via GitHub (branch pull/6047/head, commit fc8c4f0592, parent 88c4781dc7)
7 changed files (lines changed in parentheses):

1. CHANGELOG.md (1)
2. docs/sources/configuration/_index.md (10)
3. pkg/distributor/distributor.go (14)
4. pkg/distributor/distributor_test.go (208)
5. pkg/distributor/limits.go (2)
6. pkg/distributor/validator.go (21)
7. pkg/validation/limits.go (30)

@@ -68,6 +68,7 @@
##### Fixes
* [5685](https://github.com/grafana/loki/pull/5685) **chaudum**: Assert that push values tuples consist of string values
##### Changes
* [6042](https://github.com/grafana/loki/pull/6042) **slim-bean**: Add a new configuration to allow fudging of ingested timestamps to guarantee sort order of duplicate timestamps at query time.
* [5777](https://github.com/grafana/loki/pull/5777) **tatchiuleung**: storage: make Azure blobID chunk delimiter configurable
* [5650](https://github.com/grafana/loki/pull/5650) **cyriltovena**: Remove more chunkstore and schema version below v9
* [5643](https://github.com/grafana/loki/pull/5643) **simonswine**: Introduce a ChunkRef type as part of logproto

@@ -2100,6 +2100,16 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -distributor.max-line-size-truncate
[max_line_size_truncate: <boolean> | default = false ]
# Fudge the log line timestamp during ingestion when it's the same as the previous entry for
# the same stream. When enabled, if a log line in a push request has the same timestamp as the
# previous line for the same stream, one nanosecond is added to the log line. This preserves
# the received order of log lines with the exact same timestamp when they are queried, by
# slightly altering their stored timestamp. NOTE: this is imperfect, because Loki accepts
# out-of-order writes, and another push request for the same stream could contain entries
# whose timestamps duplicate existing entries; those will not be fudged.
# CLI flag: -validation.fudge-duplicate-timestamps
[fudge_duplicate_timestamp: <boolean> | default = false ]
# Maximum number of log entries that will be returned for a query.
# CLI flag: -validation.max-entries-limit
[max_entries_limit_per_query: <int> | default = 5000 ]
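
Putting the documented setting to use, a minimal sketch of a Loki config enabling this for all tenants (only the field name comes from the docs above; everything else is elided):

```yaml
limits_config:
  # Equivalent to the -validation.fudge-duplicate-timestamps CLI flag.
  fudge_duplicate_timestamp: true
```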

@@ -271,7 +271,21 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
				validationErr = err
				continue
			}

			stream.Entries[n] = entry

			// If configured for this tenant, fudge duplicate timestamps. Note that this is
			// imperfect: since Loki accepts out-of-order writes, it doesn't account for
			// separate pushes with overlapping time ranges having entries with duplicate timestamps.
			if validationContext.fudgeDuplicateTimestamps && n != 0 && stream.Entries[n-1].Timestamp.Equal(entry.Timestamp) {
				// Traditional Loki behavior is that two lines with the same timestamp and the
				// exact same content are de-duplicated (i.e. only one is stored, the others dropped).
				// To maintain this behavior, only fudge the timestamp if the log content differs.
				if stream.Entries[n-1].Line != entry.Line {
					stream.Entries[n].Timestamp = entry.Timestamp.Add(1 * time.Nanosecond)
				}
			}

			n++
			validatedSamplesSize += len(entry.Line)
			validatedSamplesCount++
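
The hunk above is the core of the change. To see the behavior in isolation, here is a minimal, runnable sketch of the same nanosecond-bump rule applied to a plain slice; the `entry` type and `fudgeDuplicates` helper are illustrative stand-ins, not code from this PR:

```go
package main

import (
	"fmt"
	"time"
)

// entry is a stand-in for logproto.Entry.
type entry struct {
	Timestamp time.Time
	Line      string
}

// fudgeDuplicates mirrors the distributor logic above: when an entry shares a
// timestamp with the previous entry in the stream and its content differs,
// bump it by one nanosecond so query-time sorting preserves received order.
// Exact (timestamp, line) duplicates are left alone so Loki's de-duplication
// of identical lines keeps working.
func fudgeDuplicates(entries []entry) {
	for i := 1; i < len(entries); i++ {
		if entries[i-1].Timestamp.Equal(entries[i].Timestamp) && entries[i-1].Line != entries[i].Line {
			entries[i].Timestamp = entries[i].Timestamp.Add(1 * time.Nanosecond)
		}
	}
}

func main() {
	entries := []entry{
		{time.Unix(123456, 0), "heyooooooo"},
		{time.Unix(123456, 0), "hi"},        // bumped to 123456s + 1ns
		{time.Unix(123456, 1), "hey there"}, // now collides with the bumped entry, bumped to +2ns
	}
	fudgeDuplicates(entries)
	for _, e := range entries {
		fmt.Println(e.Timestamp.UnixNano(), e.Line)
	}
}
```

Note the cascade: once an entry is bumped, the next incoming entry is compared against the bumped timestamp, which is exactly what the "multiple subsequent fudges" test below exercises.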

@@ -100,6 +100,214 @@ func TestDistributor(t *testing.T) {
	}
}

func Test_FudgeTimestamp(t *testing.T) {
	fudgingDisabled := &validation.Limits{}
	flagext.DefaultValues(fudgingDisabled)
	fudgingDisabled.RejectOldSamples = false

	fudgingEnabled := &validation.Limits{}
	flagext.DefaultValues(fudgingEnabled)
	fudgingEnabled.RejectOldSamples = false
	fudgingEnabled.FudgeDuplicateTimestamp = true

	tests := map[string]struct {
		limits       *validation.Limits
		push         *logproto.PushRequest
		expectedPush *logproto.PushRequest
	}{
		"fudging disabled, no dupes": {
			limits: fudgingDisabled,
			push: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123457, 0), Line: "heyiiiiiii"},
						},
					},
				},
			},
			expectedPush: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123457, 0), Line: "heyiiiiiii"},
						},
					},
				},
			},
		},
		"fudging disabled, with dupe timestamp different entry": {
			limits: fudgingDisabled,
			push: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123456, 0), Line: "heyiiiiiii"},
						},
					},
				},
			},
			expectedPush: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123456, 0), Line: "heyiiiiiii"},
						},
					},
				},
			},
		},
		"fudging disabled, with dupe timestamp same entry": {
			limits: fudgingDisabled,
			push: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
						},
					},
				},
			},
			expectedPush: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
						},
					},
				},
			},
		},
		"fudging enabled, no dupes": {
			limits: fudgingEnabled,
			push: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123457, 0), Line: "heyiiiiiii"},
						},
					},
				},
			},
			expectedPush: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123457, 0), Line: "heyiiiiiii"},
						},
					},
				},
			},
		},
		"fudging enabled, with dupe timestamp different entry": {
			limits: fudgingEnabled,
			push: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123456, 0), Line: "heyiiiiiii"},
						},
					},
				},
			},
			expectedPush: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123456, 1), Line: "heyiiiiiii"},
						},
					},
				},
			},
		},
		"fudging enabled, with dupe timestamp same entry": {
			limits: fudgingEnabled,
			push: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
						},
					},
				},
			},
			expectedPush: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
						},
					},
				},
			},
		},
		"fudging enabled, multiple subsequent fudges": {
			limits: fudgingEnabled,
			push: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123456, 0), Line: "hi"},
							{Timestamp: time.Unix(123456, 1), Line: "hey there"},
						},
					},
				},
			},
			expectedPush: &logproto.PushRequest{
				Streams: []logproto.Stream{
					{
						Labels: "{job=\"foo\"}",
						Entries: []logproto.Entry{
							{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
							{Timestamp: time.Unix(123456, 1), Line: "hi"},
							{Timestamp: time.Unix(123456, 2), Line: "hey there"},
						},
					},
				},
			},
		},
	}

	for testName, testData := range tests {
		testData := testData
		t.Run(testName, func(t *testing.T) {
			ingester := &mockIngester{}
			d := prepare(t, testData.limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })
			defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck

			_, err := d.Push(ctx, testData.push)
			assert.NoError(t, err)
			assert.Equal(t, testData.expectedPush, ingester.pushed[0])
		})
	}
}
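
The new cases can be run in isolation with `go test ./pkg/distributor/ -run Test_FudgeTimestamp` from a Loki checkout.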

func Test_SortLabelsOnPush(t *testing.T) {
	limits := &validation.Limits{}
	flagext.DefaultValues(limits)

@@ -14,4 +14,6 @@ type Limits interface {
	CreationGracePeriod(userID string) time.Duration
	RejectOldSamples(userID string) bool
	RejectOldSamplesMaxAge(userID string) time.Duration

	FudgeDuplicateTimestamps(userID string) bool
}

@@ -40,20 +40,23 @@ type validationContext struct {
	maxLabelNameLength  int
	maxLabelValueLength int

	fudgeDuplicateTimestamps bool

	userID string
}

func (v Validator) getValidationContextForTime(now time.Time, userID string) validationContext {
	return validationContext{
		userID:                 userID,
		rejectOldSample:        v.RejectOldSamples(userID),
		rejectOldSampleMaxAge:  now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(),
		creationGracePeriod:    now.Add(v.CreationGracePeriod(userID)).UnixNano(),
		maxLineSize:            v.MaxLineSize(userID),
		maxLineSizeTruncate:    v.MaxLineSizeTruncate(userID),
		maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID),
		maxLabelNameLength:     v.MaxLabelNameLength(userID),
		maxLabelValueLength:    v.MaxLabelValueLength(userID),
		userID:                   userID,
		rejectOldSample:          v.RejectOldSamples(userID),
		rejectOldSampleMaxAge:    now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(),
		creationGracePeriod:      now.Add(v.CreationGracePeriod(userID)).UnixNano(),
		maxLineSize:              v.MaxLineSize(userID),
		maxLineSizeTruncate:      v.MaxLineSizeTruncate(userID),
		maxLabelNamesPerSeries:   v.MaxLabelNamesPerSeries(userID),
		maxLabelNameLength:       v.MaxLabelNameLength(userID),
		maxLabelValueLength:      v.MaxLabelValueLength(userID),
		fudgeDuplicateTimestamps: v.FudgeDuplicateTimestamps(userID),
	}
}

@@ -46,18 +46,19 @@ const (
// to support user-friendly duration format (e.g: "1h30m45s") in JSON value.
type Limits struct {
	// Distributor enforced limits.
	IngestionRateStrategy  string           `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
	IngestionRateMB        float64          `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"`
	IngestionBurstSizeMB   float64          `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"`
	MaxLabelNameLength     int              `yaml:"max_label_name_length" json:"max_label_name_length"`
	MaxLabelValueLength    int              `yaml:"max_label_value_length" json:"max_label_value_length"`
	MaxLabelNamesPerSeries int              `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
	RejectOldSamples       bool             `yaml:"reject_old_samples" json:"reject_old_samples"`
	RejectOldSamplesMaxAge model.Duration   `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
	CreationGracePeriod    model.Duration   `yaml:"creation_grace_period" json:"creation_grace_period"`
	EnforceMetricName      bool             `yaml:"enforce_metric_name" json:"enforce_metric_name"`
	MaxLineSize            flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
	MaxLineSizeTruncate    bool             `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
	IngestionRateStrategy   string           `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
	IngestionRateMB         float64          `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"`
	IngestionBurstSizeMB    float64          `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"`
	MaxLabelNameLength      int              `yaml:"max_label_name_length" json:"max_label_name_length"`
	MaxLabelValueLength     int              `yaml:"max_label_value_length" json:"max_label_value_length"`
	MaxLabelNamesPerSeries  int              `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
	RejectOldSamples        bool             `yaml:"reject_old_samples" json:"reject_old_samples"`
	RejectOldSamplesMaxAge  model.Duration   `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
	CreationGracePeriod     model.Duration   `yaml:"creation_grace_period" json:"creation_grace_period"`
	EnforceMetricName       bool             `yaml:"enforce_metric_name" json:"enforce_metric_name"`
	MaxLineSize             flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
	MaxLineSizeTruncate     bool             `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
	FudgeDuplicateTimestamp bool             `yaml:"fudge_duplicate_timestamp" json:"fudge_duplicate_timestamp"`

	// Ingester enforced limits.
	MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"`
@@ -135,6 +136,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
	f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name")
	f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")
	f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", true, "Reject old samples.")
	f.BoolVar(&l.FudgeDuplicateTimestamp, "validation.fudge-duplicate-timestamps", false, "Fudge the timestamp of a log line one nanosecond into the future when it matches the timestamp of the previous entry for the same stream, to guarantee sort order at query time.")

	_ = l.RejectOldSamplesMaxAge.Set("7d")
	f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.")
@@ -537,6 +539,10 @@ func (o *Overrides) PerStreamRateLimit(userID string) RateLimit {
	}
}

func (o *Overrides) FudgeDuplicateTimestamps(userID string) bool {
	return o.getOverridesForUser(userID).FudgeDuplicateTimestamp
}

func (o *Overrides) getOverridesForUser(userID string) *Limits {
	if o.tenantLimits != nil {
		l := o.tenantLimits.TenantLimits(userID)