Support frontend V2 with query scheduler. (#4071)

* Support Cortex Frontend V2.

* Format code.

* Start query scheduler in Docker compose.

* Model query flow in sequence diagram.

* Model calls to downstreamer.

* Move diagram to docs.

* Embed generated SVG.

* Describe calls.

* Launch Cortex query scheduler.

* Disable sharding for now.

* Log which framework is used.

* Configure querier to use scheduler.

* Remove unwanted docs.

* Enable query splitting again.

* Document scheduler address.

* Document new scheduler options for frontend as well.

* Remove unused querier option.

* Use new query-scheduler module in Loki.

* Remove unused option.

* Document `max_queriers_per_tenant`.
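
Pulled together, the new query path looks roughly like this (a sketch only; the target names, hostnames, and ports mirror the dev docker-compose setup changed below, not any defaults):

```yaml
# Sketch of the wiring, assuming the dev setup's hostnames and ports:
# the frontend enqueues split queries on the scheduler, and queriers pull work from it.
query-scheduler:
  command: ["./loki", "-config.file=./config/loki.yaml", "-target=query-scheduler", "-server.grpc-listen-port=9009"]
query-frontend:
  command: ["./loki", "-config.file=./config/loki.yaml", "-target=query-frontend", "-frontend.scheduler-address=query-scheduler:9009"]
querier:
  command: ["./loki", "-config.file=./config/loki.yaml", "-target=querier", "-querier.scheduler-address=query-scheduler:9009"]
```
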
Branch: pull/4109/head
Author: Karsten Jeschkies (committed via GitHub)
Commit: c3aaebe509, parent f9b0ef33c0
Changed files:
1. docs/sources/configuration/_index.md (27 lines changed)
2. pkg/loki/loki.go (16 lines changed)
3. pkg/loki/modules.go (39 lines changed)
4. pkg/lokifrontend/config.go (3 lines changed)
5. pkg/validation/limits.go (8 lines changed)
6. tools/dev/loki-boltdb-storage-s3/config/datasource.yaml (4 lines changed)
7. tools/dev/loki-boltdb-storage-s3/config/loki.yaml (2 lines changed)
8. tools/dev/loki-boltdb-storage-s3/docker-compose.yml (37 lines changed)

@@ -307,6 +307,19 @@ The query_frontend_config configures the Loki query-frontend.
# URL of querier for tail proxy.
# CLI flag: -frontend.tail-proxy-url
[tail_proxy_url: <string> | default = ""]
# DNS hostname used for finding query-schedulers.
# CLI flag: -frontend.scheduler-address
[scheduler_address: <string> | default = ""]
# How often to resolve the scheduler-address, in order to look for new
# query-scheduler instances.
# CLI flag: -frontend.scheduler-dns-lookup-period
[scheduler_dns_lookup_period: <duration> | default = 10s]
# Number of concurrent workers forwarding queries to single query-scheduler.
# CLI flag: -frontend.scheduler-worker-concurrency
[scheduler_worker_concurrency: <int> | default = 5]
```
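
For example, a frontend pointed at a scheduler would carry these options in its `frontend` block (the hostname and port below are placeholders, not defaults):

```yaml
# Illustrative only: frontend discovering query-schedulers via DNS and feeding them.
frontend:
  scheduler_address: query-scheduler.loki.svc:9095  # placeholder DNS name and gRPC port
  scheduler_dns_lookup_period: 10s                   # how often to re-resolve for new schedulers
  scheduler_worker_concurrency: 5                    # concurrent workers per scheduler
```
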
## queryrange_config
@@ -763,6 +776,10 @@ The `frontend_worker_config` configures the worker - running within the Loki querier - picking up and executing queries enqueued by the query-frontend.
# The CLI flags prefix for this block config is: querier.frontend-client
[grpc_client_config: <grpc_client_config>]
# DNS hostname used for finding query-schedulers.
# CLI flag: -querier.scheduler-address
[scheduler_address: <string> | default = ""]
```
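
On the querier side the equivalent knob sits in the `frontend_worker` block, mirroring what the dev loki.yaml below switches to (the address is the dev setup's, not a default):

```yaml
# Illustrative only: querier pulling work from the scheduler instead of the frontend.
frontend_worker:
  scheduler_address: query-scheduler:9009  # leave unset and use frontend_address to keep the old path
```
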
## ingester_client_config
@@ -1866,6 +1883,16 @@ logs in Loki.
# Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.
# CLI flag: -frontend.max-cache-freshness
[max_cache_freshness_per_query: <duration> | default = 1m]
# Maximum number of queriers that can handle requests for a single tenant. If
# set to 0 or value higher than number of available queriers, *all* queriers
# will handle requests for the tenant. Each frontend (or query-scheduler, if
# used) will select the same set of queriers for the same tenant (given that all
# queriers are connected to all frontends / query-schedulers). This option only
# works with queriers connecting to the query-frontend / query-scheduler, not
# when using downstream URL.
# CLI flag: -frontend.max-queriers-per-tenant
[max_queriers_per_tenant: <int> | default = 0]
```
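
As a hypothetical per-tenant example, capping a tenant's queries to a subset of queriers would look like:

```yaml
# Illustrative only: limit any single tenant to 4 queriers; 0 keeps the default of all queriers.
limits_config:
  max_queriers_per_tenant: 4
```
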
### grpc_client_config

@@ -7,7 +7,6 @@ import (
"fmt"
"net/http"
frontend "github.com/cortexproject/cortex/pkg/frontend/v1"
"github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
"github.com/felixge/fgprof"
@@ -25,6 +24,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
@@ -73,6 +73,7 @@ type Config struct {
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
Tracing tracing.Config `yaml:"tracing"`
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
}
// RegisterFlags registers flag.
@@ -104,6 +105,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.MemberlistKV.RegisterFlags(f)
c.Tracing.RegisterFlags(f)
c.CompactorConfig.RegisterFlags(f)
c.QueryScheduler.RegisterFlags(f)
}
// Clone takes advantage of pass-by-value semantics to return a distinct *Config.
@@ -135,6 +137,9 @@ func (c *Config) Validate() error {
if err := c.Ingester.Validate(); err != nil {
return errors.Wrap(err, "invalid ingester config")
}
if err := c.Worker.Validate(util_log.Logger); err != nil {
return errors.Wrap(err, "invalid storage config")
}
if err := c.StorageConfig.BoltDBShipperConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid boltdb-shipper config")
}
@@ -155,6 +160,11 @@ func (c *Config) isModuleEnabled(m string) bool {
return util.StringsContain(c.Target, m)
}
type Frontend interface {
services.Service
CheckReady(_ context.Context) error
}
// Loki is the root datastructure for Loki.
type Loki struct {
Cfg Config
@@ -173,7 +183,7 @@ type Loki struct {
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
tableManager *chunk.TableManager
frontend *frontend.Frontend
frontend Frontend
ruler *cortex_ruler.Ruler
RulerStorage rulestore.RuleStore
rulerAPI *cortex_ruler.API
@@ -383,6 +393,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(TableManager, t.initTableManager)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(IndexGateway, t.initIndexGateway)
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
mm.RegisterModule(All, nil)
// Add dependencies
@@ -396,6 +407,7 @@ func (t *Loki) setupModuleManager() error {
Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs},
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware},
QueryScheduler: {Server, Overrides},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs},
TableManager: {Server},
Compactor: {Server, Overrides},

@@ -14,6 +14,7 @@ import (
"github.com/cortexproject/cortex/pkg/frontend"
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/cortex"
cortex_querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
@@ -21,6 +22,8 @@ import (
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/services"
@@ -78,6 +81,7 @@ const (
MemberlistKV string = "memberlist-kv"
Compactor string = "compactor"
IndexGateway string = "index-gateway"
QueryScheduler string = "query-scheduler"
All string = "all"
)
@@ -193,10 +197,8 @@ func (t *Loki) initQuerier() (services.Service, error) {
err error
)
// NewQuerierWorker now expects Frontend (or Scheduler) address to be set. Loki only supports Frontend for now.
if t.Cfg.Worker.FrontendAddress != "" {
// In case someone set scheduler address, we ignore it.
t.Cfg.Worker.SchedulerAddress = ""
// NewQuerierWorker now expects Frontend (or Scheduler) address to be set.
if t.Cfg.Worker.FrontendAddress != "" || t.Cfg.Worker.SchedulerAddress != "" {
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
level.Debug(util_log.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.Cfg.Worker))
worker, err = cortex_querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(t.Server.HTTPServer.Handler), util_log.Logger, prometheus.DefaultRegisterer)
@@ -416,18 +418,26 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.Cfg.Frontend))
roundTripper, frontendV1, _, err := frontend.InitFrontend(frontend.CombinedFrontendConfig{
// Don't set FrontendV2 field to make sure that only frontendV1 can be initialized.
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(frontend.CombinedFrontendConfig{
Handler: t.Cfg.Frontend.Handler,
FrontendV1: t.Cfg.Frontend.FrontendV1,
FrontendV2: t.Cfg.Frontend.FrontendV2,
DownstreamURL: t.Cfg.Frontend.DownstreamURL,
}, disabledShuffleShardingLimits{}, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
t.frontend = frontendV1
if t.frontend != nil {
frontendv1pb.RegisterFrontendServer(t.Server.GRPC, t.frontend)
if frontendV1 != nil {
frontendv1pb.RegisterFrontendServer(t.Server.GRPC, frontendV1)
t.frontend = frontendV1
level.Debug(util_log.Logger).Log("msg", "using query frontend", "version", "v1")
} else if frontendV2 != nil {
frontendv2pb.RegisterFrontendForQuerierServer(t.Server.GRPC, frontendV2)
t.frontend = frontendV2
level.Debug(util_log.Logger).Log("msg", "using query frontend", "version", "v2")
} else {
level.Debug(util_log.Logger).Log("msg", "no query frontend configured")
}
roundTripper = t.QueryFrontEndTripperware(roundTripper)
@@ -643,6 +653,17 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
return gateway, nil
}
func (t *Loki) initQueryScheduler() (services.Service, error) {
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.overrides, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
schedulerpb.RegisterSchedulerForFrontendServer(t.Server.GRPC, s)
schedulerpb.RegisterSchedulerForQuerierServer(t.Server.GRPC, s)
return s, nil
}
func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")

@@ -5,11 +5,13 @@ import (
"github.com/cortexproject/cortex/pkg/frontend/transport"
v1 "github.com/cortexproject/cortex/pkg/frontend/v1"
v2 "github.com/cortexproject/cortex/pkg/frontend/v2"
)
type Config struct {
Handler transport.HandlerConfig `yaml:",inline"`
FrontendV1 v1.Config `yaml:",inline"`
FrontendV2 v2.Config `yaml:",inline"`
CompressResponses bool `yaml:"compress_responses"`
DownstreamURL string `yaml:"downstream_url"`
@@ -21,6 +23,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Handler.RegisterFlags(f)
cfg.FrontendV1.RegisterFlags(f)
cfg.FrontendV2.RegisterFlags(f)
f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")

@@ -56,6 +56,7 @@ type Limits struct {
MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests" json:"max_concurrent_tail_requests"`
MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"`
MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"`
MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"`
// Query frontend enforced limits. The default is actually parameterized by the queryrange config.
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
@@ -124,6 +125,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
_ = l.MaxCacheFreshness.Set("1m")
f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")
f.IntVar(&l.MaxQueriersPerTenant, "frontend.max-queriers-per-tenant", 0, "Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. Each frontend (or query-scheduler, if used) will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends / query-schedulers). This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL.")
_ = l.RulerEvaluationDelay.Set("0s")
f.Var(&l.RulerEvaluationDelay, "ruler.evaluation-delay-duration", "Duration to delay the evaluation of rules to ensure the underlying metrics have been pushed to Cortex.")
@@ -296,6 +299,11 @@ func (o *Overrides) MaxQuerySeries(userID string) int {
return o.getOverridesForUser(userID).MaxQuerySeries
}
// MaxQueriersPerUser returns the maximum number of queriers that can handle requests for this user.
func (o *Overrides) MaxQueriersPerUser(userID string) int {
return o.getOverridesForUser(userID).MaxQueriersPerTenant
}
// MaxQueryParallelism returns the limit to the number of sub-queries the
// frontend will process in parallel.
func (o *Overrides) MaxQueryParallelism(userID string) int {

@@ -1,5 +1,9 @@
apiVersion: 1
datasources:
- name: Jaeger
type: jaeger
access: proxy
url: http://jaeger:16686
- name: Loki
type: loki
access: proxy

@@ -27,10 +27,10 @@ frontend:
log_queries_longer_than: 5s
max_outstanding_per_tenant: 512
frontend_worker:
frontend_address: query-frontend:9007
grpc_client_config:
max_send_msg_size: 1.048576e+08
parallelism: 6
scheduler_address: query-scheduler:9009
ingester:
chunk_block_size: 262144
chunk_encoding: snappy

@@ -117,11 +117,12 @@ services:
context: .
dockerfile: dev.dockerfile
image: loki
command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./loki --listen=:18004 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/loki.yaml -target=querier -server.http-listen-port=8004 -server.grpc-listen-port=9004"]
command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./loki --listen=:18004 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/loki.yaml -target=querier -server.http-listen-port=8004 -server.grpc-listen-port=9004 -querier.scheduler-address=query-scheduler:9009"]
depends_on:
- consul
- minio
- query-frontend
- query-scheduler
environment:
- JAEGER_AGENT_HOST=jaeger
- JAEGER_AGENT_PORT=6831
@@ -189,10 +190,11 @@ services:
context: .
dockerfile: dev.dockerfile
image: loki
command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./loki --listen=:18007 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/loki.yaml -target=query-frontend -server.http-listen-port=8007 -server.grpc-listen-port=9007 -log.level=debug"]
command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./loki --listen=:18007 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/loki.yaml -target=query-frontend -server.http-listen-port=8007 -server.grpc-listen-port=9007 -frontend.scheduler-address=query-scheduler:9009 -log.level=debug"]
depends_on:
- consul
- minio
- query-scheduler
environment:
- JAEGER_AGENT_HOST=jaeger
- JAEGER_AGENT_PORT=6831
@@ -205,6 +207,29 @@ services:
volumes:
- ./config:/loki/config
query-scheduler:
logging:
<<: *logging
build:
context: .
dockerfile: dev.dockerfile
image: loki
command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./loki --listen=:18009 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/loki.yaml -target=query-scheduler -server.http-listen-port=8009 -server.grpc-listen-port=9009 -log.level=debug"]
depends_on:
- consul
- minio
environment:
- JAEGER_AGENT_HOST=jaeger
- JAEGER_AGENT_PORT=6831
- JAEGER_TAGS=app=query-scheduler
- JAEGER_SAMPLER_TYPE=const
- JAEGER_SAMPLER_PARAM=1
ports:
- 8009:8009
- 18009:18009
volumes:
- ./config:/loki/config
grafana:
logging:
<<: *logging
@@ -218,3 +243,11 @@ services:
- 3000:3000
volumes:
- ./config/datasource.yaml:/etc/config/grafana/provisioning/datasources/ds.yaml
log-gen:
logging:
<<: *logging
image: mingrammer/flog
command: ["-f", "json", "-l", "-s", "1s"]
depends_on:
- distributor
