From a2c8ec7dc11546e2876e159e76eabd727ebc1929 Mon Sep 17 00:00:00 2001 From: Dylan Guedes Date: Tue, 14 Jan 2025 16:37:18 -0300 Subject: [PATCH] feat: Add a new `enforced_labels` limit (#15704) **What this PR does / why we need it**: Introduce the new experimental `enforced_labels` limit. By default it is empty but when configured, it configures Loki to only accept push requests whose streams have all the enforced labels. --- docs/sources/shared/configuration.md | 6 ++++ pkg/distributor/distributor.go | 31 ++++++++++++++++ pkg/distributor/distributor_test.go | 53 ++++++++++++++++++++++++++++ pkg/distributor/limits.go | 1 + pkg/distributor/validator.go | 2 ++ pkg/validation/limits.go | 6 ++++ pkg/validation/limits_test.go | 13 ++++--- pkg/validation/validate.go | 2 ++ 8 files changed, 110 insertions(+), 4 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d4a12077ed..d602c48e05 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3941,6 +3941,12 @@ otlp_config: # CLI flag: -limits.block-ingestion-status-code [block_ingestion_status_code: <int> | default = 260] +# List of labels that must be present in the stream. If any of the labels are +# missing, the stream will be discarded. This flag configures it globally for +# all tenants. Experimental. +# CLI flag: -validation.enforced-labels +[enforced_labels: <list of strings> | default = []] + # The number of partitions a tenant's data should be sharded to when using kafka # ingestion. Tenants are sharded across partitions using shuffle-sharding. 0 # disables shuffle sharding and tenant is sharded across all partitions. 
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 4d53899e1b..6ede42aab1 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -523,6 +523,16 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log continue } + if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID); missing { + err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID) + d.writeFailuresManager.Log(tenantID, err) + validationErrors.Add(err) + validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(len(stream.Entries))) + discardedBytes := util.EntriesTotalSize(stream.Entries) + validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(discardedBytes)) + continue + } + n := 0 pushSize := 0 prevTs := stream.Entries[0].Timestamp @@ -733,6 +743,27 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } } +// missingEnforcedLabels returns true if the stream is missing any of the required labels. +// +// It also returns the names of all the enforced labels that are missing, if any. +func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string) (bool, []string) { + requiredLbs := d.validator.Limits.EnforcedLabels(tenantID) + if len(requiredLbs) == 0 { + // no enforced labels configured. 
+ return false, []string{} + } + + missingLbs := []string{} + + for _, lb := range requiredLbs { + if !lbs.Has(lb) { + missingLbs = append(missingLbs, lb) + } + } + + return len(missingLbs) > 0, missingLbs +} + func (d *Distributor) trackDiscardedData( ctx context.Context, req *logproto.PushRequest, diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index ae14f6110d..91d3fcdf13 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -426,6 +426,59 @@ func Test_IncrementTimestamp(t *testing.T) { } } +func Test_MissingEnforcedLabels(t *testing.T) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + limits.EnforcedLabels = []string{"app", "env"} + + distributors, _ := prepare(t, 1, 5, limits, nil) + + // request with all required labels. + lbs := labels.FromMap(map[string]string{"app": "foo", "env": "prod"}) + missing, missingLabels := distributors[0].missingEnforcedLabels(lbs, "test") + assert.False(t, missing) + assert.Empty(t, missingLabels) + + // request missing the `app` label. + lbs = labels.FromMap(map[string]string{"env": "prod"}) + missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test") + assert.True(t, missing) + assert.EqualValues(t, []string{"app"}, missingLabels) + + // request missing all required labels. + lbs = labels.FromMap(map[string]string{"pod": "distributor-abc"}) + missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test") + assert.True(t, missing) + assert.EqualValues(t, []string{"app", "env"}, missingLabels) +} + +func Test_PushWithEnforcedLabels(t *testing.T) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + // makeWriteRequest only contains a `{foo="bar"}` label. + req := makeWriteRequest(100, 100) + limits.EnforcedLabels = []string{"app", "env"} + distributors, _ := prepare(t, 1, 3, limits, nil) + // enforced labels configured, but all labels are missing. 
+ _, err := distributors[0].Push(ctx, req) + require.Error(t, err) + expectedErr := httpgrpc.Errorf(http.StatusBadRequest, validation.MissingEnforcedLabelsErrorMsg, "app,env", "test") + require.EqualError(t, err, expectedErr.Error()) + + // enforced labels, but all labels are present. + req = makeWriteRequestWithLabels(100, 100, []string{`{app="foo", env="prod"}`}, false, false, false) + _, err = distributors[0].Push(ctx, req) + require.NoError(t, err) + + // no enforced labels, so no errors. + limits.EnforcedLabels = []string{} + distributors, _ = prepare(t, 1, 3, limits, nil) + _, err = distributors[0].Push(ctx, req) + require.NoError(t, err) +} + func TestDistributorPushConcurrently(t *testing.T) { limits := &validation.Limits{} flagext.DefaultValues(limits) diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index 23c99367d1..62098dac6d 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -38,6 +38,7 @@ type Limits interface { BlockIngestionUntil(userID string) time.Time BlockIngestionStatusCode(userID string) int + EnforcedLabels(userID string) []string IngestionPartitionsTenantShardSize(userID string) int } diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index 878255d661..5aea652225 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -55,6 +55,7 @@ type validationContext struct { blockIngestionUntil time.Time blockIngestionStatusCode int + enforcedLabels []string userID string } @@ -80,6 +81,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID), blockIngestionUntil: v.BlockIngestionUntil(userID), blockIngestionStatusCode: v.BlockIngestionStatusCode(userID), + enforcedLabels: v.EnforcedLabels(userID), } } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index ad84bbc0af..4ecc566e36 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -230,6 
+230,7 @@ type Limits struct { BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"` BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"` + EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"` IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"` @@ -445,6 +446,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.BlockIngestionUntil, "limits.block-ingestion-until", "Block ingestion until the configured date. The time should be in RFC3339 format.") f.IntVar(&l.BlockIngestionStatusCode, "limits.block-ingestion-status-code", defaultBlockedIngestionStatusCode, "HTTP status code to return when ingestion is blocked. If 200, the ingestion will be blocked without returning an error to the client. By Default, a custom status code (260) is returned to the client along with an error message.") + f.Var((*dskit_flagext.StringSlice)(&l.EnforcedLabels), "validation.enforced-labels", "List of labels that must be present in the stream. If any of the labels are missing, the stream will be discarded. This flag configures it globally for all tenants. Experimental.") f.IntVar(&l.IngestionPartitionsTenantShardSize, "limits.ingestion-partition-tenant-shard-size", 0, "The number of partitions a tenant's data should be sharded to when using kafka ingestion. Tenants are sharded across partitions using shuffle-sharding. 
0 disables shuffle sharding and tenant is sharded across all partitions.") @@ -1111,6 +1113,10 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int { return o.getOverridesForUser(userID).BlockIngestionStatusCode } +func (o *Overrides) EnforcedLabels(userID string) []string { + return o.getOverridesForUser(userID).EnforcedLabels +} + func (o *Overrides) ShardAggregations(userID string) []string { return o.getOverridesForUser(userID).ShardAggregations } diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index bb955d8a87..845ec805c5 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -226,7 +226,8 @@ ruler_remote_write_headers: Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, + OTLPConfig: defaultOTLPConfig, + EnforcedLabels: []string{}, }, }, { @@ -245,7 +246,8 @@ ruler_remote_write_headers: Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, + OTLPConfig: defaultOTLPConfig, + EnforcedLabels: []string{}, }, }, { @@ -269,6 +271,7 @@ retention_stream: // Rest from new defaults RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, OTLPConfig: defaultOTLPConfig, + EnforcedLabels: []string{}, }, }, { @@ -290,7 +293,8 @@ reject_old_samples: true Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, + OTLPConfig: defaultOTLPConfig, + EnforcedLabels: []string{}, }, }, { @@ -313,7 +317,8 @@ query_timeout: 5m Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, + OTLPConfig: defaultOTLPConfig, + EnforcedLabels: []string{}, }, }, } { diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index 82a3343d44..ff681ac8d0 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -71,6 +71,8 @@ const ( StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it." 
BlockedIngestion = "blocked_ingestion" BlockedIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'" + MissingEnforcedLabels = "missing_enforced_labels" + MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s" ) type ErrStreamRateLimit struct {