Ruler: remote rule evaluation (#8744)

**What this PR does / why we need it**:
Adds the ability to evaluate recording & alerting rules against a given
`query-frontend`, allowing these queries to be executed with all the
parallelisation & optimisation that regular ad-hoc queries have. This is
important because with `local` evaluation all queries are
single-threaded, and rules that evaluate a large range/volume of data
may time out or OOM the `ruler` itself, leading to missed metrics or
alerts.

When `remote` evaluation mode is enabled, the `ruler` effectively just
becomes a gRPC client for the `query-frontend`, which will dramatically
improve the reliability of the `ruler` and also drastically reduce its
resource requirements.

**Which issue(s) this PR fixes**:
This PR implements the feature discussed in
https://github.com/grafana/loki/pull/8129 (**LID 0002: Remote Rule
Evaluation**).
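
As a sketch, remote evaluation is enabled with a config fragment along these lines (the address is a placeholder; the `dns:///` prefix enables gRPC client-side load balancing across frontends):

```yaml
ruler:
  evaluation:
    # 'local' (default) evaluates rules in-process;
    # 'remote' sends them to the query-frontend over gRPC.
    mode: remote
    query_frontend:
      address: dns:///query-frontend.example.svc:9095
```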
Committed by Danny Kopping via GitHub (commit 33e44ed39d, parent 540380fa94).
22 changed files (lines changed in parentheses):

1. CHANGELOG.md (1)
2. docs/sources/configuration/_index.md (81)
3. integration/client/client.go (2)
4. integration/cluster/cluster.go (124)
5. integration/cluster/ruler.go (53)
6. integration/loki_rule_eval_test.go (175)
7. integration/util/merger.go (54)
8. pkg/loki/loki.go (5)
9. pkg/loki/modules.go (77)
10. pkg/querier/queryrange/codec.go (25)
11. pkg/querier/queryrange/prometheus.go (48)
12. pkg/querier/series/series_set.go (6)
13. pkg/ruler/compat.go (27)
14. pkg/ruler/compat_test.go (11)
15. pkg/ruler/config.go (3)
16. pkg/ruler/evaluator.go (36)
17. pkg/ruler/evaluator_local.go (53)
18. pkg/ruler/evaluator_remote.go (259)
19. pkg/ruler/ruler.go (5)
20. production/docker/config/loki.yaml (23)
21. production/docker/docker-compose.yaml (25)
22. production/docker/rules/docker/rules.yml (20)

@@ -6,6 +6,7 @@
##### Enhancements
* [8744](https://github.com/grafana/loki/pull/8744) **dannykopping**: Ruler: remote rule evaluation.
* [8727](https://github.com/grafana/loki/pull/8727) **cstyan** **jeschkies**: Propagate per-request limit header to querier.
* [8682](https://github.com/grafana/loki/pull/8682) **dannykopping**: Add fetched chunk size distribution metric `loki_chunk_fetcher_fetched_size_bytes`.
* [8532](https://github.com/grafana/loki/pull/8532) **justcompile**: Adds Storage Class option to S3 objects

@@ -1090,6 +1090,87 @@ remote_write:
# -limits.per-user-override-period.
# CLI flag: -ruler.remote-write.config-refresh-period
[config_refresh_period: <duration> | default = 10s]
# Configuration for rule evaluation.
evaluation:
# The evaluation mode for the ruler. Can be either 'local' or 'remote'. If set
# to 'local', the ruler will evaluate rules locally. If set to 'remote', the
# ruler will evaluate rules remotely. If unset, the ruler will evaluate rules
# locally.
# CLI flag: -ruler.evaluation.mode
[mode: <string> | default = "local"]
query_frontend:
# GRPC listen address of the query-frontend(s). Must be a DNS address
# (prefixed with dns:///) to enable client side load balancing.
# CLI flag: -ruler.evaluation.query-frontend.address
[address: <string> | default = ""]
# Set to true if query-frontend connection requires TLS.
# CLI flag: -ruler.evaluation.query-frontend.tls-enabled
[tls_enabled: <boolean> | default = false]
# Path to the client certificate file, which will be used for authenticating
# with the server. Also requires the key path to be configured.
# CLI flag: -ruler.evaluation.query-frontend.tls-cert-path
[tls_cert_path: <string> | default = ""]
# Path to the key file for the client certificate. Also requires the client
# certificate to be configured.
# CLI flag: -ruler.evaluation.query-frontend.tls-key-path
[tls_key_path: <string> | default = ""]
# Path to the CA certificates file to validate server certificate against.
# If not set, the host's root CA certificates are used.
# CLI flag: -ruler.evaluation.query-frontend.tls-ca-path
[tls_ca_path: <string> | default = ""]
# Override the expected name on the server certificate.
# CLI flag: -ruler.evaluation.query-frontend.tls-server-name
[tls_server_name: <string> | default = ""]
# Skip validating server certificate.
# CLI flag: -ruler.evaluation.query-frontend.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]
# Override the default cipher suite list (separated by commas). Allowed
# values:
#
# Secure Ciphers:
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
# - TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA
# - TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA
# - TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA
# - TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA
# - TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256
# - TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
# - TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256
# - TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256
#
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
# - TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256
# - TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256
# CLI flag: -ruler.evaluation.query-frontend.tls-cipher-suites
[tls_cipher_suites: <string> | default = ""]
# Override the default minimum TLS version. Allowed values: VersionTLS10,
# VersionTLS11, VersionTLS12, VersionTLS13
# CLI flag: -ruler.evaluation.query-frontend.tls-min-version
[tls_min_version: <string> | default = ""]
```
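
Pulling several of these options together, a hedged example of a TLS-secured remote-evaluation setup (hostnames and file paths are placeholders, not defaults):

```yaml
ruler:
  evaluation:
    mode: remote
    query_frontend:
      address: dns:///query-frontend.example.com:9095
      tls_enabled: true
      tls_cert_path: /etc/loki/certs/client.crt
      tls_key_path: /etc/loki/certs/client.key
      tls_ca_path: /etc/loki/certs/ca.crt
```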
### ingester_client

@@ -434,7 +434,7 @@ func (c *Client) GetRules(ctx context.Context) (*RulesResponse, error) {
resp := RulesResponse{}
err = json.Unmarshal(buf, &resp)
if err != nil {
return nil, fmt.Errorf("error parsing response data: %w", err)
return nil, fmt.Errorf("error parsing response data %q: %w", buf, err)
}
return &resp, err

@@ -1,6 +1,7 @@
package cluster
import (
"bytes"
"context"
"errors"
"flag"
@@ -19,6 +20,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/integration/util"
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/util/cfg"
"github.com/grafana/loki/pkg/validation"
@@ -86,43 +89,20 @@ ingester:
querier:
multi_tenant_queries_enabled: true
{{if .remoteWriteUrls}}
ruler:
wal:
dir: {{.rulerWALPath}}
storage:
type: local
local:
directory: {{.rulesPath}}
rule_path: {{.sharedDataPath}}/rule
enable_api: true
ring:
kvstore:
store: inmemory
remote_write:
enabled: true
clients:
remote_client1:
url: {{index .remoteWriteUrls 0}}/api/v1/write
remote_client2:
url: {{index .remoteWriteUrls 1}}/api/v1/write
{{end}}
`))
wal:
dir: {{.sharedDataPath}}/ruler-wal
storage:
type: local
local:
directory: {{.sharedDataPath}}/rules
rule_path: {{.sharedDataPath}}/prom-rule
rulesConfig = `
groups:
- name: always-firing
interval: 1s
rules:
- alert: fire
expr: |
1 > 0
for: 0m
labels:
severity: warning
annotations:
summary: test
`
`))
)
func wrapRegistry() {
@@ -265,16 +245,18 @@ type Component struct {
flags []string
configFile string
extraConfigs []string
overridesFile string
dataPath string
rulerWALPath string
rulesPath string
RulesTenant string
running bool
wg sync.WaitGroup
}
RemoteWriteUrls []string
// ClusterSharedPath returns the path to the shared directory between all components in the cluster.
// This path will be removed once the cluster is stopped.
func (c *Component) ClusterSharedPath() string {
return c.cluster.sharedPath
}
func (c *Component) HTTPURL() string {
@@ -285,6 +267,14 @@ func (c *Component) GRPCURL() string {
return fmt.Sprintf("localhost:%s", port(c.loki.Server.GRPCListenAddr().String()))
}
func (c *Component) WithExtraConfig(cfg string) {
if c.running {
panic("cannot set extra config after component is running")
}
c.extraConfigs = append(c.extraConfigs, cfg)
}
func port(addr string) string {
parts := strings.Split(addr, ":")
return parts[len(parts)-1]
@@ -303,52 +293,46 @@ func (c *Component) writeConfig() error {
return fmt.Errorf("error creating data path: %w", err)
}
if len(c.RemoteWriteUrls) > 0 {
c.rulesPath, err = os.MkdirTemp(c.cluster.sharedPath, "rules")
mergedConfig, err := c.MergedConfig()
if err != nil {
return fmt.Errorf("error creating rules path: %w", err)
return fmt.Errorf("error getting merged config: %w", err)
}
fakeDir, err := os.MkdirTemp(c.rulesPath, "fake")
if err != nil {
return fmt.Errorf("error creating rules/fake path: %w", err)
}
s := strings.Split(fakeDir, "/")
c.RulesTenant = s[len(s)-1]
c.rulerWALPath, err = os.MkdirTemp(c.cluster.sharedPath, "ruler-wal")
if err != nil {
return fmt.Errorf("error creating ruler-wal path: %w", err)
}
rulesConfigFile, err := os.CreateTemp(fakeDir, "rules*.yaml")
if err != nil {
return fmt.Errorf("error creating rules config file: %w", err)
if err := os.WriteFile(configFile.Name(), mergedConfig, 0644); err != nil {
return fmt.Errorf("error writing config file: %w", err)
}
if _, err = rulesConfigFile.Write([]byte(rulesConfig)); err != nil {
return fmt.Errorf("error writing to rules config file: %w", err)
if err := configFile.Close(); err != nil {
return fmt.Errorf("error closing config file: %w", err)
}
c.configFile = configFile.Name()
return nil
}
rulesConfigFile.Close()
}
// MergedConfig merges the base config template with any additional config that has been provided
func (c *Component) MergedConfig() ([]byte, error) {
var sb bytes.Buffer
if err := configTemplate.Execute(configFile, map[string]interface{}{
if err := configTemplate.Execute(&sb, map[string]interface{}{
"dataPath": c.dataPath,
"sharedDataPath": c.cluster.sharedPath,
"remoteWriteUrls": c.RemoteWriteUrls,
"rulesPath": c.rulesPath,
"rulerWALPath": c.rulerWALPath,
}); err != nil {
return fmt.Errorf("error writing config file: %w", err)
return nil, fmt.Errorf("error writing config file: %w", err)
}
if err := configFile.Close(); err != nil {
return fmt.Errorf("error closing config file: %w", err)
merger := util.NewYAMLMerger()
merger.AddFragment(sb.Bytes())
for _, extra := range c.extraConfigs {
merger.AddFragment([]byte(extra))
}
c.configFile = configFile.Name()
return nil
merged, err := merger.Merge()
if err != nil {
return nil, fmt.Errorf("failed to marshal merged config to YAML: %w", err)
}
return merged, nil
}
func (c *Component) run() error {
@@ -440,12 +424,6 @@ func (c *Component) cleanup() (files []string, dirs []string) {
if c.dataPath != "" {
dirs = append(dirs, c.dataPath)
}
if c.rulerWALPath != "" {
dirs = append(dirs, c.rulerWALPath)
}
if c.rulesPath != "" {
dirs = append(dirs, c.rulesPath)
}
return files, dirs
}

@@ -0,0 +1,53 @@
package cluster
import (
"fmt"
"os"
"path/filepath"
"strings"
)
func (c *Component) WithRulerRemoteWrite(name, url string) {
// ensure remote-write is enabled
c.WithExtraConfig(`
ruler:
remote_write:
enabled: true
`)
c.WithExtraConfig(fmt.Sprintf(`
ruler:
remote_write:
clients:
%s:
url: %s/api/v1/write
queue_config:
# send immediately as soon as a sample is generated
capacity: 1
batch_send_deadline: 0s
`, name, url))
}
func (c *Component) WithTenantRules(tenantFilesMap map[string]map[string]string) error {
sharedPath := c.ClusterSharedPath()
rulesPath := filepath.Join(sharedPath, "rules")
if err := os.Mkdir(rulesPath, 0755); err != nil {
return fmt.Errorf("error creating rules path: %w", err)
}
for tenant, files := range tenantFilesMap {
for filename, file := range files {
path := filepath.Join(rulesPath, tenant)
if err := os.Mkdir(path, 0755); err != nil {
return fmt.Errorf("error creating tenant %s rules path: %w", tenant, err)
}
if err := os.WriteFile(filepath.Join(path, filename), []byte(strings.TrimSpace(file)), 0644); err != nil {
return fmt.Errorf("error creating rule file at path %s: %w", path, err)
}
}
}
return nil
}

@@ -0,0 +1,175 @@
package integration
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/prometheus/prometheus/storage/remote"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"
"github.com/grafana/loki/pkg/ruler"
)
// TestLocalRuleEval tests that rules are evaluated locally with an embedded query engine
// and that the results are written to the backend correctly.
func TestLocalRuleEval(t *testing.T) {
testRuleEval(t, ruler.EvalModeLocal)
}
// TestRemoteRuleEval tests that rules are evaluated remotely against a configured query-frontend
// and that the results are written to the backend correctly.
func TestRemoteRuleEval(t *testing.T) {
testRuleEval(t, ruler.EvalModeRemote)
}
// The only way we can test rule evaluation in an integration test is to use the remote-write feature.
// In this test we stub out a remote-write receiver and check that the expected data is sent to it.
// Both the local and the remote rule evaluation modes should produce the same result.
func testRuleEval(t *testing.T, mode string) {
clu := cluster.New()
t.Cleanup(func() {
assert.NoError(t, clu.Cleanup())
})
// initialise a write component and ingest some logs
tWrite := clu.AddComponent(
"write",
"-target=write",
)
now := time.Now()
tenantID := randStringRunes()
require.NoError(t, clu.Run())
job := "accesslog"
cliWrite := client.New(tenantID, "", tWrite.HTTPURL())
cliWrite.Now = now
t.Run("ingest logs", func(t *testing.T) {
require.NoError(t, cliWrite.PushLogLineWithTimestamp("HEAD /", now, map[string]string{"method": "HEAD", "job": job}))
require.NoError(t, cliWrite.PushLogLineWithTimestamp("GET /", now, map[string]string{"method": "GET", "job": job}))
require.NoError(t, cliWrite.PushLogLineWithTimestamp("GET /", now.Add(time.Second), map[string]string{"method": "GET", "job": job}))
})
// advance time to after the last ingested log line so queries don't return empty results
now = now.Add(time.Second * 2)
// start up read component for remote rule evaluation
tRead := clu.AddComponent(
"read",
"-target=read",
// we set a fake address here because deletion is not being tested,
// and we have a circular dependency with the backend
"-common.compactor-address=http://fake",
"-legacy-read-mode=false",
)
require.NoError(t, clu.Run())
// start up a backend component which contains the ruler
tBackend := clu.AddComponent(
"backend",
"-target=backend",
"-legacy-read-mode=false",
)
rwHandler := func(called *bool, test func(w http.ResponseWriter, r *http.Request)) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/v1/write" {
t.Errorf("Expected to request '/api/v1/write', got: %s", r.URL.Path)
}
test(w, r)
*called = true
w.WriteHeader(http.StatusOK)
}))
}
// this is the function that will be called when the remote-write receiver receives a request.
// it tests that the expected payload is received.
expectedResults := func(w http.ResponseWriter, r *http.Request) {
wr, err := remote.DecodeWriteRequest(r.Body)
require.NoError(t, err)
// depending on the rule interval, we may get multiple timeseries before remote-write is triggered,
// so we just check that we have at least one that matches our requirements.
require.GreaterOrEqual(t, len(wr.Timeseries), 1)
// we expect to see two GET lines from the aggregation in the recording rule
require.Equal(t, wr.Timeseries[len(wr.Timeseries)-1].Samples[0].Value, float64(2))
}
var called bool
server1 := rwHandler(&called, expectedResults)
defer server1.Close()
// configure the backend component
tBackend.WithRulerRemoteWrite("target1", server1.URL)
if mode == ruler.EvalModeRemote {
tBackend.WithExtraConfig(fmt.Sprintf(`
ruler:
evaluation:
mode: %s
query_frontend:
address: %s
`, mode, tRead.GRPCURL()))
}
record := fmt.Sprintf(`
groups:
- name: record
interval: 1s
rules:
- record: test
expr: sum by (method) (count_over_time({job="%s", method="GET"}[1m]))
labels:
foo: bar
`, job)
require.NoError(t, tBackend.WithTenantRules(map[string]map[string]string{
tenantID: {
"record.yaml": record,
},
}))
m, e := tBackend.MergedConfig()
require.NoError(t, e)
t.Logf("starting backend with config:\n%s\n", m)
require.NoError(t, clu.Run())
cliBackend := client.New(tenantID, "", tBackend.HTTPURL())
cliBackend.Now = now
t.Run(fmt.Sprintf("%s rule evaluation", mode), func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// check rules exist
resp, err := cliBackend.GetRules(ctx)
require.NoError(t, err)
require.NotNil(t, resp)
require.Equal(t, "success", resp.Status)
require.Len(t, resp.Data.Groups, 1)
require.Len(t, resp.Data.Groups[0].Rules, 1)
// ensure that the remote-write receiver was called
require.Eventually(t, func() bool {
return assert.ObjectsAreEqualValues(true, called)
}, 20*time.Second, 100*time.Millisecond, "remote-write was not called")
})
}

@@ -0,0 +1,54 @@
package util
import (
"fmt"
"github.com/imdario/mergo"
"gopkg.in/yaml.v2"
)
// YAMLMerger takes a set of given YAML fragments and merges them into a single YAML document.
// The order in which these fragments are supplied is maintained, so subsequent fragments override preceding ones.
type YAMLMerger struct {
fragments [][]byte
}
func NewYAMLMerger() *YAMLMerger {
return &YAMLMerger{}
}
func (m *YAMLMerger) AddFragment(fragment []byte) {
m.fragments = append(m.fragments, fragment)
}
func (m *YAMLMerger) Merge() ([]byte, error) {
merged := make(map[interface{}]interface{})
for _, fragment := range m.fragments {
fragmentMap, err := yamlToMap(fragment)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal given fragment %q to map: %w", fragment, err)
}
if err = mergo.Merge(&merged, fragmentMap, mergo.WithOverride, mergo.WithTypeCheck); err != nil {
return nil, fmt.Errorf("failed to merge fragment %q with base: %w", fragment, err)
}
}
mergedYAML, err := yaml.Marshal(merged)
if err != nil {
return nil, err
}
return mergedYAML, nil
}
func yamlToMap(fragment []byte) (interface{}, error) {
var fragmentMap map[interface{}]interface{}
err := yaml.Unmarshal(fragment, &fragmentMap)
if err != nil {
return nil, err
}
return fragmentMap, nil
}
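
To make the merge semantics concrete, here is a minimal, self-contained sketch of the override behaviour in plain string-keyed Go maps. It is an illustration only: the merger itself relies on `mergo.Merge` with `mergo.WithOverride` over `map[interface{}]interface{}` values decoded by `gopkg.in/yaml.v2`, not on this code.

```go
package main

import "fmt"

// mergeMaps recursively merges src into dst: nested maps are merged
// key-by-key, while scalar values from src override those in dst.
// This mirrors the "later fragment wins" semantics of YAMLMerger.
func mergeMaps(dst, src map[string]interface{}) map[string]interface{} {
	for k, sv := range src {
		if dv, ok := dst[k]; ok {
			dm, dok := dv.(map[string]interface{})
			sm, sok := sv.(map[string]interface{})
			if dok && sok {
				dst[k] = mergeMaps(dm, sm) // both sides are maps: merge deeply
				continue
			}
		}
		dst[k] = sv // later fragment wins
	}
	return dst
}

func main() {
	base := map[string]interface{}{
		"ruler": map[string]interface{}{
			"enable_api": true,
			"evaluation": map[string]interface{}{"mode": "local"},
		},
	}
	override := map[string]interface{}{
		"ruler": map[string]interface{}{
			"evaluation": map[string]interface{}{"mode": "remote"},
		},
	}
	merged := mergeMaps(base, override)
	rulerCfg := merged["ruler"].(map[string]interface{})
	fmt.Println(rulerCfg["evaluation"].(map[string]interface{})["mode"]) // remote
	fmt.Println(rulerCfg["enable_api"])                                  // true
}
```

This is how `WithExtraConfig` fragments are able to flip `ruler.evaluation.mode` in the integration tests without discarding the rest of the base template.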

@@ -356,6 +356,7 @@ type Loki struct {
tableManager *index.TableManager
frontend Frontend
ruler *base_ruler.Ruler
ruleEvaluator ruler.Evaluator
RulerStorage rulestore.RuleStore
rulerAPI *base_ruler.API
stopper queryrange.Stopper
@@ -625,6 +626,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
mm.RegisterModule(RulerStorage, t.initRulerStorage, modules.UserInvisibleModule)
mm.RegisterModule(Ruler, t.initRuler)
mm.RegisterModule(RuleEvaluator, t.initRuleEvaluator, modules.UserInvisibleModule)
mm.RegisterModule(TableManager, t.initTableManager)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(IndexGateway, t.initIndexGateway)
@@ -652,7 +654,8 @@ func (t *Loki) setupModuleManager() error {
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware, UsageReport, CacheGenerationLoader},
QueryScheduler: {Server, Overrides, MemberlistKV, UsageReport},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport},
Ruler: {Ring, Server, RulerStorage, RuleEvaluator, Overrides, TenantConfigs, UsageReport},
RuleEvaluator: {Ring, Server, Store, IngesterQuerier, Overrides, TenantConfigs, UsageReport},
TableManager: {Server, UsageReport},
Compactor: {Server, Overrides, MemberlistKV, UsageReport},
IndexGateway: {Server, Store, Overrides, UsageReport, MemberlistKV, IndexGatewayRing},

@@ -94,6 +94,7 @@ const (
QueryLimitsTripperware string = "query-limits-tripper"
RulerStorage string = "ruler-storage"
Ruler string = "ruler"
RuleEvaluator string = "rule-evaluator"
Store string = "store"
TableManager string = "table-manager"
MemberlistKV string = "memberlist-kv"
@@ -910,27 +911,20 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
func (t *Loki) initRuler() (_ services.Service, err error) {
if t.RulerStorage == nil {
level.Info(util_log.Logger).Log("msg", "RulerStorage is nil. Not starting the ruler.")
level.Warn(util_log.Logger).Log("msg", "RulerStorage is nil. Not starting the ruler.")
return nil, nil
}
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
deleteStore, err := t.deleteRequestsClient("ruler", t.Overrides)
if err != nil {
return nil, err
}
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, nil)
if err != nil {
return nil, err
if t.ruleEvaluator == nil {
level.Warn(util_log.Logger).Log("msg", "RuleEvaluator is nil. Not starting the ruler.") // TODO better error msg
return nil, nil
}
engine := logql.NewEngine(t.Cfg.Querier.Engine, q, t.Overrides, log.With(util_log.Logger, "component", "ruler"))
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
t.ruler, err = ruler.NewRuler(
t.Cfg.Ruler,
engine,
t.ruleEvaluator,
prometheus.DefaultRegisterer,
util_log.Logger,
t.RulerStorage,
@@ -974,10 +968,53 @@ func (t *Loki) initRuler() (_ services.Service, err error) {
t.Server.HTTP.Path("/loki/api/v1/rules/{namespace}/{groupName}").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.rulerAPI.DeleteRuleGroup)))
}
deleteStore, err := t.deleteRequestsClient("ruler", t.Overrides)
if err != nil {
return nil, err
}
t.ruler.AddListener(deleteRequestsStoreListener(deleteStore))
return t.ruler, nil
}
func (t *Loki) initRuleEvaluator() (services.Service, error) {
if err := t.Cfg.Ruler.Evaluation.Validate(); err != nil {
return nil, fmt.Errorf("invalid ruler evaluation config: %w", err)
}
var (
evaluator ruler.Evaluator
err error
)
mode := t.Cfg.Ruler.Evaluation.Mode
logger := log.With(util_log.Logger, "component", "ruler", "evaluation_mode", mode)
switch mode {
case ruler.EvalModeLocal:
var engine *logql.Engine
engine, err = t.createRulerQueryEngine(logger)
if err != nil {
break
}
evaluator, err = ruler.NewLocalEvaluator(&t.Cfg.Ruler.Evaluation, engine, logger)
case ruler.EvalModeRemote:
evaluator, err = ruler.NewRemoteEvaluator(&t.Cfg.Ruler.Evaluation, logger)
default:
err = fmt.Errorf("unknown rule evaluation mode %q", mode)
}
if err != nil {
return nil, fmt.Errorf("failed to create %s rule evaluator: %w", mode, err)
}
t.ruleEvaluator = evaluator
return nil, nil
}
func (t *Loki) initMemberlistKV() (services.Service, error) {
reg := prometheus.DefaultRegisterer
@@ -1221,6 +1258,20 @@ func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLi
return deletion.NewPerTenantDeleteRequestsClient(client, limits), nil
}
func (t *Loki) createRulerQueryEngine(logger log.Logger) (eng *logql.Engine, err error) {
deleteStore, err := t.deleteRequestsClient("rule-evaluator", t.Overrides)
if err != nil {
return nil, fmt.Errorf("could not create delete requests store: %w", err)
}
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, nil)
if err != nil {
return nil, fmt.Errorf("could not create querier: %w", err)
}
return logql.NewEngine(t.Cfg.Querier.Engine, q, t.Overrides, logger), nil
}
func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
if pc.ObjectType != indexshipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")

@@ -521,6 +521,18 @@ func (Codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrang
},
Statistics: resp.Data.Statistics,
}, nil
case loghttp.ResultTypeScalar:
return &LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: resp.Status,
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeScalar,
Result: toProtoScalar(resp.Data.Result.(loghttp.Scalar)),
},
Headers: convertPrometheusResponseHeadersToPointers(httpResponseHeadersToPromResponseHeaders(r.Header)),
},
Statistics: resp.Data.Statistics,
}, nil
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "unsupported response type, got (%s)", string(resp.Data.ResultType))
}
@@ -809,6 +821,19 @@ func toProtoVector(v loghttp.Vector) []queryrangebase.SampleStream {
return res
}
func toProtoScalar(v loghttp.Scalar) []queryrangebase.SampleStream {
res := make([]queryrangebase.SampleStream, 0, 1)
res = append(res, queryrangebase.SampleStream{
Samples: []logproto.LegacySample{{
Value: float64(v.Value),
TimestampMs: v.Timestamp.UnixNano() / 1e6,
}},
Labels: nil,
})
return res
}
func (res LokiResponse) Count() int64 {
var result int64
for _, s := range res.Data.Result {

@@ -47,10 +47,14 @@ func (p *LokiPromResponse) encode(ctx context.Context) (*http.Response, error) {
b []byte
err error
)
if p.Response.Data.ResultType == loghttp.ResultTypeVector {
switch p.Response.Data.ResultType {
case loghttp.ResultTypeVector:
b, err = p.marshalVector()
} else {
case loghttp.ResultTypeMatrix:
b, err = p.marshalMatrix()
case loghttp.ResultTypeScalar:
b, err = p.marshalScalar()
}
if err != nil {
return nil, err
@@ -131,3 +135,43 @@ func (p *LokiPromResponse) marshalMatrix() ([]byte, error) {
Status: p.Response.Status,
})
}
func (p *LokiPromResponse) marshalScalar() ([]byte, error) {
var scalar loghttp.Scalar
for _, r := range p.Response.Data.Result {
if len(r.Samples) <= 0 {
continue
}
scalar = loghttp.Scalar{
Value: model.SampleValue(r.Samples[0].Value),
Timestamp: model.TimeFromUnix(r.Samples[0].TimestampMs),
}
break
}
return jsonStd.Marshal(struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result loghttp.Scalar `json:"result"`
Statistics stats.Result `json:"stats,omitempty"`
} `json:"data,omitempty"`
ErrorType string `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
}{
Error: p.Response.Error,
Data: struct {
ResultType string `json:"resultType"`
Result loghttp.Scalar `json:"result"`
Statistics stats.Result `json:"stats,omitempty"`
}{
ResultType: loghttp.ResultTypeScalar,
Result: scalar,
Statistics: p.Statistics,
},
ErrorType: p.Response.ErrorType,
Status: p.Response.Status,
})
}

@@ -191,7 +191,7 @@ func MatrixToSeriesSet(m model.Matrix) storage.SeriesSet {
series := make([]storage.Series, 0, len(m))
for _, ss := range m {
series = append(series, &ConcreteSeries{
labels: metricToLabels(ss.Metric),
labels: MetricToLabels(ss.Metric),
samples: ss.Values,
})
}
@@ -203,14 +203,14 @@ func MetricsToSeriesSet(ms []metric.Metric) storage.SeriesSet {
series := make([]storage.Series, 0, len(ms))
for _, m := range ms {
series = append(series, &ConcreteSeries{
labels: metricToLabels(m.Metric),
labels: MetricToLabels(m.Metric),
samples: nil,
})
}
return NewConcreteSeriesSet(series)
}
func metricToLabels(m model.Metric) labels.Labels {
func MetricToLabels(m model.Metric) labels.Labels {
ls := make(labels.Labels, 0, len(m))
for k, v := range m {
ls = append(ls, labels.Label{

@@ -22,8 +22,6 @@ import (
"github.com/prometheus/prometheus/template"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
ruler "github.com/grafana/loki/pkg/ruler/base"
"github.com/grafana/loki/pkg/ruler/rulespb"
@@ -54,8 +52,8 @@ type RulesLimits interface {
// engineQueryFunc returns a new query function using the rules.EngineQueryFunc function
// and passing an altered timestamp.
func engineQueryFunc(engine *logql.Engine, overrides RulesLimits, checker readyChecker, userID string) rules.QueryFunc {
return rules.QueryFunc(func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
func engineQueryFunc(evaluator Evaluator, overrides RulesLimits, checker readyChecker, userID string) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
// check if storage instance is ready; if not, fail the rule evaluation;
// we do this to prevent an attempt to append new samples before the WAL appender is ready
if !checker.isReady(userID) {
@@ -63,21 +61,10 @@ func engineQueryFunc(engine *logql.Engine, overrides RulesLimits, checker readyC
}
adjusted := t.Add(-overrides.EvaluationDelay(userID))
params := logql.NewLiteralParams(
qs,
adjusted,
adjusted,
0,
0,
logproto.FORWARD,
0,
nil,
)
q := engine.Query(params)
res, err := evaluator.Eval(ctx, qs, adjusted)
res, err := q.Exec(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("rule evaluation failed: %w", err)
}
switch v := res.Data.(type) {
case promql.Vector:
@@ -90,7 +77,7 @@ func engineQueryFunc(engine *logql.Engine, overrides RulesLimits, checker readyC
default:
return nil, errors.New("rule result is not a vector or scalar")
}
})
}
}
// MultiTenantManagerAdapter will wrap a MultiTenantManager which validates loki rules
@@ -129,7 +116,7 @@ const MetricsPrefix = "loki_ruler_wal_"
var registry storageRegistry
func MultiTenantRuleManager(cfg Config, engine *logql.Engine, overrides RulesLimits, logger log.Logger, reg prometheus.Registerer) ruler.ManagerFactory {
func MultiTenantRuleManager(cfg Config, evaluator Evaluator, overrides RulesLimits, logger log.Logger, reg prometheus.Registerer) ruler.ManagerFactory {
reg = prometheus.WrapRegistererWithPrefix(MetricsPrefix, reg)
registry = newWALRegistry(log.With(logger, "storage", "registry"), reg, cfg, overrides)
@@ -144,7 +131,7 @@ func MultiTenantRuleManager(cfg Config, engine *logql.Engine, overrides RulesLim
registry.configureTenantStorage(userID)
logger = log.With(logger, "user", userID)
queryFunc := engineQueryFunc(engine, overrides, registry, userID)
queryFunc := engineQueryFunc(evaluator, overrides, registry, userID)
memStore := NewMemStore(userID, queryFunc, newMemstoreMetrics(reg), 5*time.Minute, log.With(logger, "subcomponent", "MemStore"))
// GroupLoader builds a cache of the rules as they're loaded by the

@@ -10,7 +10,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logql"
ruler "github.com/grafana/loki/pkg/ruler/base"
rulerbase "github.com/grafana/loki/pkg/ruler/base"
"github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)
@@ -19,7 +19,7 @@ import (
func TestInvalidRemoteWriteConfig(t *testing.T) {
// if remote-write is not enabled, validation fails
cfg := Config{
Config: ruler.Config{},
Config: rulerbase.Config{},
RemoteWrite: RemoteWriteConfig{
Enabled: false,
},
@@ -28,7 +28,7 @@ func TestInvalidRemoteWriteConfig(t *testing.T) {
// if no remote-write URL is configured, validation fails
cfg = Config{
Config: ruler.Config{},
Config: rulerbase.Config{},
RemoteWrite: RemoteWriteConfig{
Enabled: true,
Client: &config.RemoteWriteConfig{
@@ -46,7 +46,10 @@ func TestNonMetricQuery(t *testing.T) {
require.Nil(t, err)
engine := logql.NewEngine(logql.EngineOpts{}, &FakeQuerier{}, overrides, log.Logger)
queryFunc := engineQueryFunc(engine, overrides, fakeChecker{}, "fake")
eval, err := NewLocalEvaluator(&EvaluationConfig{Mode: EvalModeLocal}, engine, log.Logger)
require.NoError(t, err)
queryFunc := engineQueryFunc(eval, overrides, fakeChecker{}, "fake")
_, err = queryFunc(context.TODO(), `{job="nginx"}`, time.Now())
require.Error(t, err, "rule result is not a vector or scalar")

@@ -22,6 +22,8 @@ type Config struct {
WALCleaner cleaner.Config `yaml:"wal_cleaner,omitempty"`
RemoteWrite RemoteWriteConfig `yaml:"remote_write,omitempty" doc:"description=Remote-write configuration to send rule samples to a Prometheus remote-write endpoint."`
Evaluation EvaluationConfig `yaml:"evaluation,omitempty" doc:"description=Configuration for rule evaluation."`
}
func (c *Config) RegisterFlags(f *flag.FlagSet) {
@@ -29,6 +31,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.RemoteWrite.RegisterFlags(f)
c.WAL.RegisterFlags(f)
c.WALCleaner.RegisterFlags(f)
c.Evaluation.RegisterFlags(f)
// TODO(owen-d, 3.0.0): remove deprecated experimental prefix in Cortex if they'll accept it.
f.BoolVar(&c.Config.EnableAPI, "ruler.enable-api", true, "Enable the ruler API.")

@@ -0,0 +1,36 @@
package ruler
import (
"context"
"flag"
"fmt"
"strings"
"time"
"github.com/grafana/loki/pkg/logqlmodel"
)
// Evaluator is the interface that must be satisfied in order to accept rule evaluations from the Ruler.
type Evaluator interface {
// Eval evaluates the given rule and returns the result.
Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error)
}
type EvaluationConfig struct {
Mode string `yaml:"mode,omitempty"`
QueryFrontend QueryFrontendConfig `yaml:"query_frontend,omitempty"`
}
func (c *EvaluationConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&c.Mode, "ruler.evaluation.mode", EvalModeLocal, "The evaluation mode for the ruler. Can be either 'local' or 'remote'. If set to 'local', the ruler will evaluate rules locally. If set to 'remote', the ruler will evaluate rules remotely. If unset, the ruler will evaluate rules locally.")
c.QueryFrontend.RegisterFlags(f)
}
func (c *EvaluationConfig) Validate() error {
if c.Mode != EvalModeLocal && c.Mode != EvalModeRemote {
return fmt.Errorf("invalid evaluation mode: %s. Acceptable modes are: %s", c.Mode, strings.Join([]string{EvalModeLocal, EvalModeRemote}, ", "))
}
return nil
}
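`Validate` only admits the two known modes. A stdlib-only sketch of the same check (`validate` is a hypothetical stand-in for `EvaluationConfig.Validate`, not part of this PR):

```go
package main

import (
	"fmt"
	"strings"
)

const (
	evalModeLocal  = "local"
	evalModeRemote = "remote"
)

// validate mirrors EvaluationConfig.Validate: any mode other than
// "local" or "remote" is rejected with an error listing the valid modes.
func validate(mode string) error {
	if mode != evalModeLocal && mode != evalModeRemote {
		return fmt.Errorf("invalid evaluation mode: %s. Acceptable modes are: %s",
			mode, strings.Join([]string{evalModeLocal, evalModeRemote}, ", "))
	}
	return nil
}

func main() {
	fmt.Println(validate("remote") == nil)
	fmt.Println(validate("hybrid") == nil)
}
```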

@@ -0,0 +1,53 @@
package ruler
import (
"context"
"fmt"
"time"
"github.com/go-kit/log"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
)
const EvalModeLocal = "local"
type LocalEvaluator struct {
cfg *EvaluationConfig
engine *logql.Engine
logger log.Logger
}
func NewLocalEvaluator(cfg *EvaluationConfig, engine *logql.Engine, logger log.Logger) (*LocalEvaluator, error) {
if cfg == nil {
return nil, fmt.Errorf("given config is nil")
}
if engine == nil {
return nil, fmt.Errorf("given engine is nil")
}
return &LocalEvaluator{cfg: cfg, engine: engine, logger: logger}, nil
}
func (l *LocalEvaluator) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) {
params := logql.NewLiteralParams(
qs,
now,
now,
0,
0,
logproto.FORWARD,
0,
nil,
)
q := l.engine.Query(params)
res, err := q.Exec(ctx)
if err != nil {
return nil, err
}
return &res, nil
}

@@ -0,0 +1,259 @@
package ruler
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/grafana/mimir/pull/1536/
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Cortex Authors.
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"net/textproto"
"net/url"
"strconv"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/crypto/tls"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/querier/series"
"github.com/grafana/loki/pkg/util/build"
"github.com/grafana/loki/pkg/util/spanlogger"
)
const (
keepAlive = time.Second * 10
keepAliveTimeout = time.Second * 5
serviceConfig = `{"loadBalancingPolicy": "round_robin"}`
queryEndpointPath = "/loki/api/v1/query"
mimeTypeFormPost = "application/x-www-form-urlencoded"
)
const EvalModeRemote = "remote"
var userAgent = fmt.Sprintf("loki-ruler/%s", build.Version)
type RemoteEvaluator struct {
rq *remoteQuerier
logger log.Logger
}
func NewRemoteEvaluator(cfg *EvaluationConfig, logger log.Logger) (*RemoteEvaluator, error) {
qfClient, err := dialQueryFrontend(cfg.QueryFrontend)
if err != nil {
return nil, fmt.Errorf("failed to dial query frontend for remote rule evaluation: %w", err)
}
return &RemoteEvaluator{
rq: newRemoteQuerier(qfClient, logger, WithOrgIDMiddleware),
logger: logger,
}, nil
}
func (r *RemoteEvaluator) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) {
res, err := r.rq.Query(ctx, qs, now)
if err != nil {
return nil, fmt.Errorf("failed to perform remote evaluation of query %q: %w", qs, err)
}
return res, err
}
// dialQueryFrontend creates and initializes a new httpgrpc.HTTPClient taking a QueryFrontendConfig configuration.
func dialQueryFrontend(cfg QueryFrontendConfig) (httpgrpc.HTTPClient, error) {
tlsDialOptions, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled)
if err != nil {
return nil, err
}
dialOptions := append(
[]grpc.DialOption{
grpc.WithKeepaliveParams(
keepalive.ClientParameters{
Time: keepAlive,
Timeout: keepAliveTimeout,
PermitWithoutStream: true,
},
),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
),
),
grpc.WithDefaultServiceConfig(serviceConfig),
},
tlsDialOptions...,
)
conn, err := grpc.Dial(cfg.Address, dialOptions...)
if err != nil {
return nil, err
}
return httpgrpc.NewHTTPClient(conn), nil
}
// Middleware provides a mechanism to inspect outgoing remote querier requests.
type Middleware func(ctx context.Context, req *httpgrpc.HTTPRequest) error
// remoteQuerier executes read operations against a httpgrpc.HTTPClient.
type remoteQuerier struct {
client httpgrpc.HTTPClient
middlewares []Middleware
logger log.Logger
}
// newRemoteQuerier creates and initializes a new remoteQuerier instance.
func newRemoteQuerier(
client httpgrpc.HTTPClient,
logger log.Logger,
middlewares ...Middleware,
) *remoteQuerier {
return &remoteQuerier{
client: client,
middlewares: middlewares,
logger: logger,
}
}
// Query performs a query for the given time.
func (q *remoteQuerier) Query(ctx context.Context, qs string, t time.Time) (*logqlmodel.Result, error) {
logger, ctx := spanlogger.NewWithLogger(ctx, q.logger, "ruler.remoteEvaluation.Query")
defer logger.Span.Finish()
return q.query(ctx, qs, t, logger)
}
func (q *remoteQuerier) query(ctx context.Context, query string, ts time.Time, logger log.Logger) (*logqlmodel.Result, error) {
args := make(url.Values)
args.Set("query", query)
args.Set("direction", "forward")
if !ts.IsZero() {
args.Set("time", ts.Format(time.RFC3339Nano))
}
body := []byte(args.Encode())
hash := logql.HashedQuery(query)
req := httpgrpc.HTTPRequest{
Method: http.MethodPost,
Url: queryEndpointPath,
Body: body,
Headers: []*httpgrpc.Header{
{Key: textproto.CanonicalMIMEHeaderKey("User-Agent"), Values: []string{userAgent}},
{Key: textproto.CanonicalMIMEHeaderKey("Content-Type"), Values: []string{mimeTypeFormPost}},
{Key: textproto.CanonicalMIMEHeaderKey("Content-Length"), Values: []string{strconv.Itoa(len(body))}},
},
}
for _, mdw := range q.middlewares {
if err := mdw(ctx, &req); err != nil {
return nil, err
}
}
start := time.Now()
resp, err := q.client.Handle(ctx, &req)
if err != nil {
level.Warn(logger).Log("msg", "failed to remotely evaluate query expression", "err", err, "query_hash", hash, "qs", query, "ts", ts, "response_time", time.Since(start).Seconds())
return nil, err
}
if resp.Code/100 != 2 {
return nil, fmt.Errorf("unexpected response status code %d: %s", resp.Code, string(resp.Body))
}
level.Debug(logger).Log("msg", "query expression successfully evaluated", "query_hash", hash, "qs", query, "ts", ts, "response_time", time.Since(start).Seconds())
return decodeResponse(resp)
}
func decodeResponse(resp *httpgrpc.HTTPResponse) (*logqlmodel.Result, error) {
var decoded loghttp.QueryResponse
if err := json.NewDecoder(bytes.NewReader(resp.Body)).Decode(&decoded); err != nil {
return nil, err
}
if decoded.Status == "error" {
return nil, fmt.Errorf("query response error: %s", decoded.Status)
}
switch decoded.Data.ResultType {
case loghttp.ResultTypeVector:
var res promql.Vector
vec := decoded.Data.Result.(loghttp.Vector)
for _, s := range vec {
res = append(res, promql.Sample{
Metric: series.MetricToLabels(s.Metric),
Point: promql.Point{V: float64(s.Value), T: int64(s.Timestamp)},
})
}
return &logqlmodel.Result{
Statistics: decoded.Data.Statistics,
Data: res,
}, nil
case loghttp.ResultTypeScalar:
var res promql.Scalar
scalar := decoded.Data.Result.(loghttp.Scalar)
res.T = scalar.Timestamp.Unix()
res.V = float64(scalar.Value)
return &logqlmodel.Result{
Statistics: decoded.Data.Statistics,
Data: res,
}, nil
default:
return nil, fmt.Errorf("unsupported result type %s", decoded.Data.ResultType)
}
}
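`decodeResponse` branches on the loghttp result type and rejects anything that is not a vector or scalar. A stdlib-only sketch of the vector branch, using simplified stand-in types for the `loghttp` shapes (`decodeVector` and these structs are illustrative, not the real package types):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Minimal stand-ins for the loghttp response fields decodeResponse consumes.
type queryResponse struct {
	Status string    `json:"status"`
	Data   queryData `json:"data"`
}

type queryData struct {
	ResultType string          `json:"resultType"`
	Result     json.RawMessage `json:"result"`
}

type vectorSample struct {
	Metric map[string]string  `json:"metric"`
	Value  [2]json.RawMessage `json:"value"` // [unix timestamp, "value as string"]
}

// decodeVector extracts vector samples from a successful query response,
// failing on error statuses and non-vector result types, like the
// corresponding branches in decodeResponse.
func decodeVector(body []byte) ([]vectorSample, error) {
	var decoded queryResponse
	if err := json.Unmarshal(body, &decoded); err != nil {
		return nil, err
	}
	if decoded.Status == "error" {
		return nil, fmt.Errorf("query response error")
	}
	if decoded.Data.ResultType != "vector" {
		return nil, fmt.Errorf("unsupported result type %s", decoded.Data.ResultType)
	}
	var vec []vectorSample
	if err := json.Unmarshal(decoded.Data.Result, &vec); err != nil {
		return nil, err
	}
	return vec, nil
}

func main() {
	body := []byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"http_method":"GET"},"value":[1670000000,"42"]}]}}`)
	vec, err := decodeVector(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(vec), vec[0].Metric["http_method"])
}
```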
// WithOrgIDMiddleware attaches 'X-Scope-OrgID' header value to the outgoing request by inspecting the passed context.
// In case the expression to evaluate corresponds to a federated rule, the ExtractTenantIDs function will take care
// of normalizing and concatenating source tenants by separating them with a '|' character.
func WithOrgIDMiddleware(ctx context.Context, req *httpgrpc.HTTPRequest) error {
orgID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
}
req.Headers = append(req.Headers, &httpgrpc.Header{
Key: textproto.CanonicalMIMEHeaderKey(user.OrgIDHeaderName),
Values: []string{orgID},
})
return nil
}
// QueryFrontendConfig defines query-frontend transport configuration.
type QueryFrontendConfig struct {
// The address of the remote querier to connect to.
Address string `yaml:"address"`
// TLSEnabled tells whether TLS should be used to establish remote connection.
TLSEnabled bool `yaml:"tls_enabled"`
// TLS is the config for client TLS.
TLS tls.ClientConfig `yaml:",inline"`
}
func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&c.Address, "ruler.evaluation.query-frontend.address", "", "GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) to enable client side load balancing.")
f.BoolVar(&c.TLSEnabled, "ruler.evaluation.query-frontend.tls-enabled", false, "Set to true if query-frontend connection requires TLS.")
c.TLS.RegisterFlagsWithPrefix("ruler.evaluation.query-frontend", f)
}

@@ -6,12 +6,11 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/grafana/loki/pkg/logql"
ruler "github.com/grafana/loki/pkg/ruler/base"
"github.com/grafana/loki/pkg/ruler/rulestore"
)
func NewRuler(cfg Config, engine *logql.Engine, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits) (*ruler.Ruler, error) {
func NewRuler(cfg Config, evaluator Evaluator, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits) (*ruler.Ruler, error) {
// For backward compatibility, client and clients are defined in the remote_write config.
// When both are present, an error is thrown.
if len(cfg.RemoteWrite.Clients) > 0 && cfg.RemoteWrite.Client != nil {
@@ -28,7 +27,7 @@ func NewRuler(cfg Config, engine *logql.Engine, reg prometheus.Registerer, logge
mgr, err := ruler.NewDefaultMultiTenantManager(
cfg.Config,
MultiTenantRuleManager(cfg, engine, limits, logger, reg),
MultiTenantRuleManager(cfg, evaluator, limits, logger, reg),
reg,
logger,
limits,

@@ -49,6 +49,29 @@ ingester:
chunk_block_size: 262144
flush_op_timeout: 10s
ruler:
enable_api: true
wal:
dir: /tmp/ruler-wal
evaluation:
mode: remote
query_frontend:
address: dns:///loki-read:9095
storage:
type: local
local:
directory: /etc/loki/rules
rule_path: /tmp/prom-rules
remote_write:
enabled: true
clients:
local:
url: http://prometheus:9090/api/v1/write
queue_config:
# send immediately as soon as a sample is generated
capacity: 1
batch_send_deadline: 0s
schema_config:
configs:
- from: 2020-08-01

@@ -52,6 +52,7 @@ services:
[
'--log.level=debug',
'--config.file=/etc/prometheus/prometheus.yml',
'--enable-feature=remote-write-receiver',
'--query.lookback-delta=30s'
]
networks:
@@ -126,7 +127,7 @@ services:
# - SYS_PTRACE
#security_opt:
# - apparmor=unconfined
command: "-config.file=/etc/loki/loki.yaml -target=read"
command: "-config.file=/etc/loki/loki.yaml -target=read -legacy-read-mode=false"
networks:
- loki
restart: always
@@ -156,6 +157,28 @@ services:
mode: replicated
replicas: 3
loki-backend:
image: grafana/loki:2.7.3
volumes:
- ./config:/etc/loki/
- ./rules:/etc/loki/rules
ports:
- "3100"
- "7946"
# uncomment to use interactive debugging
#- "60000-60002:40000" # makes the replicas available on ports 50000, 50001, 50002
# cap_add:
# - SYS_PTRACE
# security_opt:
# - apparmor=unconfined
command: "-config.file=/etc/loki/loki.yaml -target=backend -legacy-read-mode=false -log.level=debug"
networks:
- loki
restart: always
deploy:
mode: replicated
replicas: 3
# alertmanager to enable receiving alerts
alertmanager:
image: prom/alertmanager:v0.23.0

@@ -0,0 +1,20 @@
groups:
- name: Sample Rule Group
interval: 5s
rules:
- record: generated_logs:rate1m
expr: sum by (http_method) (rate({job="generated-logs"}[1m]))
labels:
source: "recording rule"
- record: scalar
expr: 10
labels:
source: "static"
- alert: NoGeneratedLogs
expr: absent_over_time({job="generated-logs"}[1m])
labels:
source: "alerting rule"
- alert: AlwaysFiring
expr: absent_over_time({job="blah"}[1m])
labels:
source: "alerting rule"