diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index e065282ea7..d75d6a4307 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3641,6 +3641,11 @@ otlp_config: # drop them altogether [log_attributes: ] +# Block ingestion for policy until the configured date. The time should be in +# RFC3339 format. The policy is based on the policy_stream_mapping +# configuration. +[block_ingestion_policy_until: ] + # Block ingestion until the configured date. The time should be in RFC3339 # format. # CLI flag: -limits.block-ingestion-until diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d48625843a..27a963424f 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -550,9 +550,24 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID) d.writeFailuresManager.Log(tenantID, err) validationErrors.Add(err) - validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(len(stream.Entries))) discardedBytes := util.EntriesTotalSize(stream.Entries) - validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(discardedBytes)) + d.validator.reportDiscardedData(validation.MissingEnforcedLabels, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries)) + continue + } + + if block, statusCode, reason, err := d.validator.ShouldBlockIngestion(validationContext, now, policy); block { + d.writeFailuresManager.Log(tenantID, err) + discardedBytes := util.EntriesTotalSize(stream.Entries) + d.validator.reportDiscardedData(reason, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries)) + + // If the status code is 200, return success. + // Note that we still log the error and increment the metrics. + if statusCode == http.StatusOK { + // do not add error to validationErrors. + continue + } + + validationErrors.Add(err) continue } @@ -639,21 +654,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log return &logproto.PushResponse{}, validationErr } - if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block { - d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.BlockedIngestion) - - err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode) - d.writeFailuresManager.Log(tenantID, err) - - // If the status code is 200, return success. - // Note that we still log the error and increment the metrics. - if retStatusCode == http.StatusOK { - return &logproto.PushResponse{}, nil - } - - return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error()) - } - if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.aggregatedPushStats.lineSize) { d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 12fe12dbfa..331fe68724 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -17,11 +17,10 @@ import ( otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" - "github.com/grafana/loki/pkg/push" - "github.com/c2h5oh/datasize" "github.com/go-kit/log" "github.com/grafana/dskit/flagext" + dskit_flagext "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/kv/consul" @@ -51,6 +50,8 @@ import ( loki_net "github.com/grafana/loki/v3/pkg/util/net" "github.com/grafana/loki/v3/pkg/util/test" "github.com/grafana/loki/v3/pkg/validation" + + "github.com/grafana/loki/pkg/push" ) const ( @@ -441,6 +442,7 @@ func Test_MissingEnforcedLabels(t *testing.T) { // request with all required labels. lbs := labels.FromMap(map[string]string{"app": "foo", "env": "prod", "cluster": "cluster1", "namespace": "ns1"}) missing, missingLabels := distributors[0].missingEnforcedLabels(lbs, "test", "policy1") + assert.False(t, missing) assert.Empty(t, missingLabels) @@ -462,25 +464,42 @@ func Test_PushWithEnforcedLabels(t *testing.T) { flagext.DefaultValues(limits) // makeWriteRequest only contains a `{foo="bar"}` label. - req := makeWriteRequest(100, 100) + req := makeWriteRequest(100, 100) // 100 lines of 100 bytes each limits.EnforcedLabels = []string{"app", "env"} distributors, _ := prepare(t, 1, 3, limits, nil) + + // reset metrics in case they were set from a previous test. + validation.DiscardedBytes.Reset() + validation.DiscardedSamples.Reset() + // enforced labels configured, but all labels are missing. _, err := distributors[0].Push(ctx, req) require.Error(t, err) expectedErr := httpgrpc.Errorf(http.StatusBadRequest, validation.MissingEnforcedLabelsErrorMsg, "app,env", "test") require.EqualError(t, err, expectedErr.Error()) + // Verify metrics for discarded samples due to missing enforced labels + assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes)) // 100 lines * 100 bytes + assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples)) // 100 lines + // enforced labels, but all labels are present. req = makeWriteRequestWithLabels(100, 100, []string{`{app="foo", env="prod"}`}, false, false, false) _, err = distributors[0].Push(ctx, req) require.NoError(t, err) + // Metrics should not have increased since this push was successful + assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes)) + assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples)) + // no enforced labels, so no errors. limits.EnforcedLabels = []string{} distributors, _ = prepare(t, 1, 3, limits, nil) _, err = distributors[0].Push(ctx, req) require.NoError(t, err) + + // Metrics should remain unchanged + assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes)) + assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples)) } func TestDistributorPushConcurrently(t *testing.T) { @@ -1672,7 +1691,105 @@ func TestDistributor_PushIngestionBlocked(t *testing.T) { if tc.expectError { expectedErr := fmt.Sprintf(validation.BlockedIngestionErrorMsg, "test", tc.blockUntil.Format(time.RFC3339), tc.blockStatusCode) require.ErrorContains(t, err, expectedErr) - require.Nil(t, response) + } else { + require.NoError(t, err) + require.Equal(t, success, response) + } + }) + } +} + +func TestDistributor_PushIngestionBlockedByPolicy(t *testing.T) { + now := time.Now() + defaultErrCode := 260 + + for _, tc := range []struct { + name string + blockUntil map[string]time.Time + policy string + labels string + expectError bool + expectedErrorMsg string + yes bool + }{ + { + name: "not blocked - no policy block configured", + policy: "test-policy", + labels: `{foo="bar"}`, + expectError: false, + }, + { + name: "not blocked - policy block expired", + blockUntil: map[string]time.Time{ + "test-policy": now.Add(-1 * time.Hour), + }, + policy: "test-policy", + labels: `{foo="bar"}`, + expectError: false, + }, + { + name: "blocked - policy block active", + blockUntil: map[string]time.Time{ + "test-policy": now.Add(1 * time.Hour), + }, + policy: "test-policy", + labels: `{foo="bar"}`, + expectError: true, + expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode), + yes: true, + }, + { + name: "not blocked - different policy", + blockUntil: map[string]time.Time{ + "blocked-policy": now.Add(1 * time.Hour), + }, + policy: "test-policy", + labels: `{foo="bar"}`, + expectError: false, + }, + { + name: "blocked - custom status code", + blockUntil: map[string]time.Time{ + "test-policy": now.Add(1 * time.Hour), + }, + policy: "test-policy", + labels: `{foo="bar"}`, + expectError: true, + expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode), + }, + } { + t.Run(tc.name, func(t *testing.T) { + if !tc.yes { + return + } + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + // Configure policy mapping + limits.PolicyStreamMapping = validation.PolicyStreamMapping{ + tc.policy: []*validation.PriorityStream{ + { + Selector: tc.labels, + Priority: 1, + }, + }, + } + + // Configure policy blocks + if tc.blockUntil != nil { + limits.BlockIngestionPolicyUntil = make(map[string]dskit_flagext.Time) + for policy, until := range tc.blockUntil { + limits.BlockIngestionPolicyUntil[policy] = dskit_flagext.Time(until) + } + } + + distributors, _ := prepare(t, 1, 3, limits, nil) + request := makeWriteRequestWithLabels(1, 1024, []string{tc.labels}, false, false, false) + response, err := distributors[0].Push(ctx, request) + + if tc.expectError { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedErrorMsg) } else { require.NoError(t, err) require.Equal(t, success, response) diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index 90a35ce0d1..127aa97b80 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -39,6 +39,7 @@ type Limits interface { BlockIngestionUntil(userID string) time.Time BlockIngestionStatusCode(userID string) int + BlockIngestionPolicyUntil(userID string, policy string) time.Time EnforcedLabels(userID string) []string PolicyEnforcedLabels(userID string, policy string) []string diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index 4f99feffb9..083473a5a9 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -104,21 +104,13 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la // Makes time string on the error message formatted consistently. formatedEntryTime := entry.Timestamp.Format(timeFormat) formatedRejectMaxAgeTime := time.Unix(0, vCtx.rejectOldSampleMaxAge).Format(timeFormat) - validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours, policy).Inc() - validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours, policy).Add(entrySize) - if v.usageTracker != nil { - v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.GreaterThanMaxSampleAge, labels, entrySize) - } + v.reportDiscardedDataWithTracker(ctx, validation.GreaterThanMaxSampleAge, vCtx, labels, retentionHours, policy, int(entrySize), 1) return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime) } if ts > vCtx.creationGracePeriod { formatedEntryTime := entry.Timestamp.Format(timeFormat) - validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours, policy).Inc() - validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours, policy).Add(entrySize) - if v.usageTracker != nil { - v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.TooFarInFuture, labels, entrySize) - } + v.reportDiscardedDataWithTracker(ctx, validation.TooFarInFuture, vCtx, labels, retentionHours, policy, int(entrySize), 1) return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime) } @@ -127,39 +119,23 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la // an orthogonal concept (we need not use ValidateLabels in this context) // but the upstream cortex_validation pkg uses it, so we keep this // for parity. - validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours, policy).Inc() - validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours, policy).Add(entrySize) - if v.usageTracker != nil { - v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.LineTooLong, labels, entrySize) - } + v.reportDiscardedDataWithTracker(ctx, validation.LineTooLong, vCtx, labels, retentionHours, policy, int(entrySize), 1) return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line)) } if structuredMetadataCount > 0 { if !vCtx.allowStructuredMetadata { - validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours, policy).Inc() - validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours, policy).Add(entrySize) - if v.usageTracker != nil { - v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.DisallowedStructuredMetadata, labels, entrySize) - } + v.reportDiscardedDataWithTracker(ctx, validation.DisallowedStructuredMetadata, vCtx, labels, retentionHours, policy, int(entrySize), 1) return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels) } if maxSize := vCtx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize { - validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours, policy).Inc() - validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours, policy).Add(entrySize) - if v.usageTracker != nil { - v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooLarge, labels, entrySize) - } + v.reportDiscardedDataWithTracker(ctx, validation.StructuredMetadataTooLarge, vCtx, labels, retentionHours, policy, int(entrySize), 1) return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, vCtx.maxStructuredMetadataSize) } if maxCount := vCtx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount { - validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours, policy).Inc() - validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours, policy).Add(entrySize) - if v.usageTracker != nil { - v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooMany, labels, entrySize) - } + v.reportDiscardedDataWithTracker(ctx, validation.StructuredMetadataTooMany, vCtx, labels, retentionHours, policy, int(entrySize), 1) return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, vCtx.maxStructuredMetadataCount) } } @@ -168,9 +144,10 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la } // Validate labels returns an error if the labels are invalid -func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream, retentionHours, policy string) error { +func (v Validator) ValidateLabels(vCtx validationContext, ls labels.Labels, stream logproto.Stream, retentionHours, policy string) error { if len(ls) == 0 { - validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID, retentionHours, policy).Inc() + // TODO: is this one correct? + validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, vCtx.userID, retentionHours, policy).Inc() return fmt.Errorf(validation.MissingLabelsErrorMsg) } @@ -186,21 +163,23 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea numLabelNames-- } - if numLabelNames > ctx.maxLabelNamesPerSeries { - updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream, retentionHours, policy) - return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries) + entriesSize := util.EntriesTotalSize(stream.Entries) + + if numLabelNames > vCtx.maxLabelNamesPerSeries { + v.reportDiscardedData(validation.MaxLabelNamesPerSeries, vCtx, retentionHours, policy, entriesSize, len(stream.Entries)) + return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, vCtx.maxLabelNamesPerSeries) } lastLabelName := "" for _, l := range ls { - if len(l.Name) > ctx.maxLabelNameLength { - updateMetrics(validation.LabelNameTooLong, ctx.userID, stream, retentionHours, policy) + if len(l.Name) > vCtx.maxLabelNameLength { + v.reportDiscardedData(validation.LabelNameTooLong, vCtx, retentionHours, policy, entriesSize, len(stream.Entries)) return fmt.Errorf(validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name) - } else if len(l.Value) > ctx.maxLabelValueLength { - updateMetrics(validation.LabelValueTooLong, ctx.userID, stream, retentionHours, policy) + } else if len(l.Value) > vCtx.maxLabelValueLength { + v.reportDiscardedData(validation.LabelValueTooLong, vCtx, retentionHours, policy, entriesSize, len(stream.Entries)) return fmt.Errorf(validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value) } else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 { - updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream, retentionHours, policy) + v.reportDiscardedData(validation.DuplicateLabelNames, vCtx, retentionHours, policy, entriesSize, len(stream.Entries)) return fmt.Errorf(validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name) } lastLabelName = l.Name @@ -208,17 +187,62 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea return nil } +func (v Validator) reportDiscardedData(reason string, vCtx validationContext, retentionHours string, policy string, entrySize, entryCount int) { + validation.DiscardedSamples.WithLabelValues(reason, vCtx.userID, retentionHours, policy).Add(float64(entryCount)) + validation.DiscardedBytes.WithLabelValues(reason, vCtx.userID, retentionHours, policy).Add(float64(entrySize)) +} + +func (v Validator) reportDiscardedDataWithTracker(ctx context.Context, reason string, vCtx validationContext, labels labels.Labels, retentionHours string, policy string, entrySize, entryCount int) { + v.reportDiscardedData(reason, vCtx, retentionHours, policy, entrySize, entryCount) + if v.usageTracker != nil { + v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, reason, labels, float64(entrySize)) + } +} + // ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code. -func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (bool, time.Time, int) { +func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, policy string) (bool, int, string, error) { + if block, code, reason, err := v.shouldBlockGlobalPolicy(ctx, now); block { + return block, code, reason, err + } + + if block, until, code := v.shouldBlockPolicy(ctx, policy, now); block { + err := fmt.Errorf(validation.BlockedIngestionPolicyErrorMsg, ctx.userID, until.Format(time.RFC3339), code) + return true, code, validation.BlockedIngestionPolicy, err + } + + return false, 0, "", nil +} + +func (v Validator) shouldBlockGlobalPolicy(ctx validationContext, now time.Time) (bool, int, string, error) { if ctx.blockIngestionUntil.IsZero() { - return false, time.Time{}, 0 + return false, 0, "", nil } - return now.Before(ctx.blockIngestionUntil), ctx.blockIngestionUntil, ctx.blockIngestionStatusCode + if now.Before(ctx.blockIngestionUntil) { + err := fmt.Errorf(validation.BlockedIngestionErrorMsg, ctx.userID, ctx.blockIngestionUntil.Format(time.RFC3339), ctx.blockIngestionStatusCode) + return true, ctx.blockIngestionStatusCode, validation.BlockedIngestion, err + } + + return false, 0, "", nil } -func updateMetrics(reason, userID string, stream logproto.Stream, retentionHours, policy string) { - validation.DiscardedSamples.WithLabelValues(reason, userID, retentionHours, policy).Add(float64(len(stream.Entries))) - bytes := util.EntriesTotalSize(stream.Entries) - validation.DiscardedBytes.WithLabelValues(reason, userID, retentionHours, policy).Add(float64(bytes)) +// ShouldBlockPolicy checks if ingestion should be blocked for the given policy. +// It returns true if ingestion should be blocked, along with the block until time and status code. +func (v *Validator) shouldBlockPolicy(ctx validationContext, policy string, now time.Time) (bool, time.Time, int) { + // No policy provided, don't block + if policy == "" { + return false, time.Time{}, 0 + } + + // Check if this policy is blocked in tenant configs + blockUntil := v.Limits.BlockIngestionPolicyUntil(ctx.userID, policy) + if blockUntil.IsZero() { + return false, time.Time{}, 0 + } + + if now.Before(blockUntil) { + return true, blockUntil, ctx.blockIngestionStatusCode + } + + return false, time.Time{}, 0 } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 468fde3920..bb2f5c4b24 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -227,11 +227,12 @@ type Limits struct { OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"` GlobalOTLPConfig push.GlobalOTLPConfig `yaml:"-" json:"-"` - BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"` - BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"` - EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"` - PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. Example:\n policy_enforced_labels: \n policy1: \n - label1 \n - label2 \n policy2: \n - label3 \n - label4"` - PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"` + BlockIngestionPolicyUntil map[string]dskit_flagext.Time `yaml:"block_ingestion_policy_until" json:"block_ingestion_policy_until" category:"experimental" doc:"description=Block ingestion for policy until the configured date. The time should be in RFC3339 format. The policy is based on the policy_stream_mapping configuration."` + BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until" category:"experimental"` + BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"` + EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"` + PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. Example:\n policy_enforced_labels: \n policy1: \n - label1 \n - label2 \n policy2: \n - label3 \n - label4"` + PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"` IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"` @@ -1122,6 +1123,18 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int { return o.getOverridesForUser(userID).BlockIngestionStatusCode } +func (o *Overrides) BlockIngestionPolicyUntil(userID string, policy string) time.Time { + limits := o.getOverridesForUser(userID) + if limits == nil || limits.BlockIngestionPolicyUntil == nil { + return time.Time{} // Zero time means no blocking + } + + if blockUntil, ok := limits.BlockIngestionPolicyUntil[policy]; ok { + return time.Time(blockUntil) + } + return time.Time{} // Zero time means no blocking +} + func (o *Overrides) EnforcedLabels(userID string) []string { return o.getOverridesForUser(userID).EnforcedLabels } diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 9ecc8937e8..7dcabe65c7 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + dskit_flagext "github.com/grafana/dskit/flagext" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -226,10 +227,11 @@ ruler_remote_write_headers: Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, - EnforcedLabels: []string{}, - PolicyEnforcedLabels: map[string][]string{}, - PolicyStreamMapping: PolicyStreamMapping{}, + OTLPConfig: defaultOTLPConfig, + EnforcedLabels: []string{}, + PolicyEnforcedLabels: map[string][]string{}, + PolicyStreamMapping: PolicyStreamMapping{}, + BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{}, }, }, { @@ -248,10 +250,11 @@ ruler_remote_write_headers: Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, - EnforcedLabels: []string{}, - PolicyEnforcedLabels: map[string][]string{}, - PolicyStreamMapping: PolicyStreamMapping{}, + OTLPConfig: defaultOTLPConfig, + EnforcedLabels: []string{}, + PolicyEnforcedLabels: map[string][]string{}, + PolicyStreamMapping: PolicyStreamMapping{}, + BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{}, }, }, { @@ -273,11 +276,12 @@ retention_stream: }, // Rest from new defaults - RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, - OTLPConfig: defaultOTLPConfig, - EnforcedLabels: []string{}, - PolicyEnforcedLabels: map[string][]string{}, - PolicyStreamMapping: PolicyStreamMapping{}, + RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, + OTLPConfig: defaultOTLPConfig, + EnforcedLabels: []string{}, + PolicyEnforcedLabels: map[string][]string{}, + PolicyStreamMapping: PolicyStreamMapping{}, + BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{}, }, }, { @@ -299,10 +303,11 @@ reject_old_samples: true Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, - EnforcedLabels: []string{}, - PolicyEnforcedLabels: map[string][]string{}, - PolicyStreamMapping: PolicyStreamMapping{}, + OTLPConfig: defaultOTLPConfig, + EnforcedLabels: []string{}, + PolicyEnforcedLabels: map[string][]string{}, + PolicyStreamMapping: PolicyStreamMapping{}, + BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{}, }, }, { @@ -325,10 +330,11 @@ query_timeout: 5m Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, - EnforcedLabels: []string{}, - PolicyEnforcedLabels: map[string][]string{}, - PolicyStreamMapping: PolicyStreamMapping{}, + OTLPConfig: defaultOTLPConfig, + EnforcedLabels: []string{}, + PolicyEnforcedLabels: map[string][]string{}, + PolicyStreamMapping: PolicyStreamMapping{}, + BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{}, }, }, } { diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index e6af919adc..c7c71d285c 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -71,6 +71,8 @@ const ( StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it." BlockedIngestion = "blocked_ingestion" BlockedIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'" + BlockedIngestionPolicy = "blocked_ingestion_policy" + BlockedIngestionPolicyErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'" MissingEnforcedLabels = "missing_enforced_labels" MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s" )