Ruler: implement rule-based and shuffle sharding (#8092)

Danny Kopping 3 years ago committed by GitHub
parent 078a040794
commit ebfeba5de3
Changed files (lines changed):

1. CHANGELOG.md (1)
2. docs/sources/configuration/_index.md (13)
3. docs/sources/operations/recording-rules.md (11)
4. pkg/ruler/base/manager.go (12)
5. pkg/ruler/base/manager_metrics.go (21)
6. pkg/ruler/base/manager_metrics_test.go (69)
7. pkg/ruler/base/ruler.go (236)
8. pkg/ruler/base/ruler_test.go (468)
9. pkg/ruler/base/store_mock_test.go (2)
10. pkg/util/metrics_helper.go (62)
11. pkg/util/metrics_helper_test.go (26)
12. pkg/util/shard.go (12)
13. pkg/validation/limits.go (5)
14. tools/doc-generator/main.go (2)

@ -25,6 +25,7 @@
* [7978](https://github.com/grafana/loki/pull/7978) **chaudum**: Shut down query frontend gracefully to allow inflight requests to complete.
* [8047](https://github.com/grafana/loki/pull/8047) **bboreham**: Dashboards: add k8s resource requests to CPU and memory panels.
* [8061](https://github.com/grafana/loki/pull/8061) **kavirajk**: Remove circle from Loki OSS
* [8092](https://github.com/grafana/loki/pull/8092) **dannykopping**: Add rule-based sharding to the ruler.
* [8131](https://github.com/grafana/loki/pull/8131) **jeschkies**: Compile Promtail ARM and ARM64 with journald support.
* [8212](https://github.com/grafana/loki/pull/8212) **kavirajk**: ingester: Add `ingester_memory_streams_labels_bytes` metric for more visibility into the size of in-memory stream metadata.
* [8271](https://github.com/grafana/loki/pull/8271) **kavirajk**: logql: Support urlencode and urldecode template functions.

@ -7,7 +7,7 @@ weight: 500
# Grafana Loki configuration parameters
<!-- DO NOT EDIT THIS FILE - This file has been automatically generated from its .template -->
<!-- DO NOT EDIT THIS FILE - This file has been automatically generated from its .template, regenerate with `make doc` from root directory. -->
Grafana Loki is configured in a YAML file (usually referred to as `loki.yaml`)
which contains information on the Loki server and its individual components,
@ -935,6 +935,11 @@ alertmanager_client:
# CLI flag: -ruler.sharding-strategy
[sharding_strategy: <string> | default = "default"]
# The sharding algorithm to use for deciding how rules & groups are sharded.
# Supported values are: by-group, by-rule.
# CLI flag: -ruler.sharding-algo
[sharding_algo: <string> | default = "by-group"]
# Time to spend searching for a pending ruler when shutting down.
# CLI flag: -ruler.search-pending-for
[search_pending_for: <duration> | default = 5m]
@ -2230,9 +2235,9 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -ruler.max-rule-groups-per-tenant
[ruler_max_rule_groups_per_tenant: <int> | default = 0]
# The default tenant's shard size when the shuffle-sharding strategy is used by
# ruler. When this setting is specified in the per-tenant overrides, a value of
# 0 disables shuffle sharding for the tenant.
# The default tenant's shard size when shuffle-sharding is enabled in the ruler.
# When this setting is specified in the per-tenant overrides, a value of 0
# disables shuffle sharding for the tenant.
# CLI flag: -ruler.tenant-shard-size
[ruler_tenant_shard_size: <int> | default = 0]
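As the description above notes, `ruler_tenant_shard_size` can be set globally or per tenant. A minimal sketch of a per-tenant overrides file, assuming the standard Loki runtime-overrides layout and hypothetical tenant names:

```yaml
overrides:
  tenant-a:
    # shuffle-shard this tenant's rule groups across 2 rulers
    ruler_tenant_shard_size: 2
  tenant-b:
    # a value of 0 disables shuffle sharding for this tenant
    ruler_tenant_shard_size: 0
```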

@ -1,13 +1,13 @@
---
title: Recording Rules
description: Recording Rules
description: Working with recording rules.
---
# Recording Rules
Recording rules are evaluated by the `ruler` component. Each `ruler` acts as its own `querier`, in the sense that it
executes queries against the store without using the `query-frontend` or `querier` components. It will respect all query
[limits](/docs/loki/latest/configuration/#limits_config) put in place for the `querier`.
[limits]({{< relref "../configuration/#limits_config" >}}) put in place for the `querier`.
Loki's implementation of recording rules largely reuses Prometheus' code.
@ -79,6 +79,13 @@ Remote-write can be tuned if the default configuration is insufficient (see [Fai
There is a [guide](https://prometheus.io/docs/practices/remote_write/) on the Prometheus website, all of which applies to Loki, too.
Rules can be evenly distributed across available rulers by setting `-ruler.enable-sharding=true` and `-ruler.sharding-algo="by-rule"`.
Rule groups execute in order; this is a feature inherited from Prometheus' rule engine (which Loki uses), but Loki has no
need for this constraint because its rules cannot depend on each other. The default sharding algorithm shards by rule group,
but this may be undesirable: some rule groups can contain more expensive rules than others, which can lead to subsequent rules in a group missing evaluations.
The `by-rule` sharding algorithm instead creates one rule group for each rule the ruler instance "owns" (based on its hash ring), and these
groups are all executed concurrently.
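Putting the flags above into configuration-file form, a minimal sketch of the relevant ruler block (other ruler settings omitted):

```yaml
ruler:
  # distribute rule evaluation using the ring backend
  enable_sharding: true
  # shard individual rules rather than whole rule groups
  sharding_algo: by-rule
```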
## Observability
Since Loki reuses the Prometheus code for recording rules and WALs, it also gains all of Prometheus' observability.

@ -49,7 +49,17 @@ type DefaultMultiTenantManager struct {
}
func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger, limits RulesLimits) (*DefaultMultiTenantManager, error) {
userManagerMetrics := NewManagerMetrics(cfg.DisableRuleGroupLabel)
userManagerMetrics := NewManagerMetrics(cfg.DisableRuleGroupLabel, func(k, v string) string {
// When "by-rule" sharding is enabled, each rule group is assigned a unique name to work around some of Prometheus'
// assumptions, and metrics are exported based on these rule group names. If we kept these unique rule group names
// in place, this would explode cardinality.
if k == RuleGroupLabel {
return RemoveRuleTokenFromGroupName(v)
}
return v
})
if reg != nil {
reg.MustRegister(userManagerMetrics)
}

@ -9,8 +9,9 @@ import (
// ManagerMetrics aggregates metrics exported by the Prometheus
// rules package and returns them as Cortex metrics
type ManagerMetrics struct {
regs *util.UserRegistries
disableRuleGroupLabel bool
regs *util.UserRegistries
disableRuleGroupLabel bool
metricLabelTransformer util.MetricLabelTransformFunc
EvalDuration *prometheus.Desc
IterationDuration *prometheus.Desc
@ -25,15 +26,19 @@ type ManagerMetrics struct {
GroupLastEvalSamples *prometheus.Desc
}
// RuleGroupLabel is the label added by Prometheus, the value of which comes from the GroupKey function
const RuleGroupLabel = "rule_group"
// NewManagerMetrics returns a ManagerMetrics struct
func NewManagerMetrics(disableRuleGroupLabel bool) *ManagerMetrics {
func NewManagerMetrics(disableRuleGroupLabel bool, tf util.MetricLabelTransformFunc) *ManagerMetrics {
commonLabels := []string{"user"}
if !disableRuleGroupLabel {
commonLabels = append(commonLabels, "rule_group")
commonLabels = append(commonLabels, RuleGroupLabel)
}
return &ManagerMetrics{
regs: util.NewUserRegistries(),
disableRuleGroupLabel: disableRuleGroupLabel,
regs: util.NewUserRegistries(),
disableRuleGroupLabel: disableRuleGroupLabel,
metricLabelTransformer: tf,
EvalDuration: prometheus.NewDesc(
"cortex_prometheus_rule_evaluation_duration_seconds",
@ -131,10 +136,10 @@ func (m *ManagerMetrics) Describe(out chan<- *prometheus.Desc) {
// Collect implements the Collector interface
func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) {
data := m.regs.BuildMetricFamiliesPerUser()
data := m.regs.BuildMetricFamiliesPerUser(m.metricLabelTransformer)
labels := []string{}
if !m.disableRuleGroupLabel {
labels = append(labels, "rule_group")
labels = append(labels, RuleGroupLabel)
}
// WARNING: It is important that all metrics generated in this method are "Per User".
// Thanks to that we can actually *remove* metrics for given user (see RemoveUserRegistry).

@ -8,14 +8,18 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/ruler/rulespb"
)
func TestManagerMetricsWithRuleGroupLabel(t *testing.T) {
mainReg := prometheus.NewPedanticRegistry()
managerMetrics := NewManagerMetrics(false)
managerMetrics := NewManagerMetrics(false, nil)
mainReg.MustRegister(managerMetrics)
managerMetrics.AddUserRegistry("user1", populateManager(1))
managerMetrics.AddUserRegistry("user2", populateManager(10))
@ -137,7 +141,7 @@ cortex_prometheus_rule_group_rules{rule_group="group_two",user="user3"} 100000
func TestManagerMetricsWithoutRuleGroupLabel(t *testing.T) {
mainReg := prometheus.NewPedanticRegistry()
managerMetrics := NewManagerMetrics(true)
managerMetrics := NewManagerMetrics(true, nil)
mainReg.MustRegister(managerMetrics)
managerMetrics.AddUserRegistry("user1", populateManager(1))
managerMetrics.AddUserRegistry("user2", populateManager(10))
@ -363,7 +367,7 @@ func newGroupMetrics(r prometheus.Registerer) *groupMetrics {
func TestMetricsArePerUser(t *testing.T) {
mainReg := prometheus.NewPedanticRegistry()
managerMetrics := NewManagerMetrics(true)
managerMetrics := NewManagerMetrics(true, nil)
mainReg.MustRegister(managerMetrics)
managerMetrics.AddUserRegistry("user1", populateManager(1))
managerMetrics.AddUserRegistry("user2", populateManager(10))
@ -402,3 +406,62 @@ func TestMetricsArePerUser(t *testing.T) {
assert.True(t, foundUserLabel, "user label not found for metric %s", desc.String())
}
}
func TestMetricLabelTransformer(t *testing.T) {
mainReg := prometheus.NewPedanticRegistry()
managerMetrics := NewManagerMetrics(false, func(k, v string) string {
if k == RuleGroupLabel {
return RemoveRuleTokenFromGroupName(v)
}
return v
})
mainReg.MustRegister(managerMetrics)
reg := prometheus.NewRegistry()
metrics := newGroupMetrics(reg)
r := rulespb.RuleDesc{
Alert: "MyAlert",
Expr: "count({foo=\"bar\"}) > 0",
Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "bar")),
}
const ruleGroupName = "my_rule_group"
gr := rulespb.RuleGroupDesc{
Name: ruleGroupName,
Namespace: "namespace",
Rules: []*rulespb.RuleDesc{&r},
}
metrics.iterationsScheduled.WithLabelValues(AddRuleTokenToGroupName(&gr, &r)).Add(1)
managerMetrics.AddUserRegistry("user1", reg)
ch := make(chan prometheus.Metric)
defer func() {
// drain the channel so that the collecting goroutine can stop.
// This is useful if the test fails.
for range ch {
}
}()
go func() {
managerMetrics.Collect(ch)
close(ch)
}()
for m := range ch {
dtoM := &dto.Metric{}
err := m.Write(dtoM)
require.NoError(t, err)
for _, l := range dtoM.Label {
if l.GetName() == RuleGroupLabel {
// if the shard token has been removed, we know the value was processed by the label transformer
assert.Equal(t, ruleGroupName, l.GetValue())
}
}
}
}

@ -14,6 +14,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcclient"
@ -39,11 +40,11 @@ import (
"github.com/grafana/loki/pkg/ruler/rulestore"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)
var (
supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
supportedShardingAlgos = []string{util.ShardingAlgoByGroup, util.ShardingAlgoByRule}
// Validation errors.
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
@ -105,6 +106,7 @@ type Config struct {
// Enable sharding rule groups.
EnableSharding bool `yaml:"enable_sharding"`
ShardingStrategy string `yaml:"sharding_strategy"`
ShardingAlgo string `yaml:"sharding_algo"`
SearchPendingFor time.Duration `yaml:"search_pending_for"`
Ring RingConfig `yaml:"ring" doc:"description=Ring used by Loki ruler. The CLI flags prefix for this block configuration is 'ruler.ring'."`
FlushCheckPeriod time.Duration `yaml:"flush_period"`
@ -121,21 +123,25 @@ type Config struct {
}
// Validate config and returns error on failure
func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error {
if !util.StringsContain(supportedShardingStrategies, cfg.ShardingStrategy) {
return errInvalidShardingStrategy
func (cfg *Config) Validate(_ log.Logger) error {
if cfg.ShardingStrategy == "" {
cfg.ShardingStrategy = util.ShardingStrategyDefault
}
if cfg.ShardingStrategy == util.ShardingStrategyShuffle && limits.RulerTenantShardSize <= 0 {
return errInvalidTenantShardSize
// the by-group sharding algorithm is applied by default, regardless of the sharding strategy in use
if cfg.ShardingAlgo == "" {
cfg.ShardingAlgo = util.ShardingAlgoByGroup
}
if err := cfg.StoreConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid storage config")
if !util.StringsContain(supportedShardingAlgos, cfg.ShardingAlgo) {
return fmt.Errorf("invalid sharding algorithm %q, supported algorithms are %v", cfg.ShardingAlgo, supportedShardingAlgos)
}
if err := cfg.ClientTLSConfig.Validate(log); err != nil {
return errors.Wrap(err, "invalid ruler gRPC client config")
if !util.StringsContain(supportedShardingStrategies, cfg.ShardingStrategy) {
return fmt.Errorf("invalid sharding strategy %q, supported strategies are %v", cfg.ShardingStrategy, supportedShardingStrategies)
}
return nil
}
@ -170,6 +176,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.SearchPendingFor, "ruler.search-pending-for", 5*time.Minute, "Time to spend searching for a pending ruler when shutting down.")
f.BoolVar(&cfg.EnableSharding, "ruler.enable-sharding", false, "Distribute rule evaluation using ring backend.")
f.StringVar(&cfg.ShardingStrategy, "ruler.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.StringVar(&cfg.ShardingAlgo, "ruler.sharding-algo", util.ShardingAlgoByGroup, fmt.Sprintf("The sharding algorithm to use for deciding how rules & groups are sharded. Supported values are: %s.", strings.Join(supportedShardingAlgos, ", ")))
f.DurationVar(&cfg.FlushCheckPeriod, "ruler.flush-period", 1*time.Minute, "Period with which to attempt to flush rule groups.")
f.StringVar(&cfg.RulePath, "ruler.rule-path", "/rules", "File path to store temporary rule files.")
f.BoolVar(&cfg.EnableAPI, "experimental.ruler.enable-api", false, "Enable the ruler API.")
@ -257,6 +264,10 @@ func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer,
}
func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) {
if err := cfg.Validate(logger); err != nil {
return nil, fmt.Errorf("invalid ruler config: %w", err)
}
ruler := &Ruler{
cfg: cfg,
store: ruleStore,
@ -411,6 +422,37 @@ func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
return ringHasher.Sum32()
}
func tokenForRule(g *rulespb.RuleGroupDesc, r *rulespb.RuleDesc) uint32 {
ringHasher := fnv.New32a()
// Hasher never returns err.
_, _ = ringHasher.Write([]byte(g.User))
_, _ = ringHasher.Write(sep)
_, _ = ringHasher.Write([]byte(g.Namespace))
_, _ = ringHasher.Write(sep)
_, _ = ringHasher.Write([]byte(g.Name))
_, _ = ringHasher.Write(sep)
_, _ = ringHasher.Write([]byte(getRuleIdentifier(r)))
_, _ = ringHasher.Write(sep)
_, _ = ringHasher.Write([]byte(r.Expr))
_, _ = ringHasher.Write(sep)
_, _ = ringHasher.Write([]byte(logproto.FromLabelAdaptersToLabels(r.Labels).String()))
return ringHasher.Sum32()
}
func getRuleIdentifier(r *rulespb.RuleDesc) string {
if r == nil {
return ""
}
if r.Alert == "" {
return r.Record
}
return r.Alert
}
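The hashing scheme above can be sketched in isolation: each identifying field of a rule is written to an FNV-1a hasher with a NUL separator so that field boundaries cannot collide, and two rules in the same group therefore hash to (almost certainly) different ring tokens. This is a self-contained illustration, not the ruler's actual API; `tokenFor` and its field list are assumptions mirroring `tokenForRule`.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// sep separates fields so "ab"+"c" and "a"+"bc" hash differently.
var sep = []byte{0}

// tokenFor hashes the fields identifying a single rule within a group
// (illustrative sketch of tokenForRule; the exact field set is an assumption).
func tokenFor(user, namespace, group, ruleID, expr string) uint32 {
	h := fnv.New32a()
	for _, s := range []string{user, namespace, group, ruleID, expr} {
		// Hasher never returns an error.
		_, _ = h.Write([]byte(s))
		_, _ = h.Write(sep)
	}
	return h.Sum32()
}

func main() {
	// Two rules in the same group get distinct tokens,
	// so they can be owned by different ruler instances.
	a := tokenFor("user1", "namespace", "first", "rule_a", "1")
	b := tokenFor("user1", "namespace", "first", "rule_b", "1")
	fmt.Println(a != b)
}
```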
func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, instanceAddr string) (bool, error) {
hash := tokenForGroup(g)
@ -422,6 +464,17 @@ func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, instanceAd
return rlrs.Instances[0].Addr == instanceAddr, nil
}
func instanceOwnsRule(r ring.ReadRing, rg *rulespb.RuleGroupDesc, rd *rulespb.RuleDesc, instanceAddr string) (bool, error) {
hash := tokenForRule(rg, rd)
rlrs, err := r.Get(hash, RingOp, nil, nil, nil)
if err != nil {
return false, errors.Wrap(err, "error reading ring to verify rule group ownership")
}
return rlrs.Instances[0].Addr == instanceAddr, nil
}
func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if r.cfg.EnableSharding {
r.ring.ServeHTTP(w, req)
@ -501,18 +554,10 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {
}
func (r *Ruler) listRules(ctx context.Context) (result map[string]rulespb.RuleGroupList, err error) {
switch {
case !r.cfg.EnableSharding:
if r.cfg.EnableSharding {
result, err = r.listRulesSharding(ctx)
} else {
result, err = r.listRulesNoSharding(ctx)
case r.cfg.ShardingStrategy == util.ShardingStrategyDefault:
result, err = r.listRulesShardingDefault(ctx)
case r.cfg.ShardingStrategy == util.ShardingStrategyShuffle:
result, err = r.listRulesShuffleSharding(ctx)
default:
return nil, errors.New("invalid sharding configuration")
}
if err != nil {
@ -532,6 +577,14 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul
return r.store.ListAllRuleGroups(ctx)
}
func (r *Ruler) listRulesSharding(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
if r.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
return r.listRulesShuffleSharding(ctx)
}
return r.listRulesShardingDefault(ctx)
}
func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulespb.RuleGroupList, error) {
configs, err := r.store.ListAllRuleGroups(ctx)
if err != nil {
@ -540,7 +593,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
filteredConfigs := make(map[string]rulespb.RuleGroupList)
for userID, groups := range configs {
filtered := filterRuleGroups(userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
filtered := filterRules(r.cfg.ShardingAlgo, userID, groups, r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(filtered) > 0 {
filteredConfigs[userID] = filtered
}
@ -598,7 +651,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
}
filtered := filterRuleGroups(userID, groups, userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
filtered := filterRules(r.cfg.ShardingAlgo, userID, groups, userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(filtered) == 0 {
continue
}
@ -615,33 +668,101 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
return result, err
}
// filterRuleGroups returns map of rule groups that given instance "owns" based on supplied ring.
// This function only uses User, Namespace, and Name fields of individual RuleGroups.
//
// Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring,
// but only ring passed as parameter.
func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
// Prune the rule group to only contain rules that this ruler is responsible for, based on ring.
var result []*rulespb.RuleGroupDesc
// filterRules returns a list of rule groups, each with a single rule, ONLY for rules that are "owned" by the given ring instance.
// the reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring, but only ring passed as parameter.
func filterRules(shardingAlgo string, userID string, ruleGroups []*rulespb.RuleGroupDesc, ring ring.ReadRing, instanceAddr string, logger log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
// Return one rule group per rule that is owned by this ring instance.
// One rule group is returned per rule because Prometheus executes rule groups concurrently but rules within a group sequentially;
// we are sharding by rule here to explicitly *avoid* sequential execution since Loki does not need this.
var result = make([]*rulespb.RuleGroupDesc, 0, len(ruleGroups))
for _, g := range ruleGroups {
owned, err := instanceOwnsRuleGroup(ring, g, instanceAddr)
if err != nil {
ringCheckErrors.Inc()
level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
logger = log.With(logger, "user", userID, "namespace", g.Namespace, "group", g.Name)
switch shardingAlgo {
// if we are sharding by rule group, we can just add the entire group
case util.ShardingAlgoByGroup:
owned, err := instanceOwnsRuleGroup(ring, g, instanceAddr)
if err != nil {
ringCheckErrors.Inc()
level.Error(logger).Log("msg", "failed to check if the ruler replica owns the rule group", "err", err)
continue
}
if owned {
level.Debug(logger).Log("msg", "rule group owned")
result = append(result, g)
} else {
level.Debug(logger).Log("msg", "rule group not owned, ignoring")
}
continue
}
if owned {
level.Debug(log).Log("msg", "rule group owned", "user", g.User, "namespace", g.Namespace, "name", g.Name)
result = append(result, g)
} else {
level.Debug(log).Log("msg", "rule group not owned, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name)
// if we are sharding by rule, we need to create rule groups for each rule to comply with Prometheus' rule engine's expectations
case util.ShardingAlgoByRule:
for _, r := range g.Rules {
rlog := log.With(logger, "rule", getRuleIdentifier(r))
owned, err := instanceOwnsRule(ring, g, r, instanceAddr)
if err != nil {
ringCheckErrors.Inc()
level.Error(rlog).Log("msg", "failed to check if the ruler replica owns the rule", "err", err)
continue
}
if !owned {
level.Debug(rlog).Log("msg", "rule not owned, ignoring")
continue
}
level.Debug(rlog).Log("msg", "rule owned")
// clone the group and replace the rules
clone := cloneGroupWithRule(g, r)
if clone == nil {
level.Error(rlog).Log("msg", "failed to filter rules", "err", "failed to clone rule group; type coercion failed")
continue
}
result = append(result, clone)
}
}
}
return result
}
func cloneGroupWithRule(g *rulespb.RuleGroupDesc, r *rulespb.RuleDesc) *rulespb.RuleGroupDesc {
clone, ok := proto.Clone(g).(*rulespb.RuleGroupDesc)
if !ok {
return nil
}
// Prometheus relies on group names being unique, and this assumption is very deeply baked in
// so we append the rule token to make each group name unique.
// TODO(dannyk) this is all quite hacky, and we should look at forking Prometheus' rule evaluation engine at some point.
clone.Name = AddRuleTokenToGroupName(g, r)
clone.Rules = []*rulespb.RuleDesc{r}
return clone
}
// the delimiter is prefixed with ";" since that is what Prometheus uses for its group key
const ruleTokenDelimiter = ";rule-shard-token"
// AddRuleTokenToGroupName adds a rule shard token to a given group's name to make it unique.
// Only relevant when using "by-rule" sharding strategy.
func AddRuleTokenToGroupName(g *rulespb.RuleGroupDesc, r *rulespb.RuleDesc) string {
return fmt.Sprintf("%s"+ruleTokenDelimiter+"%d", g.Name, tokenForRule(g, r))
}
// RemoveRuleTokenFromGroupName removes the rule shard token from the group name.
// Only relevant when using "by-rule" sharding strategy.
func RemoveRuleTokenFromGroupName(name string) string {
return strings.Split(name, ruleTokenDelimiter)[0]
}
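The token round-trip above can be sketched standalone: appending the delimited token makes a group name unique for Prometheus, and splitting on the delimiter recovers the original name (also for names that never had a token). `addToken`/`removeToken` are illustrative stand-ins for `AddRuleTokenToGroupName`/`RemoveRuleTokenFromGroupName`, not the real signatures.

```go
package main

import (
	"fmt"
	"strings"
)

// The delimiter is prefixed with ";" to match Prometheus' group-key separator.
const ruleTokenDelimiter = ";rule-shard-token"

// addToken appends a rule shard token to a group name to make it unique.
func addToken(group string, token uint32) string {
	return fmt.Sprintf("%s%s%d", group, ruleTokenDelimiter, token)
}

// removeToken strips the shard token again; names without a token pass through unchanged.
func removeToken(name string) string {
	return strings.Split(name, ruleTokenDelimiter)[0]
}

func main() {
	unique := addToken("my_rule_group", 12345)
	fmt.Println(unique)              // my_rule_group;rule-shard-token12345
	fmt.Println(removeToken(unique)) // my_rule_group
}
```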
// GetRules retrieves the running rules from this ruler and all running rulers in the ring if
// sharding is enabled
func (r *Ruler) GetRules(ctx context.Context) ([]*GroupStateDesc, error) {
@ -763,7 +884,7 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta
var (
mergedMx sync.Mutex
merged []*GroupStateDesc
merged = make(map[string]*GroupStateDesc)
)
// Concurrently fetch rules from all rulers. Since rules are not replicated,
@ -778,18 +899,41 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta
}
newGrps, err := rulerClient.Rules(ctx, &RulesRequest{})
if err != nil {
return errors.Wrapf(err, "unable to retrieve rules from ruler %s", addr)
if err != nil || newGrps == nil {
return fmt.Errorf("unable to retrieve rules from ruler %s: %w", addr, err)
}
mergedMx.Lock()
merged = append(merged, newGrps.Groups...)
for _, grp := range newGrps.Groups {
if grp == nil {
continue
}
name := RemoveRuleTokenFromGroupName(grp.Group.Name)
grp.Group.Name = name
_, found := merged[name]
if found {
merged[name].ActiveRules = append(merged[name].ActiveRules, grp.ActiveRules...)
} else {
merged[name] = grp
}
}
mergedMx.Unlock()
return nil
})
return merged, err
mergedMx.Lock()
descs := make([]*GroupStateDesc, 0, len(merged))
for _, desc := range merged {
descs = append(descs, desc)
}
mergedMx.Unlock()
return descs, err
}
// Rules implements the rules service

@ -202,6 +202,9 @@ func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer
}
func buildRuler(t *testing.T, rulerConfig Config, q storage.Querier, clientMetrics loki_storage.ClientMetrics, rulerAddrMap map[string]*Ruler) *Ruler {
// validate here instead of newRuler because it is used elsewhere
require.NoError(t, rulerConfig.Validate(log.NewNopLogger()))
engine, queryable, pusher, logger, overrides, reg := testSetup(t, q)
storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger())
require.NoError(t, err)
@ -394,6 +397,7 @@ func TestGetRules(t *testing.T) {
type testCase struct {
sharding bool
shardingStrategy string
shardingAlgo string
shuffleShardSize int
}
@ -432,8 +436,7 @@ func TestGetRules(t *testing.T) {
sharding: false,
},
"Default Sharding": {
sharding: true,
shardingStrategy: util.ShardingStrategyDefault,
sharding: true,
},
"Shuffle Sharding and ShardSize = 2": {
sharding: true,
@ -456,6 +459,7 @@ func TestGetRules(t *testing.T) {
cfg.ShardingStrategy = tc.shardingStrategy
cfg.EnableSharding = tc.sharding
cfg.ShardingAlgo = tc.shardingAlgo
cfg.Ring = RingConfig{
InstanceID: id,
@ -561,10 +565,22 @@ func TestSharding(t *testing.T) {
user3 = "user3"
)
user1Group1 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace", Name: "first"}
user1Group2 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace", Name: "second"}
user2Group1 := &rulespb.RuleGroupDesc{User: user2, Namespace: "namespace", Name: "first"}
user3Group1 := &rulespb.RuleGroupDesc{User: user3, Namespace: "namespace", Name: "first"}
user1Group1Rule1 := &rulespb.RuleDesc{Record: "user1_group1_rule1", Expr: "1", Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("rule", "1"))}
user1Group1Rule2 := &rulespb.RuleDesc{Record: "user1_group1_rule2", Expr: "2", Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("rule", "2"))}
user1Group2Rule1 := &rulespb.RuleDesc{Record: "user1_group2_rule1", Expr: "1", Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("rule", "1"))}
user1Group1 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace", Name: "first", Rules: []*rulespb.RuleDesc{user1Group1Rule1, user1Group1Rule2}}
user1Group2 := &rulespb.RuleGroupDesc{User: user1, Namespace: "namespace", Name: "second", Rules: []*rulespb.RuleDesc{user1Group2Rule1}}
user2Group1Rule1 := &rulespb.RuleDesc{Alert: "User2Group1Alert1", Expr: "1", Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("alert", "1"))}
user2Group1Rule2 := &rulespb.RuleDesc{Record: "user2_group1_rule2", Expr: "2", Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("rule", "2"))}
user2Group1 := &rulespb.RuleGroupDesc{User: user2, Namespace: "namespace", Name: "first", Rules: []*rulespb.RuleDesc{user2Group1Rule1, user2Group1Rule2}}
user3Group1Rule1 := &rulespb.RuleDesc{Alert: "User3Group1Alert1", Expr: "1", Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("alert", "1"))}
user3Group1Rule2 := &rulespb.RuleDesc{Alert: "User3Group1Alert2", Expr: "2", Labels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("alert", "2"))}
user3Group1 := &rulespb.RuleGroupDesc{User: user3, Namespace: "namespace", Name: "first", Rules: []*rulespb.RuleDesc{user3Group1Rule1, user3Group1Rule2}}
// Must be distinct for test to work.
user1Group1Token := tokenForGroup(user1Group1)
@ -572,18 +588,42 @@ func TestSharding(t *testing.T) {
user2Group1Token := tokenForGroup(user2Group1)
user3Group1Token := tokenForGroup(user3Group1)
user1Group1Rule1Token := tokenForRule(user1Group1, user1Group1Rule1)
user1Group1Rule2Token := tokenForRule(user1Group1, user1Group1Rule2)
user1Group2Rule1Token := tokenForRule(user1Group2, user1Group2Rule1)
user2Group1Rule1Token := tokenForRule(user2Group1, user2Group1Rule1)
user2Group1Rule2Token := tokenForRule(user2Group1, user2Group1Rule2)
user3Group1Rule1Token := tokenForRule(user3Group1, user3Group1Rule1)
user3Group1Rule2Token := tokenForRule(user3Group1, user3Group1Rule2)
noRules := map[string]rulespb.RuleGroupList{}
allRules := map[string]rulespb.RuleGroupList{
user1: {user1Group1, user1Group2},
user2: {user2Group1},
user3: {user3Group1},
}
allRulesSharded := map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group1, user1Group1Rule1),
cloneGroupWithRule(user1Group1, user1Group1Rule2),
cloneGroupWithRule(user1Group2, user1Group2Rule1),
},
user2: {
cloneGroupWithRule(user2Group1, user2Group1Rule1),
cloneGroupWithRule(user2Group1, user2Group1Rule2),
},
user3: {
cloneGroupWithRule(user3Group1, user3Group1Rule1),
cloneGroupWithRule(user3Group1, user3Group1Rule2),
},
}
// ruler ID -> (user ID -> list of groups).
type expectedRulesMap map[string]map[string]rulespb.RuleGroupList
type testCase struct {
sharding bool
shardingAlgo string
shardingStrategy string
shuffleShardSize int
setupRing func(*ring.Desc)
@ -942,6 +982,409 @@ func TestSharding(t *testing.T) {
},
},
},
"sharding by rule, single ruler": {
sharding: true,
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{ruler1: allRulesSharded},
},
"sharding by rule, single ruler, single enabled user": {
sharding: true,
shardingAlgo: util.ShardingAlgoByRule,
enabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group1, user1Group1Rule1),
cloneGroupWithRule(user1Group1, user1Group1Rule2),
cloneGroupWithRule(user1Group2, user1Group2Rule1),
},
}},
},
"sharding by rule, single ruler, single disabled user": {
sharding: true,
shardingAlgo: util.ShardingAlgoByRule,
disabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{
user2: {
cloneGroupWithRule(user2Group1, user2Group1Rule1),
cloneGroupWithRule(user2Group1, user2Group1Rule2),
},
user3: {
cloneGroupWithRule(user3Group1, user3Group1Rule1),
cloneGroupWithRule(user3Group1, user3Group1Rule2),
},
}},
},
"sharding by rule, multiple ACTIVE rulers": {
sharding: true,
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
ruler1: map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group1, user1Group1Rule1),
cloneGroupWithRule(user1Group1, user1Group1Rule2),
},
user2: {cloneGroupWithRule(user2Group1, user2Group1Rule2)},
},
ruler2: map[string]rulespb.RuleGroupList{
user1: {cloneGroupWithRule(user1Group2, user1Group2Rule1)},
user2: {cloneGroupWithRule(user2Group1, user2Group1Rule1)},
user3: {
cloneGroupWithRule(user3Group1, user3Group1Rule1),
cloneGroupWithRule(user3Group1, user3Group1Rule2),
},
},
},
},
"sharding by rule, multiple ACTIVE rulers, single enabled user": {
sharding: true,
shardingAlgo: util.ShardingAlgoByRule,
enabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
ruler1: map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group1, user1Group1Rule1),
cloneGroupWithRule(user1Group1, user1Group1Rule2),
},
},
ruler2: map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group2, user1Group2Rule1),
},
},
},
},
"sharding by rule, multiple ACTIVE rulers, single disabled user": {
sharding: true,
shardingAlgo: util.ShardingAlgoByRule,
disabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
ruler1: map[string]rulespb.RuleGroupList{
user2: {
cloneGroupWithRule(user2Group1, user2Group1Rule2),
},
},
ruler2: map[string]rulespb.RuleGroupList{
user2: {
cloneGroupWithRule(user2Group1, user2Group1Rule1),
},
user3: {
cloneGroupWithRule(user3Group1, user3Group1Rule1),
cloneGroupWithRule(user3Group1, user3Group1Rule2),
},
},
},
},
"sharding by rule, unhealthy ACTIVE ruler": {
sharding: true,
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.Ingesters[ruler2] = ring.InstanceDesc{
Addr: ruler2Addr,
Timestamp: time.Now().Add(-time.Hour).Unix(),
State: ring.ACTIVE,
Tokens: sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}),
}
},
expectedRules: expectedRulesMap{
ruler1: map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group1, user1Group1Rule1),
cloneGroupWithRule(user1Group1, user1Group1Rule2),
},
user2: {
cloneGroupWithRule(user2Group1, user2Group1Rule2),
},
},
// This ruler doesn't get rules from unhealthy ruler (RF=1).
ruler2: noRules,
},
},
"sharding by rule, LEAVING ruler": {
sharding: true,
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.LEAVING, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
// LEAVING ruler doesn't get any rules.
ruler1: noRules,
ruler2: allRulesSharded,
},
},
"sharding by rule, JOINING ruler": {
sharding: true,
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.JOINING, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
// JOINING ruler has no rules yet.
ruler1: noRules,
ruler2: allRulesSharded,
},
},
"shuffle sharding with 'by-rule' strategy, single ruler": {
sharding: true,
shardingStrategy: util.ShardingStrategyShuffle,
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
ruler1: allRulesSharded,
},
},
"shuffle sharding with 'by-rule' strategy, multiple rulers, shard size 1": {
sharding: true,
shardingStrategy: util.ShardingStrategyShuffle,
shardingAlgo: util.ShardingAlgoByRule,
shuffleShardSize: 1,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now())
// It is immaterial what tokens this ruler has; it won't be assigned any rules.
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
ruler1: allRulesSharded,
ruler2: noRules,
},
},
// Same test as the previous one, but with shard size 2; the second ruler gets all the rules.
"shuffle sharding with 'by-rule' strategy, two rulers, shard size 2": {
sharding: true,
shardingStrategy: util.ShardingStrategyShuffle,
shardingAlgo: util.ShardingAlgoByRule,
shuffleShardSize: 2,
setupRing: func(desc *ring.Desc) {
// Exact same tokens setup as previous test.
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now())
// this ruler has all the rule tokens, so it gets all the rules
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1, user2Group1Rule2Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
ruler1: noRules,
ruler2: allRulesSharded,
},
},
"shuffle sharding with 'by-rule' strategy, two rulers, shard size 1, distributed users": {
sharding: true,
shardingStrategy: util.ShardingStrategyShuffle,
shardingAlgo: util.ShardingAlgoByRule,
shuffleShardSize: 1,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
ruler1: map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group1, user1Group1Rule1),
cloneGroupWithRule(user1Group1, user1Group1Rule2),
cloneGroupWithRule(user1Group2, user1Group2Rule1),
},
},
ruler2: map[string]rulespb.RuleGroupList{
user2: {
cloneGroupWithRule(user2Group1, user2Group1Rule1),
cloneGroupWithRule(user2Group1, user2Group1Rule2),
},
user3: {
cloneGroupWithRule(user3Group1, user3Group1Rule1),
cloneGroupWithRule(user3Group1, user3Group1Rule2),
},
},
},
},
"shuffle sharding with 'by-rule' strategy, three rulers, shard size 2": {
sharding: true,
shardingStrategy: util.ShardingStrategyShuffle,
shardingAlgo: util.ShardingAlgoByRule,
shuffleShardSize: 2,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Rule1Token + 1, user1Group2Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group1Rule2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Rule1Token + 1, user2Group1Rule2Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
ruler1: map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group1, user1Group1Rule1),
cloneGroupWithRule(user1Group2, user1Group2Rule1),
},
},
ruler2: map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group1, user1Group1Rule2),
},
},
ruler3: map[string]rulespb.RuleGroupList{
user2: {
cloneGroupWithRule(user2Group1, user2Group1Rule1),
cloneGroupWithRule(user2Group1, user2Group1Rule2),
},
user3: {
cloneGroupWithRule(user3Group1, user3Group1Rule1),
cloneGroupWithRule(user3Group1, user3Group1Rule2),
},
},
},
},
"shuffle sharding with 'by-rule' strategy, three rulers, shard size 2, ruler2 has no users": {
sharding: true,
shardingStrategy: util.ShardingStrategyShuffle,
shardingAlgo: util.ShardingAlgoByRule,
shuffleShardSize: 2,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
ruler1: map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group1, user1Group1Rule1),
cloneGroupWithRule(user1Group1, user1Group1Rule2),
cloneGroupWithRule(user1Group2, user1Group2Rule1),
},
user2: {
cloneGroupWithRule(user2Group1, user2Group1Rule1),
},
},
ruler2: noRules, // ruler2 owns a token for user2's group1, but user2 is only handled by ruler1 and ruler3.
ruler3: map[string]rulespb.RuleGroupList{
user2: {
cloneGroupWithRule(user2Group1, user2Group1Rule2),
},
user3: {
cloneGroupWithRule(user3Group1, user3Group1Rule1),
cloneGroupWithRule(user3Group1, user3Group1Rule2),
},
},
},
},
"shuffle sharding with 'by-rule' strategy, three rulers, shard size 2, single enabled user": {
sharding: true,
shardingStrategy: util.ShardingStrategyShuffle,
shardingAlgo: util.ShardingAlgoByRule,
shuffleShardSize: 2,
enabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
ruler1: map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group1, user1Group1Rule1),
cloneGroupWithRule(user1Group1, user1Group1Rule2),
},
},
ruler2: map[string]rulespb.RuleGroupList{
user1: {
cloneGroupWithRule(user1Group2, user1Group2Rule1),
},
},
ruler3: map[string]rulespb.RuleGroupList{},
},
},
"shuffle sharding with 'by-rule' strategy, three rulers, shard size 2, single disabled user": {
sharding: true,
shardingStrategy: util.ShardingStrategyShuffle,
shardingAlgo: util.ShardingAlgoByRule,
shuffleShardSize: 2,
disabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user1Group2Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
},
expectedRules: expectedRulesMap{
ruler1: map[string]rulespb.RuleGroupList{
user2: {
cloneGroupWithRule(user2Group1, user2Group1Rule1),
},
},
ruler2: map[string]rulespb.RuleGroupList{},
ruler3: map[string]rulespb.RuleGroupList{
user2: {
cloneGroupWithRule(user2Group1, user2Group1Rule2),
},
user3: {
cloneGroupWithRule(user3Group1, user3Group1Rule1),
cloneGroupWithRule(user3Group1, user3Group1Rule2),
},
},
},
},
}
for name, tc := range testCases {
@ -954,6 +1397,7 @@ func TestSharding(t *testing.T) {
StoreConfig: RuleStoreConfig{mock: newMockRuleStore(allRules)},
EnableSharding: tc.sharding,
ShardingStrategy: tc.shardingStrategy,
ShardingAlgo: tc.shardingAlgo,
Ring: RingConfig{
InstanceID: id,
InstanceAddr: host,
@ -1016,11 +1460,11 @@ func TestSharding(t *testing.T) {
loadedRules1, err := r1.listRules(context.Background())
require.NoError(t, err)
expected := expectedRulesMap{
actual := expectedRulesMap{
ruler1: loadedRules1,
}
addToExpected := func(id string, r *Ruler) {
addToActual := func(id string, r *Ruler) {
// Only expect rules from other rulers when using the ring, and only when they are present in the ring.
if r != nil && rulerRing != nil && rulerRing.HasInstance(id) {
loaded, err := r.listRules(context.Background())
@ -1029,14 +1473,14 @@ func TestSharding(t *testing.T) {
if loaded == nil {
loaded = map[string]rulespb.RuleGroupList{}
}
expected[id] = loaded
actual[id] = loaded
}
}
addToExpected(ruler2, r2)
addToExpected(ruler3, r3)
addToActual(ruler2, r2)
addToActual(ruler3, r3)
require.Equal(t, tc.expectedRules, expected)
require.Equal(t, tc.expectedRules, actual)
})
}
}
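The test cases above register each ruler with tokens like `user1Group1Rule1Token + 1`. That `+ 1` follows from how ring lookup works: the owner of a key is the instance holding the smallest registered token strictly greater than the key, so registering `T + 1` guarantees ownership of the rule whose hash is `T`. A minimal sketch of that lookup (a simplification of the dskit ring, not its actual implementation; `tokenFor` and `ownerOf` are illustrative names):

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// tokenFor hashes an identifier to a ring token. The separator and fields are
// assumptions for illustration, not the ruler's exact hashing scheme.
func tokenFor(id string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(id))
	return h.Sum32()
}

// ownerOf returns the index of the instance owning the given key: the instance
// with the smallest token strictly greater than the key, wrapping around.
func ownerOf(key uint32, sortedTokens []uint32) int {
	i := sort.Search(len(sortedTokens), func(i int) bool { return sortedTokens[i] > key })
	if i == len(sortedTokens) {
		i = 0 // wrap around to the instance with the smallest token
	}
	return i
}

func main() {
	ruleToken := tokenFor("user1/group1/rule1")
	// Registering ruleToken+1 claims ownership of ruleToken itself.
	tokens := []uint32{ruleToken + 1}
	fmt.Println(ownerOf(ruleToken, tokens)) // prints 0
}
```

This is why an unhealthy or LEAVING ruler's tokens effectively drop out: with replication factor 1, the keys it covered have no healthy owner.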

@ -141,6 +141,7 @@ func (m *mockRuleStore) ListAllRuleGroups(_ context.Context) (map[string]rulespb
Name: r.Name,
User: k,
Interval: r.Interval,
Rules: r.Rules,
})
}
}
@ -163,6 +164,7 @@ func (m *mockRuleStore) ListRuleGroupsForUserAndNamespace(_ context.Context, use
Name: r.Name,
User: userID,
Interval: r.Interval,
Rules: r.Rules,
})
}
return result, nil

@ -61,6 +61,9 @@ func (m singleValueWithLabelsMap) WriteToMetricChannel(out chan<- prometheus.Met
// Keeping a map of metric name to its family makes it easier to do searches later.
type MetricFamilyMap map[string]*dto.MetricFamily
// MetricLabelTransformFunc also exists in the ruler package, but importing it from there would create a
// cyclic import, so it is duplicated here.
type MetricLabelTransformFunc func(k, v string) string
// NewMetricFamilyMap sorts output from Gatherer.Gather method into a map.
// Gatherer.Gather specifies that the returned metric families are uniquely named, and we use that fact here.
// If they are not, this method returns error.
@ -119,13 +122,13 @@ func (mfm MetricFamilyMap) SumSummariesTo(name string, output *SummaryData) {
}
}
func (mfm MetricFamilyMap) sumOfSingleValuesWithLabels(metric string, labelNames []string, extractFn func(*dto.Metric) float64, aggregateFn func(labelsKey string, labelValues []string, value float64)) {
metricsPerLabelValue := getMetricsWithLabelNames(mfm[metric], labelNames)
func (mfm MetricFamilyMap) sumOfSingleValuesWithLabels(metric string, labelNames []string, extractFn func(*dto.Metric) float64, aggregateFn func(labelsKey string, labelValues []string, value float64), transformFn MetricLabelTransformFunc) {
metricsPerLabelValue := getMetricsWithLabelNames(mfm[metric], labelNames, transformFn)
for key, mlv := range metricsPerLabelValue {
for k, mlv := range metricsPerLabelValue {
for _, m := range mlv.metrics {
val := extractFn(m)
aggregateFn(key, mlv.labelValues, val)
aggregateFn(k, mlv.labelValues, val)
}
}
}
@ -133,8 +136,9 @@ func (mfm MetricFamilyMap) sumOfSingleValuesWithLabels(metric string, labelNames
// MetricFamiliesPerUser is a collection of metrics gathered via calling Gatherer.Gather() method on different
// gatherers, one per user.
type MetricFamiliesPerUser []struct {
user string
metrics MetricFamilyMap
user string
metrics MetricFamilyMap
labelTransformFn MetricLabelTransformFunc
}
func (d MetricFamiliesPerUser) GetSumOfCounters(counter string) float64 {
@ -166,7 +170,7 @@ func (d MetricFamiliesPerUser) SendSumOfCountersPerUserWithLabels(out chan<- pro
}
result := singleValueWithLabelsMap{}
userEntry.metrics.sumOfSingleValuesWithLabels(metric, labelNames, counterValue, result.aggregateFn)
userEntry.metrics.sumOfSingleValuesWithLabels(metric, labelNames, counterValue, result.aggregateFn, userEntry.labelTransformFn)
result.prependUserLabelValue(userEntry.user)
result.WriteToMetricChannel(out, desc, prometheus.CounterValue)
}
@ -201,7 +205,7 @@ func (d MetricFamiliesPerUser) SendSumOfGaugesPerUserWithLabels(out chan<- prome
}
result := singleValueWithLabelsMap{}
userEntry.metrics.sumOfSingleValuesWithLabels(metric, labelNames, gaugeValue, result.aggregateFn)
userEntry.metrics.sumOfSingleValuesWithLabels(metric, labelNames, gaugeValue, result.aggregateFn, userEntry.labelTransformFn)
result.prependUserLabelValue(userEntry.user)
result.WriteToMetricChannel(out, desc, prometheus.GaugeValue)
}
@ -210,7 +214,7 @@ func (d MetricFamiliesPerUser) SendSumOfGaugesPerUserWithLabels(out chan<- prome
func (d MetricFamiliesPerUser) sumOfSingleValuesWithLabels(metric string, fn func(*dto.Metric) float64, labelNames []string) singleValueWithLabelsMap {
result := singleValueWithLabelsMap{}
for _, userEntry := range d {
userEntry.metrics.sumOfSingleValuesWithLabels(metric, labelNames, fn, result.aggregateFn)
userEntry.metrics.sumOfSingleValuesWithLabels(metric, labelNames, fn, result.aggregateFn, userEntry.labelTransformFn)
}
return result
}
@ -259,7 +263,7 @@ func (d MetricFamiliesPerUser) SendSumOfSummariesWithLabels(out chan<- prometheu
result := map[string]summaryResult{}
for _, mfm := range d {
metricsPerLabelValue := getMetricsWithLabelNames(mfm.metrics[summaryName], labelNames)
metricsPerLabelValue := getMetricsWithLabelNames(mfm.metrics[summaryName], labelNames, mfm.labelTransformFn)
for key, mwl := range metricsPerLabelValue {
for _, m := range mwl.metrics {
@ -307,7 +311,7 @@ func (d MetricFamiliesPerUser) SendSumOfHistogramsWithLabels(out chan<- promethe
result := map[string]histogramResult{}
for _, mfm := range d {
metricsPerLabelValue := getMetricsWithLabelNames(mfm.metrics[histogramName], labelNames)
metricsPerLabelValue := getMetricsWithLabelNames(mfm.metrics[histogramName], labelNames, mfm.labelTransformFn)
for key, mwl := range metricsPerLabelValue {
for _, m := range mwl.metrics {
@ -333,11 +337,11 @@ type metricsWithLabels struct {
metrics []*dto.Metric
}
func getMetricsWithLabelNames(mf *dto.MetricFamily, labelNames []string) map[string]metricsWithLabels {
func getMetricsWithLabelNames(mf *dto.MetricFamily, labelNames []string, labelTransformFunc MetricLabelTransformFunc) map[string]metricsWithLabels {
result := map[string]metricsWithLabels{}
for _, m := range mf.GetMetric() {
lbls, include := getLabelValues(m, labelNames)
lbls, include := getLabelValues(m, labelNames, labelTransformFunc)
if !include {
continue
}
@ -353,7 +357,7 @@ func getMetricsWithLabelNames(mf *dto.MetricFamily, labelNames []string) map[str
return result
}
func getLabelValues(m *dto.Metric, labelNames []string) ([]string, bool) {
func getLabelValues(m *dto.Metric, labelNames []string, labelTransformFunc MetricLabelTransformFunc) ([]string, bool) {
result := make([]string, 0, len(labelNames))
for _, ln := range labelNames {
@ -367,7 +371,12 @@ func getLabelValues(m *dto.Metric, labelNames []string) ([]string, bool) {
continue
}
result = append(result, lp.GetValue())
value := lp.GetValue()
if labelTransformFunc != nil {
value = labelTransformFunc(ln, value)
}
result = append(result, value)
found = true
break
}
@ -669,16 +678,21 @@ func (r *UserRegistries) Registries() []UserRegistry {
return out
}
func (r *UserRegistries) BuildMetricFamiliesPerUser() MetricFamiliesPerUser {
func (r *UserRegistries) BuildMetricFamiliesPerUser(labelTransformFn MetricLabelTransformFunc) MetricFamiliesPerUser {
data := MetricFamiliesPerUser{}
for _, entry := range r.Registries() {
// Set for removed users.
if entry.reg == nil {
if entry.lastGather != nil {
data = append(data, struct {
user string
metrics MetricFamilyMap
}{user: "", metrics: entry.lastGather})
user string
metrics MetricFamilyMap
labelTransformFn MetricLabelTransformFunc
}{
user: "",
metrics: entry.lastGather,
labelTransformFn: labelTransformFn,
})
}
continue
@ -690,11 +704,13 @@ func (r *UserRegistries) BuildMetricFamiliesPerUser() MetricFamiliesPerUser {
mfm, err = NewMetricFamilyMap(m)
if err == nil {
data = append(data, struct {
user string
metrics MetricFamilyMap
user string
metrics MetricFamilyMap
labelTransformFn MetricLabelTransformFunc
}{
user: entry.user,
metrics: mfm,
user: entry.user,
metrics: mfm,
labelTransformFn: labelTransformFn,
})
}
}
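The `MetricLabelTransformFunc` threaded through `BuildMetricFamiliesPerUser` lets the ruler rewrite a label value before per-user metrics are aggregated. A plausible use (an assumption for illustration, not necessarily the PR's exact transform): when sharding by rule, each group is cloned per rule under a derived name, and mapping the group label back to its base name lets metrics from all shards aggregate under the original group:

```go
package main

import (
	"fmt"
	"strings"
)

// stripRuleShard is a hypothetical MetricLabelTransformFunc. It only touches
// the "rule_group" label, trimming a ";<suffix>" appended for per-rule shards;
// the label name and suffix format are assumptions for illustration.
func stripRuleShard(labelName, labelValue string) string {
	if labelName != "rule_group" {
		return labelValue // leave all other labels untouched
	}
	if i := strings.LastIndex(labelValue, ";"); i >= 0 {
		return labelValue[:i]
	}
	return labelValue
}

func main() {
	// A sharded group name collapses back to its base name.
	fmt.Println(stripRuleShard("rule_group", "mygroup;deadbeef")) // prints mygroup
}
```

Passing `nil` as the transform, as the updated tests do, keeps the previous behavior: label values flow through `getLabelValues` unchanged.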

@ -60,8 +60,8 @@ func TestCounterValue(t *testing.T) {
func TestGetMetricsWithLabelNames(t *testing.T) {
labels := []string{"a", "b"}
require.Equal(t, map[string]metricsWithLabels{}, getMetricsWithLabelNames(nil, labels))
require.Equal(t, map[string]metricsWithLabels{}, getMetricsWithLabelNames(&dto.MetricFamily{}, labels))
require.Equal(t, map[string]metricsWithLabels{}, getMetricsWithLabelNames(nil, labels, nil))
require.Equal(t, map[string]metricsWithLabels{}, getMetricsWithLabelNames(&dto.MetricFamily{}, labels, nil))
m1 := &dto.Metric{Label: makeLabels("a", "5"), Counter: &dto.Counter{Value: proto.Float64(1)}}
m2 := &dto.Metric{Label: makeLabels("a", "10", "b", "20"), Counter: &dto.Counter{Value: proto.Float64(1.5)}}
@ -70,7 +70,7 @@ func TestGetMetricsWithLabelNames(t *testing.T) {
m5 := &dto.Metric{Label: makeLabels("a", "11", "b", "21"), Counter: &dto.Counter{Value: proto.Float64(4)}}
m6 := &dto.Metric{Label: makeLabels("ignored", "123", "a", "12", "b", "22", "c", "30"), Counter: &dto.Counter{Value: proto.Float64(4)}}
out := getMetricsWithLabelNames(&dto.MetricFamily{Metric: []*dto.Metric{m1, m2, m3, m4, m5, m6}}, labels)
out := getMetricsWithLabelNames(&dto.MetricFamily{Metric: []*dto.Metric{m1, m2, m3, m4, m5, m6}}, labels, nil)
// m1 is not returned at all, as it doesn't have both required labels.
require.Equal(t, map[string]metricsWithLabels{
@ -87,7 +87,7 @@ func TestGetMetricsWithLabelNames(t *testing.T) {
// no labels -- returns all metrics in single key. this isn't very efficient, and there are other functions
// (without labels) to handle this better, but it still works.
out2 := getMetricsWithLabelNames(&dto.MetricFamily{Metric: []*dto.Metric{m1, m2, m3, m4, m5, m6}}, nil)
out2 := getMetricsWithLabelNames(&dto.MetricFamily{Metric: []*dto.Metric{m1, m2, m3, m4, m5, m6}}, []string{}, nil)
require.Equal(t, map[string]metricsWithLabels{
getLabelsString(nil): {
labelValues: []string{},
@ -126,7 +126,7 @@ func BenchmarkGetMetricsWithLabelNames(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
out := getMetricsWithLabelNames(mf, []string{"label_1", "label_2", "label_3"})
out := getMetricsWithLabelNames(mf, []string{"label_1", "label_2", "label_3"}, nil)
if expected := 1; len(out) != expected {
b.Fatalf("unexpected number of output groups: expected = %d got = %d", expected, len(out))
@ -165,7 +165,7 @@ func TestSendSumOfGaugesPerUserWithLabels(t *testing.T) {
regs := NewUserRegistries()
regs.AddUserRegistry("user-1", user1Reg)
regs.AddUserRegistry("user-2", user2Reg)
mf := regs.BuildMetricFamiliesPerUser()
mf := regs.BuildMetricFamiliesPerUser(nil)
{
desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_one"}, nil)
@ -217,7 +217,7 @@ func TestSendMaxOfGauges(t *testing.T) {
regs.AddUserRegistry("user-2", user2Reg)
// No matching metric.
mf := regs.BuildMetricFamiliesPerUser()
mf := regs.BuildMetricFamiliesPerUser(nil)
actual := collectMetrics(t, func(out chan prometheus.Metric) {
mf.SendMaxOfGauges(out, desc, "test_metric")
})
@ -231,7 +231,7 @@ func TestSendMaxOfGauges(t *testing.T) {
user2Metric := promauto.With(user2Reg).NewGauge(prometheus.GaugeOpts{Name: "test_metric"})
user1Metric.Set(100)
user2Metric.Set(80)
mf = regs.BuildMetricFamiliesPerUser()
mf = regs.BuildMetricFamiliesPerUser(nil)
actual = collectMetrics(t, func(out chan prometheus.Metric) {
mf.SendMaxOfGauges(out, desc, "test_metric")
@ -259,7 +259,7 @@ func TestSendSumOfHistogramsWithLabels(t *testing.T) {
regs := NewUserRegistries()
regs.AddUserRegistry("user-1", user1Reg)
regs.AddUserRegistry("user-2", user2Reg)
mf := regs.BuildMetricFamiliesPerUser()
mf := regs.BuildMetricFamiliesPerUser(nil)
{
desc := prometheus.NewDesc("test_metric", "", []string{"label_one"}, nil)
@ -335,7 +335,7 @@ func TestSumOfCounterPerUserWithLabels(t *testing.T) {
regs := NewUserRegistries()
regs.AddUserRegistry("user-1", user1Reg)
regs.AddUserRegistry("user-2", user2Reg)
mf := regs.BuildMetricFamiliesPerUser()
mf := regs.BuildMetricFamiliesPerUser(nil)
{
desc := prometheus.NewDesc("test_metric", "", []string{"user", "label_one"}, nil)
@ -397,7 +397,7 @@ func TestSendSumOfSummariesPerUser(t *testing.T) {
regs := NewUserRegistries()
regs.AddUserRegistry("user-1", user1Reg)
regs.AddUserRegistry("user-2", user2Reg)
mf := regs.BuildMetricFamiliesPerUser()
mf := regs.BuildMetricFamiliesPerUser(nil)
{
desc := prometheus.NewDesc("test_metric", "", []string{"user"}, nil)
@ -497,7 +497,7 @@ func TestFloat64PrecisionStability(t *testing.T) {
expected := map[string][]*dto.Metric{}
for run := 0; run < numRuns; run++ {
mf := registries.BuildMetricFamiliesPerUser()
mf := registries.BuildMetricFamiliesPerUser(nil)
gauge := collectMetrics(t, func(out chan prometheus.Metric) {
mf.SendSumOfGauges(out, prometheus.NewDesc("test_gauge", "", nil, nil), "test_gauge")
@ -996,7 +996,7 @@ func (tm *testMetrics) Describe(out chan<- *prometheus.Desc) {
}
func (tm *testMetrics) Collect(out chan<- prometheus.Metric) {
data := tm.regs.BuildMetricFamiliesPerUser()
data := tm.regs.BuildMetricFamiliesPerUser(nil)
data.SendSumOfGauges(out, tm.gauge, "test_gauge")
data.SendSumOfGaugesPerUser(out, tm.gaugePerUser, "test_gauge")

@ -6,10 +6,20 @@ import (
"math"
)
// Sharding strategies & algorithms.
const (
// Sharding strategies.
// ShardingStrategyDefault shards rule groups across available rulers in the ring.
ShardingStrategyDefault = "default"
// ShardingStrategyShuffle shards tenants' rule groups across available rulers in the ring using a
// shuffle-sharding algorithm.
ShardingStrategyShuffle = "shuffle-sharding"
// ShardingAlgoByGroup is an alias of ShardingStrategyDefault.
ShardingAlgoByGroup = "by-group"
// ShardingAlgoByRule shards all rules evenly across available rulers in the ring, regardless of group.
// This can be achieved because Loki recording/alerting rules currently cannot have any inter-dependencies,
// unlike Prometheus rules, so there's really no need to shard by group. This will eventually become the
// new default strategy.
ShardingAlgoByRule = "by-rule"
)
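The difference between the two algorithms comes down to what feeds the hash. With `by-group`, every rule in a group produces the same token, so the whole group lands on one ruler; with `by-rule`, the rule itself participates, so rules of the same group can spread across rulers. A minimal sketch (the separator and field order are assumptions, not Loki's exact hashing):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// token hashes the given identity parts to a ring token. Illustrative only:
// the real ruler derives tokens with its own scheme.
func token(parts ...string) uint32 {
	h := fnv.New32a()
	for _, p := range parts {
		h.Write([]byte(p))
		h.Write([]byte{'/'}) // separator so ("ab","c") != ("a","bc")
	}
	return h.Sum32()
}

func main() {
	// by-group: one token for the whole group.
	groupToken := token("user1", "ns", "group1")

	// by-rule: the rule name is part of the hash, so rules in the same
	// group get distinct tokens and can land on different rulers.
	rule1 := token("user1", "ns", "group1", "rule1")
	rule2 := token("user1", "ns", "group1", "rule2")

	fmt.Println(groupToken != rule1, rule1 != rule2)
}
```

This is also why the tests clone groups with `cloneGroupWithRule`: under by-rule sharding, a single source group can be split into several single-rule groups, each owned by whichever ruler holds its token.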
var (

@ -234,7 +234,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.RulerMaxRulesPerRuleGroup, "ruler.max-rules-per-rule-group", 0, "Maximum number of rules per rule group per-tenant. 0 to disable.")
f.IntVar(&l.RulerMaxRuleGroupsPerTenant, "ruler.max-rule-groups-per-tenant", 0, "Maximum number of rule groups per-tenant. 0 to disable.")
f.IntVar(&l.RulerTenantShardSize, "ruler.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by ruler. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
f.IntVar(&l.RulerTenantShardSize, "ruler.tenant-shard-size", 0, "The default tenant's shard size when shuffle-sharding is enabled in the ruler. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "Feature renamed to 'runtime configuration', flag deprecated in favor of -runtime-config.file (runtime_config.file in YAML).")
_ = l.RetentionPeriod.Set("744h")
@ -513,9 +513,8 @@ func (o *Overrides) EvaluationDelay(userID string) time.Duration {
}
// RulerTenantShardSize returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy.
// Not used in Loki.
func (o *Overrides) RulerTenantShardSize(userID string) int {
return 0
return o.getOverridesForUser(userID).RulerTenantShardSize
}
// RulerMaxRulesPerRuleGroup returns the maximum number of rules per rule group for a given user.

@ -164,7 +164,7 @@ func main() {
ConfigFile string
GeneratedFileWarning string
}{
GeneratedFileWarning: "<!-- DO NOT EDIT THIS FILE - This file has been automatically generated from its .template -->",
GeneratedFileWarning: "<!-- DO NOT EDIT THIS FILE - This file has been automatically generated from its .template, regenerate with `make doc` from root directory. -->",
ConfigFile: generateBlocksMarkdown(blocks),
}
