feat: Add support for blocking a policy to be ingested (#16203)

**What this PR does / why we need it**:
Add support for blocking a policy from being ingested.
The policy is defined by stream mapping.
pull/16327/head
Dylan Guedes 3 months ago committed by GitHub
parent 677faadcb5
commit 69089eff41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 5
      docs/sources/shared/configuration.md
  2. 34
      pkg/distributor/distributor.go
  3. 125
      pkg/distributor/distributor_test.go
  4. 1
      pkg/distributor/limits.go
  5. 118
      pkg/distributor/validator.go
  6. 23
      pkg/validation/limits.go
  7. 48
      pkg/validation/limits_test.go
  8. 2
      pkg/validation/validate.go

@ -3641,6 +3641,11 @@ otlp_config:
# drop them altogether
[log_attributes: <list of attributes_configs>]
# Block ingestion for policy until the configured date. The time should be in
# RFC3339 format. The policy is based on the policy_stream_mapping
# configuration.
[block_ingestion_policy_until: <map of string to Time>]
# Block ingestion until the configured date. The time should be in RFC3339
# format.
# CLI flag: -limits.block-ingestion-until

@ -550,9 +550,24 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(discardedBytes))
d.validator.reportDiscardedData(validation.MissingEnforcedLabels, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries))
continue
}
if block, statusCode, reason, err := d.validator.ShouldBlockIngestion(validationContext, now, policy); block {
d.writeFailuresManager.Log(tenantID, err)
discardedBytes := util.EntriesTotalSize(stream.Entries)
d.validator.reportDiscardedData(reason, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries))
// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if statusCode == http.StatusOK {
// do not add error to validationErrors.
continue
}
validationErrors.Add(err)
continue
}
@ -639,21 +654,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}
if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.BlockedIngestion)
err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)
// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if retStatusCode == http.StatusOK {
return &logproto.PushResponse{}, nil
}
return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error())
}
if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.aggregatedPushStats.lineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited)

@ -17,11 +17,10 @@ import (
otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
"github.com/grafana/loki/pkg/push"
"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
dskit_flagext "github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
@ -51,6 +50,8 @@ import (
loki_net "github.com/grafana/loki/v3/pkg/util/net"
"github.com/grafana/loki/v3/pkg/util/test"
"github.com/grafana/loki/v3/pkg/validation"
"github.com/grafana/loki/pkg/push"
)
const (
@ -441,6 +442,7 @@ func Test_MissingEnforcedLabels(t *testing.T) {
// request with all required labels.
lbs := labels.FromMap(map[string]string{"app": "foo", "env": "prod", "cluster": "cluster1", "namespace": "ns1"})
missing, missingLabels := distributors[0].missingEnforcedLabels(lbs, "test", "policy1")
assert.False(t, missing)
assert.Empty(t, missingLabels)
@ -462,25 +464,42 @@ func Test_PushWithEnforcedLabels(t *testing.T) {
flagext.DefaultValues(limits)
// makeWriteRequest only contains a `{foo="bar"}` label.
req := makeWriteRequest(100, 100)
req := makeWriteRequest(100, 100) // 100 lines of 100 bytes each
limits.EnforcedLabels = []string{"app", "env"}
distributors, _ := prepare(t, 1, 3, limits, nil)
// reset metrics in case they were set from a previous test.
validation.DiscardedBytes.Reset()
validation.DiscardedSamples.Reset()
// enforced labels configured, but all labels are missing.
_, err := distributors[0].Push(ctx, req)
require.Error(t, err)
expectedErr := httpgrpc.Errorf(http.StatusBadRequest, validation.MissingEnforcedLabelsErrorMsg, "app,env", "test")
require.EqualError(t, err, expectedErr.Error())
// Verify metrics for discarded samples due to missing enforced labels
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes)) // 100 lines * 100 bytes
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples)) // 100 lines
// enforced labels, but all labels are present.
req = makeWriteRequestWithLabels(100, 100, []string{`{app="foo", env="prod"}`}, false, false, false)
_, err = distributors[0].Push(ctx, req)
require.NoError(t, err)
// Metrics should not have increased since this push was successful
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes))
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples))
// no enforced labels, so no errors.
limits.EnforcedLabels = []string{}
distributors, _ = prepare(t, 1, 3, limits, nil)
_, err = distributors[0].Push(ctx, req)
require.NoError(t, err)
// Metrics should remain unchanged
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes))
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples))
}
func TestDistributorPushConcurrently(t *testing.T) {
@ -1672,7 +1691,105 @@ func TestDistributor_PushIngestionBlocked(t *testing.T) {
if tc.expectError {
expectedErr := fmt.Sprintf(validation.BlockedIngestionErrorMsg, "test", tc.blockUntil.Format(time.RFC3339), tc.blockStatusCode)
require.ErrorContains(t, err, expectedErr)
require.Nil(t, response)
} else {
require.NoError(t, err)
require.Equal(t, success, response)
}
})
}
}
func TestDistributor_PushIngestionBlockedByPolicy(t *testing.T) {
now := time.Now()
defaultErrCode := 260
for _, tc := range []struct {
name string
blockUntil map[string]time.Time
policy string
labels string
expectError bool
expectedErrorMsg string
yes bool
}{
{
name: "not blocked - no policy block configured",
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "not blocked - policy block expired",
blockUntil: map[string]time.Time{
"test-policy": now.Add(-1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "blocked - policy block active",
blockUntil: map[string]time.Time{
"test-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: true,
expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode),
yes: true,
},
{
name: "not blocked - different policy",
blockUntil: map[string]time.Time{
"blocked-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "blocked - custom status code",
blockUntil: map[string]time.Time{
"test-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: true,
expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode),
},
} {
t.Run(tc.name, func(t *testing.T) {
if !tc.yes {
return
}
limits := &validation.Limits{}
flagext.DefaultValues(limits)
// Configure policy mapping
limits.PolicyStreamMapping = validation.PolicyStreamMapping{
tc.policy: []*validation.PriorityStream{
{
Selector: tc.labels,
Priority: 1,
},
},
}
// Configure policy blocks
if tc.blockUntil != nil {
limits.BlockIngestionPolicyUntil = make(map[string]dskit_flagext.Time)
for policy, until := range tc.blockUntil {
limits.BlockIngestionPolicyUntil[policy] = dskit_flagext.Time(until)
}
}
distributors, _ := prepare(t, 1, 3, limits, nil)
request := makeWriteRequestWithLabels(1, 1024, []string{tc.labels}, false, false, false)
response, err := distributors[0].Push(ctx, request)
if tc.expectError {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedErrorMsg)
} else {
require.NoError(t, err)
require.Equal(t, success, response)

@ -39,6 +39,7 @@ type Limits interface {
BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
BlockIngestionPolicyUntil(userID string, policy string) time.Time
EnforcedLabels(userID string) []string
PolicyEnforcedLabels(userID string, policy string) []string

@ -104,21 +104,13 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la
// Makes time string on the error message formatted consistently.
formatedEntryTime := entry.Timestamp.Format(timeFormat)
formatedRejectMaxAgeTime := time.Unix(0, vCtx.rejectOldSampleMaxAge).Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.GreaterThanMaxSampleAge, labels, entrySize)
}
v.reportDiscardedDataWithTracker(ctx, validation.GreaterThanMaxSampleAge, vCtx, labels, retentionHours, policy, int(entrySize), 1)
return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
}
if ts > vCtx.creationGracePeriod {
formatedEntryTime := entry.Timestamp.Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.TooFarInFuture, labels, entrySize)
}
v.reportDiscardedDataWithTracker(ctx, validation.TooFarInFuture, vCtx, labels, retentionHours, policy, int(entrySize), 1)
return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
}
@ -127,39 +119,23 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la
// an orthogonal concept (we need not use ValidateLabels in this context)
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.LineTooLong, labels, entrySize)
}
v.reportDiscardedDataWithTracker(ctx, validation.LineTooLong, vCtx, labels, retentionHours, policy, int(entrySize), 1)
return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
}
if structuredMetadataCount > 0 {
if !vCtx.allowStructuredMetadata {
validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.DisallowedStructuredMetadata, labels, entrySize)
}
v.reportDiscardedDataWithTracker(ctx, validation.DisallowedStructuredMetadata, vCtx, labels, retentionHours, policy, int(entrySize), 1)
return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels)
}
if maxSize := vCtx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooLarge, labels, entrySize)
}
v.reportDiscardedDataWithTracker(ctx, validation.StructuredMetadataTooLarge, vCtx, labels, retentionHours, policy, int(entrySize), 1)
return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, vCtx.maxStructuredMetadataSize)
}
if maxCount := vCtx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooMany, labels, entrySize)
}
v.reportDiscardedDataWithTracker(ctx, validation.StructuredMetadataTooMany, vCtx, labels, retentionHours, policy, int(entrySize), 1)
return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, vCtx.maxStructuredMetadataCount)
}
}
@ -168,9 +144,10 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la
}
// Validate labels returns an error if the labels are invalid
func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream, retentionHours, policy string) error {
func (v Validator) ValidateLabels(vCtx validationContext, ls labels.Labels, stream logproto.Stream, retentionHours, policy string) error {
if len(ls) == 0 {
validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID, retentionHours, policy).Inc()
// TODO: is this one correct?
validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, vCtx.userID, retentionHours, policy).Inc()
return fmt.Errorf(validation.MissingLabelsErrorMsg)
}
@ -186,21 +163,23 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea
numLabelNames--
}
if numLabelNames > ctx.maxLabelNamesPerSeries {
updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream, retentionHours, policy)
return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
entriesSize := util.EntriesTotalSize(stream.Entries)
if numLabelNames > vCtx.maxLabelNamesPerSeries {
v.reportDiscardedData(validation.MaxLabelNamesPerSeries, vCtx, retentionHours, policy, entriesSize, len(stream.Entries))
return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, vCtx.maxLabelNamesPerSeries)
}
lastLabelName := ""
for _, l := range ls {
if len(l.Name) > ctx.maxLabelNameLength {
updateMetrics(validation.LabelNameTooLong, ctx.userID, stream, retentionHours, policy)
if len(l.Name) > vCtx.maxLabelNameLength {
v.reportDiscardedData(validation.LabelNameTooLong, vCtx, retentionHours, policy, entriesSize, len(stream.Entries))
return fmt.Errorf(validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name)
} else if len(l.Value) > ctx.maxLabelValueLength {
updateMetrics(validation.LabelValueTooLong, ctx.userID, stream, retentionHours, policy)
} else if len(l.Value) > vCtx.maxLabelValueLength {
v.reportDiscardedData(validation.LabelValueTooLong, vCtx, retentionHours, policy, entriesSize, len(stream.Entries))
return fmt.Errorf(validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value)
} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream, retentionHours, policy)
v.reportDiscardedData(validation.DuplicateLabelNames, vCtx, retentionHours, policy, entriesSize, len(stream.Entries))
return fmt.Errorf(validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name)
}
lastLabelName = l.Name
@ -208,17 +187,62 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea
return nil
}
func (v Validator) reportDiscardedData(reason string, vCtx validationContext, retentionHours string, policy string, entrySize, entryCount int) {
validation.DiscardedSamples.WithLabelValues(reason, vCtx.userID, retentionHours, policy).Add(float64(entryCount))
validation.DiscardedBytes.WithLabelValues(reason, vCtx.userID, retentionHours, policy).Add(float64(entrySize))
}
func (v Validator) reportDiscardedDataWithTracker(ctx context.Context, reason string, vCtx validationContext, labels labels.Labels, retentionHours string, policy string, entrySize, entryCount int) {
v.reportDiscardedData(reason, vCtx, retentionHours, policy, entrySize, entryCount)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, reason, labels, float64(entrySize))
}
}
// ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code.
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (bool, time.Time, int) {
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, policy string) (bool, int, string, error) {
if block, code, reason, err := v.shouldBlockGlobalPolicy(ctx, now); block {
return block, code, reason, err
}
if block, until, code := v.shouldBlockPolicy(ctx, policy, now); block {
err := fmt.Errorf(validation.BlockedIngestionPolicyErrorMsg, ctx.userID, until.Format(time.RFC3339), code)
return true, code, validation.BlockedIngestionPolicy, err
}
return false, 0, "", nil
}
func (v Validator) shouldBlockGlobalPolicy(ctx validationContext, now time.Time) (bool, int, string, error) {
if ctx.blockIngestionUntil.IsZero() {
return false, time.Time{}, 0
return false, 0, "", nil
}
return now.Before(ctx.blockIngestionUntil), ctx.blockIngestionUntil, ctx.blockIngestionStatusCode
if now.Before(ctx.blockIngestionUntil) {
err := fmt.Errorf(validation.BlockedIngestionErrorMsg, ctx.userID, ctx.blockIngestionUntil.Format(time.RFC3339), ctx.blockIngestionStatusCode)
return true, ctx.blockIngestionStatusCode, validation.BlockedIngestion, err
}
return false, 0, "", nil
}
func updateMetrics(reason, userID string, stream logproto.Stream, retentionHours, policy string) {
validation.DiscardedSamples.WithLabelValues(reason, userID, retentionHours, policy).Add(float64(len(stream.Entries)))
bytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(reason, userID, retentionHours, policy).Add(float64(bytes))
// ShouldBlockPolicy checks if ingestion should be blocked for the given policy.
// It returns true if ingestion should be blocked, along with the block until time and status code.
func (v *Validator) shouldBlockPolicy(ctx validationContext, policy string, now time.Time) (bool, time.Time, int) {
// No policy provided, don't block
if policy == "" {
return false, time.Time{}, 0
}
// Check if this policy is blocked in tenant configs
blockUntil := v.Limits.BlockIngestionPolicyUntil(ctx.userID, policy)
if blockUntil.IsZero() {
return false, time.Time{}, 0
}
if now.Before(blockUntil) {
return true, blockUntil, ctx.blockIngestionStatusCode
}
return false, time.Time{}, 0
}

@ -227,11 +227,12 @@ type Limits struct {
OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"`
GlobalOTLPConfig push.GlobalOTLPConfig `yaml:"-" json:"-"`
BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. Example:\n policy_enforced_labels: \n policy1: \n - label1 \n - label2 \n policy2: \n - label3 \n - label4"`
PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"`
BlockIngestionPolicyUntil map[string]dskit_flagext.Time `yaml:"block_ingestion_policy_until" json:"block_ingestion_policy_until" category:"experimental" doc:"description=Block ingestion for policy until the configured date. The time should be in RFC3339 format. The policy is based on the policy_stream_mapping configuration."`
BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until" category:"experimental"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. Example:\n policy_enforced_labels: \n policy1: \n - label1 \n - label2 \n policy2: \n - label3 \n - label4"`
PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"`
IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`
@ -1122,6 +1123,18 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int {
return o.getOverridesForUser(userID).BlockIngestionStatusCode
}
func (o *Overrides) BlockIngestionPolicyUntil(userID string, policy string) time.Time {
limits := o.getOverridesForUser(userID)
if limits == nil || limits.BlockIngestionPolicyUntil == nil {
return time.Time{} // Zero time means no blocking
}
if blockUntil, ok := limits.BlockIngestionPolicyUntil[policy]; ok {
return time.Time(blockUntil)
}
return time.Time{} // Zero time means no blocking
}
func (o *Overrides) EnforcedLabels(userID string) []string {
return o.getOverridesForUser(userID).EnforcedLabels
}

@ -7,6 +7,7 @@ import (
"testing"
"time"
dskit_flagext "github.com/grafana/dskit/flagext"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -226,10 +227,11 @@ ruler_remote_write_headers:
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
@ -248,10 +250,11 @@ ruler_remote_write_headers:
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
@ -273,11 +276,12 @@ retention_stream:
},
// Rest from new defaults
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
@ -299,10 +303,11 @@ reject_old_samples: true
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
@ -325,10 +330,11 @@ query_timeout: 5m
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
} {

@ -71,6 +71,8 @@ const (
StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it."
BlockedIngestion = "blocked_ingestion"
BlockedIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
BlockedIngestionPolicy = "blocked_ingestion_policy"
BlockedIngestionPolicyErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
MissingEnforcedLabels = "missing_enforced_labels"
MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s"
)

Loading…
Cancel
Save