diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 27a963424f..631106cdb1 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -226,7 +226,7 @@ func New( policyResolver := push.PolicyResolver(func(userID string, lbs labels.Labels) string { mappings := overrides.PoliciesStreamMapping(userID) - return mappings.PolicyFor(lbs) + return getPolicy(userID, lbs, mappings, logger) }) validator, err := NewValidator(overrides, usageTracker) @@ -1197,7 +1197,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string, mapping := d.validator.Limits.PoliciesStreamMapping(vContext.userID) if val, ok := d.labelCache.Get(key); ok { retentionHours := d.tenantsRetention.RetentionHoursFor(vContext.userID, val.ls) - policy := mapping.PolicyFor(val.ls) + policy := getPolicy(vContext.userID, val.ls, mapping, d.logger) return val.ls, val.ls.String(), val.hash, retentionHours, policy, nil } @@ -1205,10 +1205,10 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string, if err != nil { retentionHours := d.tenantsRetention.RetentionHoursFor(vContext.userID, nil) // TODO: check for global policy. 
- return nil, "", 0, retentionHours, mapping.PolicyFor(nil), fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err) + return nil, "", 0, retentionHours, "", fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err) } - policy := mapping.PolicyFor(ls) + policy := getPolicy(vContext.userID, ls, mapping, d.logger) retentionHours := d.tenantsRetention.RetentionHoursFor(vContext.userID, ls) if err := d.validator.ValidateLabels(vContext, ls, stream, retentionHours, policy); err != nil { @@ -1304,3 +1304,24 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l func (d *Distributor) HealthyInstancesCount() int { return int(d.healthyInstancesCount.Load()) } + +func getPolicy(userID string, lbs labels.Labels, mapping validation.PolicyStreamMapping, logger log.Logger) string { + policies := mapping.PolicyFor(lbs) + + var policy string + if len(policies) > 0 { + policy = policies[0] + if len(policies) > 1 { + level.Warn(logger).Log( + "msg", "multiple policies matched for the same stream", + "org_id", userID, + "stream", lbs.String(), + "policy", policy, + "policies", strings.Join(policies, ","), + "insight", "true", + ) + } + } + + return policy +} diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 30257681c9..eedd9fb67d 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -7,6 +7,7 @@ import ( "math" "net/http" "os" + "strings" "sync" "syscall" "time" @@ -298,11 +299,27 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre retentionHours := util.RetentionHours(i.tenantsRetention.RetentionPeriodFor(i.instanceID, labels)) mapping := i.limiter.limits.PoliciesStreamMapping(i.instanceID) - policy := mapping.PolicyFor(labels) + policies := mapping.PolicyFor(labels) if record != nil { err = i.streamCountLimiter.AssertNewStreamAllowed(i.instanceID) } + // NOTE: We previously resolved the policy on distributors and logged when multiple policies were matched. 
+ // As on distributors, we use the first policy by alphabetical order. + var policy string + if len(policies) > 0 { + policy = policies[0] + if len(policies) > 1 { + level.Warn(util_log.Logger).Log( + "msg", "multiple policies matched for the same stream", + "org_id", i.instanceID, + "stream", pushReqStream.Labels, + "policy", policy, + "policies", strings.Join(policies, ","), + ) + } + } + if err != nil { return i.onStreamCreationError(ctx, pushReqStream, err, labels, retentionHours, policy) } diff --git a/pkg/validation/ingestion_policies.go b/pkg/validation/ingestion_policies.go index 6183125b0b..2a614afaaa 100644 --- a/pkg/validation/ingestion_policies.go +++ b/pkg/validation/ingestion_policies.go @@ -1,6 +1,13 @@ package validation -import "github.com/prometheus/prometheus/model/labels" +import ( + "fmt" + "slices" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/v3/pkg/logql/syntax" +) type PriorityStream struct { Priority int `yaml:"priority" json:"priority" doc:"description=The larger the value, the higher the priority."` @@ -19,18 +26,43 @@ func (p *PriorityStream) Matches(lbs labels.Labels) bool { type PolicyStreamMapping map[string][]*PriorityStream -func (p *PolicyStreamMapping) PolicyFor(lbs labels.Labels) string { +func (p *PolicyStreamMapping) Validate() error { + for policyName, policyStreams := range *p { + for idx, policyStream := range policyStreams { + matchers, err := syntax.ParseMatchers(policyStream.Selector, true) + if err != nil { + return fmt.Errorf("invalid labels matchers for policy stream mapping: %w", err) + } + (*p)[policyName][idx].Matchers = matchers + } + + // Sort the mappings by priority. Higher priority mappings come first. + slices.SortFunc(policyStreams, func(a, b *PriorityStream) int { + return b.Priority - a.Priority + }) + } + + return nil +} + +// PolicyFor returns all the policies that match the given labels with the highest priority. 
+// Note that this method will return multiple policies if two different policies match the same labels +// with the same priority. +// Returned policies are sorted alphabetically. +// If no policies match, it returns an empty slice. +func (p *PolicyStreamMapping) PolicyFor(lbs labels.Labels) []string { var ( - matchedPolicy *PriorityStream - found bool - matchedPolicyName string + found bool + highestPriority int + matchedPolicies = make(map[string]int, len(*p)) ) for policyName, policyStreams := range *p { for _, policyStream := range policyStreams { - if found && policyStream.Priority <= matchedPolicy.Priority { - // Even if a match occurs it won't have a higher priority than the current matched policy. - continue + // Mappings are sorted by priority (see PolicyStreamMapping.Validate in this file). + // So we can break early if the current policy has a lower priority than the highest priority matched policy. + if found && policyStream.Priority < highestPriority { + break } if !policyStream.Matches(lbs) { @@ -38,10 +70,21 @@ func (p *PolicyStreamMapping) PolicyFor(lbs labels.Labels) string { } found = true - matchedPolicy = policyStream - matchedPolicyName = policyName + highestPriority = policyStream.Priority + matchedPolicies[policyName] = policyStream.Priority } } - return matchedPolicyName + // Stick with only the highest priority policies. + policies := make([]string, 0, len(matchedPolicies)) + for policyName, priority := range matchedPolicies { + if priority == highestPriority { + policies = append(policies, policyName) + } + } + + // Sort the policies alphabetically. 
+ slices.Sort(policies) + + return policies } diff --git a/pkg/validation/ingestion_policies_test.go b/pkg/validation/ingestion_policies_test.go index 591624dfcd..517ccc5490 100644 --- a/pkg/validation/ingestion_policies_test.go +++ b/pkg/validation/ingestion_policies_test.go @@ -89,17 +89,30 @@ func Test_PolicyStreamMapping_PolicyFor(t *testing.T) { }, }, }, + "policy8": []*PriorityStream{ + { + Selector: `{env=~"prod|test"}`, + Priority: 3, + Matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, "env", "prod|test"), + }, + }, + }, } - require.Equal(t, "policy1", mapping.PolicyFor(labels.FromStrings("foo", "bar"))) + require.NoError(t, mapping.Validate()) + + require.Equal(t, []string{"policy1"}, mapping.PolicyFor(labels.FromStrings("foo", "bar"))) // matches both policy2 and policy1 but policy1 has higher priority. - require.Equal(t, "policy1", mapping.PolicyFor(labels.FromStrings("foo", "bar", "daz", "baz"))) + require.Equal(t, []string{"policy1"}, mapping.PolicyFor(labels.FromStrings("foo", "bar", "daz", "baz"))) // matches policy3 and policy4 but policy3 has higher priority.. - require.Equal(t, "policy3", mapping.PolicyFor(labels.FromStrings("qyx", "qzx", "qox", "qox"))) + require.Equal(t, []string{"policy3"}, mapping.PolicyFor(labels.FromStrings("qyx", "qzx", "qox", "qox"))) // matches no policy. - require.Equal(t, "", mapping.PolicyFor(labels.FromStrings("foo", "fooz", "daz", "qux", "quux", "corge"))) + require.Empty(t, mapping.PolicyFor(labels.FromStrings("foo", "fooz", "daz", "qux", "quux", "corge"))) // matches policy5 through regex. 
- require.Equal(t, "policy5", mapping.PolicyFor(labels.FromStrings("qab", "qzxqox"))) + require.Equal(t, []string{"policy5"}, mapping.PolicyFor(labels.FromStrings("qab", "qzxqox"))) - require.Equal(t, "policy6", mapping.PolicyFor(labels.FromStrings("env", "prod", "team", "finance"))) + require.Equal(t, []string{"policy6"}, mapping.PolicyFor(labels.FromStrings("env", "prod", "team", "finance"))) + // Matches policy7 and policy8 which have the same priority. + require.Equal(t, []string{"policy7", "policy8"}, mapping.PolicyFor(labels.FromStrings("env", "prod"))) } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index bb2f5c4b24..726d6a208b 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -516,14 +516,8 @@ func (l *Limits) Validate() error { } if l.PolicyStreamMapping != nil { - for policyName, policyStreams := range l.PolicyStreamMapping { - for idx, policyStream := range policyStreams { - matchers, err := syntax.ParseMatchers(policyStream.Selector, true) - if err != nil { - return fmt.Errorf("invalid labels matchers for policy stream mapping: %w", err) - } - l.PolicyStreamMapping[policyName][idx].Matchers = matchers - } + if err := l.PolicyStreamMapping.Validate(); err != nil { + return err } }