Loki: Add a ring to the query scheduler to allow discovery via the ring as an alternative to DNS (#4424)

* Fork the frontend and scheduler so we can add support for discovery via the ring.

* Add a ring to the query scheduler, which the frontend workers and querier workers then use to find the scheduler address as an alternative to using DNS.

* update some of the forked code to use dskit grpcclient and grpcutil since #4312 was merged after the original fork was created.

* remove query scheduler dependency from querier

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* migrate logging package and make linter happy

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* add SafeReadRing for instances when scheduler is not enabled on same instance

* Doc changes from code review

A few doc fixes

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* go mod tidy on go-kit dep

* update changelog

* remove ReadRing in favor of SafeReadRing

Co-authored-by: Trevor Whitney <trevorjwhitney@gmail.com>
Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>
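With these changes, the query-frontend and the querier workers can discover query-scheduler instances through a hash ring instead of a DNS name. As a rough, hypothetical illustration (not taken from this commit), enabling ring-based discovery with the options documented further below might look like:

```yaml
# Hypothetical example assembled from the options documented in this change.
query_scheduler:
  use_scheduler_ring: true
  scheduler_ring:
    kvstore:
      store: memberlist
```

If no frontend or scheduler address is configured anywhere, the config wrapper change below toggles `use_scheduler_ring` to true automatically.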
Ed Welch, commit 2b5f3000b6 (parent 2427fab32d), committed via GitHub.
24 files changed (number of changed lines in parentheses):

1. CHANGELOG.md (1)
2. docs/sources/configuration/_index.md (112)
3. go.mod (2)
4. pkg/loki/config_wrapper.go (11)
5. pkg/loki/loki.go (9)
6. pkg/loki/modules.go (25)
7. pkg/lokifrontend/config.go (6)
8. pkg/lokifrontend/frontend/config.go (75)
9. pkg/lokifrontend/frontend/downstream_roundtripper.go (41)
10. pkg/lokifrontend/frontend/transport/handler.go (254)
11. pkg/lokifrontend/frontend/transport/roundtripper.go (58)
12. pkg/lokifrontend/frontend/v1/frontend.go (353)
13. pkg/lokifrontend/frontend/v2/frontend.go (320)
14. pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go (342)
15. pkg/querier/worker/frontend_processor.go (147)
16. pkg/querier/worker/processor_manager.go (86)
17. pkg/querier/worker/scheduler_processor.go (234)
18. pkg/querier/worker/worker.go (284)
19. pkg/querier/worker_service.go (20)
20. pkg/querier/worker_service_test.go (3)
21. pkg/scheduler/scheduler.go (673)
22. pkg/scheduler/scheduler_ring.go (110)
23. pkg/util/httpgrpc/carrier.go (40)
24. pkg/util/ring_watcher.go (126)

@@ -5,6 +5,7 @@
* [4443](https://github.com/grafana/loki/pull/4443) **DylanGuedes**: Loki: Change how push API checks for contentType
* [4415](https://github.com/grafana/loki/pull/4415) **DylanGuedes**: Change default limits to common values
* [4473](https://github.com/grafana/loki/pull/4473) **trevorwhitney**: Config: add object storage configuration to common config
* [4425](https://github.com/grafana/loki/pull/4425) **trevorwhitney** and **slim-bean**: Add a ring for the query scheduler
# 2.3.0 (2021/08/06)

@@ -97,6 +97,10 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
# just the querier.
[querier: <querier_config>]
# The query_scheduler block configures the Loki query scheduler.
# When configured it separates the tenant query queues from the query-frontend
[query_scheduler: <query_scheduler_config>]
# The query_frontend_config configures the Loki query-frontend.
[frontend: <query_frontend_config>]
@@ -282,6 +286,106 @@ engine:
[max_look_back_period: <duration> | default = 30s]
```
## query_scheduler_config
The `query_scheduler_config` block configures the Loki query scheduler.
```yaml
# Maximum number of outstanding requests per tenant per query-scheduler.
# In-flight requests above this limit will fail with HTTP response status code
# 429.
# CLI flag: -query-scheduler.max-outstanding-requests-per-tenant
[max_outstanding_requests_per_tenant: <int> | default = 100]
# This configures the gRPC client used to report errors back to the
# query-frontend.
[grpc_client_config: <grpc_client_config>]
# Set to true to have the query schedulers create and place themselves in a ring.
# If no frontend_address or scheduler_address are present
# anywhere else in the configuration, Loki will toggle this value to true.
[use_scheduler_ring: <boolean> | default = false]
# The hash ring configuration. This option is required only if use_scheduler_ring is true
scheduler_ring:
# The key-value store used to share the hash ring across multiple instances.
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
# inmemory, memberlist, multi.
# CLI flag: -scheduler.ring.store
[store: <string> | default = "memberlist"]
# The prefix for the keys in the store. Should end with a /.
# CLI flag: -scheduler.ring.prefix
[prefix: <string> | default = "schedulers/"]
# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: scheduler.ring
[consul: <consul_config>]
# The etcd_config configures the etcd client.
# The CLI flags prefix for this block config is: scheduler.ring
[etcd: <etcd_config>]
multi:
# Primary backend storage used by multi-client.
# CLI flag: -scheduler.ring.multi.primary
[primary: <string> | default = ""]
# Secondary backend storage used by multi-client.
# CLI flag: -scheduler.ring.multi.secondary
[secondary: <string> | default = ""]
# Mirror writes to secondary store.
# CLI flag: -scheduler.ring.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]
# Timeout for storing value to secondary store.
# CLI flag: -scheduler.ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]
# Interval between heartbeats sent to the ring. 0 = disabled.
# CLI flag: -scheduler.ring.heartbeat-period
[heartbeat_period: <duration> | default = 15s]
# The heartbeat timeout after which query schedulers are considered unhealthy
# within the ring. 0 = never (timeout disabled).
# CLI flag: -scheduler.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
# File path where tokens are stored. If empty, tokens are neither stored at
# shutdown nor restored at startup.
# CLI flag: -scheduler.ring.tokens-file-path
[tokens_file_path: <string> | default = ""]
# True to enable zone-awareness and replicate blocks across different
# availability zones.
# CLI flag: -scheduler.ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# Name of network interface to read addresses from.
# CLI flag: -scheduler.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]
# IP address to advertise in the ring.
# CLI flag: -scheduler.ring.instance-addr
[instance_addr: <string> | default = first from instance_interface_names]
# Port to advertise in the ring.
# CLI flag: -scheduler.ring.instance-port
[instance_port: <int> | default = server.grpc-listen-port]
# Instance ID to register in the ring.
# CLI flag: -scheduler.ring.instance-id
[instance_id: <string> | default = os.Hostname()]
# The availability zone where this instance is running. Required if
# zone-awareness is enabled.
# CLI flag: -scheduler.ring.instance-availability-zone
[instance_availability_zone: <string> | default = ""]
```
## query_frontend_config
The query_frontend_config configures the Loki query-frontend.
@@ -315,8 +419,9 @@ The query_frontend_config configures the Loki query-frontend.
# How often to resolve the scheduler-address, in order to look for new
# query-scheduler instances.
# Also used to determine how often to poll the scheduler-ring for addresses if configured.
# CLI flag: -frontend.scheduler-dns-lookup-period
[scheduler_dns_lookup_period: <duration> | default = 10s]
[scheduler_dns_lookup_period: <duration> | default = 3s]
# Number of concurrent workers forwarding queries to single query-scheduler.
# CLI flag: -frontend.scheduler-worker-concurrency
@@ -776,9 +881,10 @@ The `frontend_worker_config` configures the worker - running within the Loki que
# CLI flag: -querier.worker-parallelism
[parallelism: <int> | default = 10]
# How often to query DNS.
# How often to query the frontend_address DNS to resolve frontend addresses.
# Also used to determine how often to poll the scheduler-ring for addresses if configured.
# CLI flag: -querier.dns-lookup-period
[dns_lookup_duration: <duration> | default = 10s]
[dns_lookup_duration: <duration> | default = 3s]
# The CLI flags prefix for this block config is: querier.frontend-client
[grpc_client_config: <grpc_client_config>]
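For comparison, discovery can still be driven entirely by DNS when the ring is not used. A sketch, assuming a hypothetical Kubernetes service name and Loki's default gRPC port:

```yaml
# Hypothetical example; the hostname is illustrative only.
frontend:
  scheduler_address: query-scheduler.loki.svc.cluster.local:9095
  scheduler_dns_lookup_period: 3s

frontend_worker:
  scheduler_address: query-scheduler.loki.svc.cluster.local:9095
  dns_lookup_duration: 3s
```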

@@ -31,6 +31,7 @@ require (
github.com/felixge/fgprof v0.9.1
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c
github.com/fsouza/fake-gcs-server v1.7.0
github.com/go-kit/kit v0.11.0 // indirect
github.com/go-kit/log v0.2.0
github.com/go-logfmt/logfmt v0.5.1
github.com/go-redis/redis/v8 v8.11.4
@@ -147,7 +148,6 @@
github.com/edsrzf/mmap-go v1.0.0 // indirect
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-kit/kit v0.11.0 // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/go-openapi/analysis v0.20.0 // indirect
github.com/go-openapi/errors v0.20.0 // indirect

@@ -73,6 +73,14 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}
}
// If nobody has defined any frontend address or scheduler address
// we can default to using the query scheduler ring for scheduler discovery.
if r.Worker.FrontendAddress == "" &&
r.Worker.SchedulerAddress == "" &&
r.Frontend.FrontendV2.SchedulerAddress == "" {
r.QueryScheduler.UseSchedulerRing = true
}
applyMemberlistConfig(r)
applyStorageConfig(r, &defaults)
@@ -80,7 +88,7 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}
}
// applyMemberlistConfig will change the default ingester, distributor, and ruler ring configurations to use memberlist
// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist
// if the -memberlist.join_members config is provided. The idea here is that if a user explicitly configured the
// memberlist configuration section, they probably want to be using memberlist for all their ring configurations.
// Since a user can still explicitly override a specific ring configuration (for example, use consul for the distributor),
@@ -90,6 +98,7 @@ func applyMemberlistConfig(r *ConfigWrapper) {
r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.Distributor.DistributorRing.KVStore.Store = memberlistStr
r.Ruler.Ring.KVStore.Store = memberlistStr
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
}
}
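As the updated comment notes, a user who configures memberlist join members gets the scheduler ring (together with the ingester, distributor, and ruler rings) on memberlist by default. A hypothetical snippet showing that single setting:

```yaml
# Hypothetical example; the join address is illustrative only.
memberlist:
  join_members:
    - loki-memberlist.mynamespace.svc.cluster.local:7946
```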

@@ -8,11 +8,9 @@ import (
"net/http"
cortex_tripper "github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/ring"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/ruler/rulestore"
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/fakeauth"
util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -38,8 +36,10 @@ import (
"github.com/grafana/loki/pkg/lokifrontend"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/querier/worker"
"github.com/grafana/loki/pkg/ruler"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/scheduler"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
@@ -224,6 +224,7 @@ type Loki struct {
MemberlistKV *memberlist.KVInitService
compactor *compactor.Compactor
QueryFrontEndTripperware cortex_tripper.Tripperware
queryScheduler *scheduler.Scheduler
HTTPAuthMiddleware middleware.Interface
}
@@ -435,13 +436,13 @@ func (t *Loki) setupModuleManager() error {
Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs},
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware},
QueryScheduler: {Server, Overrides},
QueryScheduler: {Server, Overrides, MemberlistKV},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs},
TableManager: {Server},
Compactor: {Server, Overrides},
IndexGateway: {Server},
IngesterQuerier: {Ring},
All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, Ruler},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler},
}
// Add IngesterQuerier as a dependency for store when target is either ingester or querier.

@@ -12,13 +12,10 @@ import (
"github.com/NYTimes/gziphandler"
"github.com/cortexproject/cortex/pkg/cortex"
"github.com/cortexproject/cortex/pkg/frontend"
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/ring"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log/level"
@@ -37,10 +34,13 @@ import (
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/lokifrontend/frontend"
"github.com/grafana/loki/pkg/lokifrontend/frontend/transport"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/ruler"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/scheduler"
loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
@@ -210,6 +210,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
QuerierWorkerConfig: &t.Cfg.Worker,
QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend),
QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler),
SchedulerRing: t.queryScheduler.SafeReadRing(),
}
var queryHandlers = map[string]http.Handler{
@@ -414,12 +415,20 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.Cfg.Frontend))
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(frontend.CombinedFrontendConfig{
combinedCfg := frontend.CombinedFrontendConfig{
Handler: t.Cfg.Frontend.Handler,
FrontendV1: t.Cfg.Frontend.FrontendV1,
FrontendV2: t.Cfg.Frontend.FrontendV2,
DownstreamURL: t.Cfg.Frontend.DownstreamURL,
}, disabledShuffleShardingLimits{}, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer)
}
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(
combinedCfg,
t.queryScheduler.SafeReadRing(),
disabledShuffleShardingLimits{},
t.Cfg.Server.GRPCListenPort,
util_log.Logger,
prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
@@ -660,6 +669,10 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
}
func (t *Loki) initQueryScheduler() (services.Service, error) {
// Set some config sections from other config sections in the config struct
t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.overrides, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
@@ -667,6 +680,8 @@ func (t *Loki) initQueryScheduler() (services.Service, error) {
schedulerpb.RegisterSchedulerForFrontendServer(t.Server.GRPC, s)
schedulerpb.RegisterSchedulerForQuerierServer(t.Server.GRPC, s)
t.Server.HTTP.Handle("/scheduler/ring", s)
t.queryScheduler = s
return s, nil
}

@@ -3,9 +3,9 @@ package lokifrontend
import (
"flag"
"github.com/cortexproject/cortex/pkg/frontend/transport"
v1 "github.com/cortexproject/cortex/pkg/frontend/v1"
v2 "github.com/cortexproject/cortex/pkg/frontend/v2"
"github.com/grafana/loki/pkg/lokifrontend/frontend/transport"
v1 "github.com/grafana/loki/pkg/lokifrontend/frontend/v1"
v2 "github.com/grafana/loki/pkg/lokifrontend/frontend/v2"
)
type Config struct {

@@ -0,0 +1,75 @@
package frontend
import (
"flag"
"net/http"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/lokifrontend/frontend/transport"
v1 "github.com/grafana/loki/pkg/lokifrontend/frontend/v1"
v2 "github.com/grafana/loki/pkg/lokifrontend/frontend/v2"
)
// This struct combines several configuration options together to preserve backwards compatibility.
type CombinedFrontendConfig struct {
Handler transport.HandlerConfig `yaml:",inline"`
FrontendV1 v1.Config `yaml:",inline"`
FrontendV2 v2.Config `yaml:",inline"`
DownstreamURL string `yaml:"downstream_url"`
}
func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
cfg.Handler.RegisterFlags(f)
cfg.FrontendV1.RegisterFlags(f)
cfg.FrontendV2.RegisterFlags(f)
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
}
// InitFrontend initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at
// all if downstream Prometheus URL is used instead.
//
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
// (if there are no errors), and it uses the returned frontend (if any).
func InitFrontend(cfg CombinedFrontendConfig, ring ring.ReadRing, limits v1.Limits, grpcListenPort int, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, *v2.Frontend, error) {
switch {
case cfg.DownstreamURL != "":
// If the user has specified a downstream Prometheus, then we should use that.
rt, err := NewDownstreamRoundTripper(cfg.DownstreamURL, http.DefaultTransport)
return rt, nil, nil, err
case ring != nil:
fallthrough
case cfg.FrontendV2.SchedulerAddress != "":
// If query-scheduler address is configured, use Frontend.
if cfg.FrontendV2.Addr == "" {
addr, err := util.GetFirstAddressOf(cfg.FrontendV2.InfNames)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "failed to get frontend address")
}
cfg.FrontendV2.Addr = addr
}
if cfg.FrontendV2.Port == 0 {
cfg.FrontendV2.Port = grpcListenPort
}
fr, err := v2.NewFrontend(cfg.FrontendV2, ring, log, reg)
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err
default:
// No scheduler = use original frontend.
fr, err := v1.New(cfg.FrontendV1, limits, log, reg)
if err != nil {
return nil, nil, nil, err
}
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil
}
}

@@ -0,0 +1,41 @@
package frontend
import (
"net/http"
"net/url"
"path"
"github.com/opentracing/opentracing-go"
)
// RoundTripper that forwards requests to downstream URL.
type downstreamRoundTripper struct {
downstreamURL *url.URL
transport http.RoundTripper
}
func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper) (http.RoundTripper, error) {
u, err := url.Parse(downstreamURL)
if err != nil {
return nil, err
}
return &downstreamRoundTripper{downstreamURL: u, transport: transport}, nil
}
func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(r.Context())
if tracer != nil && span != nil {
carrier := opentracing.HTTPHeadersCarrier(r.Header)
err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier)
if err != nil {
return nil, err
}
}
r.URL.Scheme = d.downstreamURL.Scheme
r.URL.Host = d.downstreamURL.Host
r.URL.Path = path.Join(d.downstreamURL.Path, r.URL.Path)
r.Host = ""
return d.transport.RoundTrip(r)
}

@@ -0,0 +1,254 @@
package transport
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)
const (
// StatusClientClosedRequest is the status code returned when a client cancels an HTTP request
StatusClientClosedRequest = 499
ServiceTimingHeaderName = "Server-Timing"
)
var (
errCanceled = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error())
errDeadlineExceeded = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
)
// Config for a Handler.
type HandlerConfig struct {
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
MaxBodySize int64 `yaml:"max_body_size"`
QueryStatsEnabled bool `yaml:"query_stats_enabled"`
}
func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.LogQueriesLongerThan, "frontend.log-queries-longer-than", 0, "Log queries that are slower than the specified duration. Set to 0 to disable. Set to < 0 to enable on all queries.")
f.Int64Var(&cfg.MaxBodySize, "frontend.max-body-size", 10*1024*1024, "Max body size for downstream prometheus.")
f.BoolVar(&cfg.QueryStatsEnabled, "frontend.query-stats-enabled", false, "True to enable query statistics tracking. When enabled, a message with some statistics is logged for every query.")
}
// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
// but all other logic is inside the RoundTripper.
type Handler struct {
cfg HandlerConfig
log log.Logger
roundTripper http.RoundTripper
// Metrics.
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryBytes *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
}
// NewHandler creates a new frontend handler.
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) http.Handler {
h := &Handler{
cfg: cfg,
log: log,
roundTripper: roundTripper,
}
if cfg.QueryStatsEnabled {
h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_seconds_total",
Help: "Total amount of wall clock time spend processing queries.",
}, []string{"user"})
h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_series_total",
Help: "Number of series fetched to execute a query.",
}, []string{"user"})
h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_chunks_bytes_total",
Help: "Size of all chunks fetched to execute a query in bytes.",
}, []string{"user"})
h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.queryBytes.DeleteLabelValues(user)
})
// If the cleaner stops or fails, we will simply not clean the metrics for inactive users.
_ = h.activeUsers.StartAsync(context.Background())
}
return h
}
func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
stats *querier_stats.Stats
queryString url.Values
)
// Initialise the stats in the context and make sure it's propagated
// down the request chain.
if f.cfg.QueryStatsEnabled {
var ctx context.Context
stats, ctx = querier_stats.ContextWithEmptyStats(r.Context())
r = r.WithContext(ctx)
}
defer func() {
_ = r.Body.Close()
}()
// Buffer the body for later use to track slow queries.
var buf bytes.Buffer
r.Body = http.MaxBytesReader(w, r.Body, f.cfg.MaxBodySize)
r.Body = ioutil.NopCloser(io.TeeReader(r.Body, &buf))
startTime := time.Now()
resp, err := f.roundTripper.RoundTrip(r)
queryResponseTime := time.Since(startTime)
if err != nil {
writeError(w, err)
return
}
hs := w.Header()
for h, vs := range resp.Header {
hs[h] = vs
}
if f.cfg.QueryStatsEnabled {
writeServiceTimingHeader(queryResponseTime, hs, stats)
}
w.WriteHeader(resp.StatusCode)
// we don't check for copy error as there is not much we can do at this point
_, _ = io.Copy(w, resp.Body)
// Check whether we should parse the query string.
shouldReportSlowQuery := f.cfg.LogQueriesLongerThan > 0 && queryResponseTime > f.cfg.LogQueriesLongerThan
if shouldReportSlowQuery || f.cfg.QueryStatsEnabled {
queryString = f.parseRequestQueryString(r, buf)
}
if shouldReportSlowQuery {
f.reportSlowQuery(r, queryString, queryResponseTime)
}
if f.cfg.QueryStatsEnabled {
f.reportQueryStats(r, queryString, queryResponseTime, stats)
}
}
// reportSlowQuery reports slow queries.
func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, queryResponseTime time.Duration) {
logMessage := append([]interface{}{
"msg", "slow query detected",
"method", r.Method,
"host", r.Host,
"path", r.URL.Path,
"time_taken", queryResponseTime.String(),
}, formatQueryString(queryString)...)
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}
func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.Stats) {
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
return
}
userID := tenant.JoinTenantIDs(tenantIDs)
wallTime := stats.LoadWallTime()
numSeries := stats.LoadFetchedSeries()
numBytes := stats.LoadFetchedChunkBytes()
// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())
// Log stats.
logMessage := append([]interface{}{
"msg", "query stats",
"component", "query-frontend",
"method", r.Method,
"path", r.URL.Path,
"response_time", queryResponseTime,
"query_wall_time_seconds", wallTime.Seconds(),
"fetched_series_count", numSeries,
"fetched_chunks_bytes", numBytes,
}, formatQueryString(queryString)...)
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}
func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values {
// Use previously buffered body.
r.Body = ioutil.NopCloser(&bodyBuf)
// Ensure the form has been parsed so all the parameters are present
err := r.ParseForm()
if err != nil {
level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "unable to parse request form", "err", err)
return nil
}
return r.Form
}
func formatQueryString(queryString url.Values) (fields []interface{}) {
for k, v := range queryString {
fields = append(fields, fmt.Sprintf("param_%s", k), strings.Join(v, ","))
}
return fields
}
func writeError(w http.ResponseWriter, err error) {
switch err {
case context.Canceled:
err = errCanceled
case context.DeadlineExceeded:
err = errDeadlineExceeded
default:
if util.IsRequestBodyTooLarge(err) {
err = errRequestEntityTooLarge
}
}
server.WriteError(w, err)
}
func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Header, stats *querier_stats.Stats) {
if stats != nil {
parts := make([]string, 0)
parts = append(parts, statsValue("querier_wall_time", stats.LoadWallTime()))
parts = append(parts, statsValue("response_time", queryResponseTime))
headers.Set(ServiceTimingHeaderName, strings.Join(parts, ", "))
}
}
func statsValue(name string, d time.Duration) string {
durationInMs := strconv.FormatFloat(float64(d)/float64(time.Millisecond), 'f', -1, 64)
return name + ";dur=" + durationInMs
}

@@ -0,0 +1,58 @@
package transport
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
)
// GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages.
type GrpcRoundTripper interface {
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
}
func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
return &grpcRoundTripperAdapter{roundTripper: r}
}
// This adapter wraps GrpcRoundTripper and converts it into http.RoundTripper
type grpcRoundTripperAdapter struct {
roundTripper GrpcRoundTripper
}
type buffer struct {
buff []byte
io.ReadCloser
}
func (b *buffer) Bytes() []byte {
return b.buff
}
func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) {
req, err := server.HTTPRequest(r)
if err != nil {
return nil, err
}
resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
if err != nil {
return nil, err
}
httpResp := &http.Response{
StatusCode: int(resp.Code),
Body: &buffer{buff: resp.Body, ReadCloser: ioutil.NopCloser(bytes.NewReader(resp.Body))},
Header: http.Header{},
ContentLength: int64(len(resp.Body)),
}
for _, h := range resp.Headers {
httpResp.Header[h.Key] = h.Values
}
return httpResp, nil
}

@@ -0,0 +1,353 @@
package v1
import (
"context"
"flag"
"fmt"
"net/http"
"time"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
)
var (
errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests")
)
// Config for a Frontend.
type Config struct {
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
QuerierForgetDelay time.Duration `yaml:"querier_forget_delay"`
}
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
f.DurationVar(&cfg.QuerierForgetDelay, "query-frontend.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-frontend will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
}
type Limits interface {
// Returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
MaxQueriersPerUser(user string) int
}
// Frontend queues HTTP requests, dispatches them to backends, and handles retries
// for requests which failed.
type Frontend struct {
services.Service
cfg Config
log log.Logger
limits Limits
requestQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
// Subservices manager.
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
// Metrics.
queueLength *prometheus.GaugeVec
discardedRequests *prometheus.CounterVec
numClients prometheus.GaugeFunc
queueDuration prometheus.Histogram
}
type request struct {
enqueueTime time.Time
queueSpan opentracing.Span
originalCtx context.Context
request *httpgrpc.HTTPRequest
err chan error
response chan *httpgrpc.HTTPResponse
}
// New creates a new frontend. Frontend implements service, and must be started and stopped.
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) {
f := &Frontend{
cfg: cfg,
log: log,
limits: limits,
queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_query_frontend_queue_length",
Help: "Number of queries in the queue.",
}, []string{"user"}),
discardedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"}),
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_frontend_queue_duration_seconds",
Help: "Time spend by requests queued.",
Buckets: prometheus.DefBuckets,
}),
}
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests)
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)
var err error
f.subservices, err = services.NewManager(f.requestQueue, f.activeUsers)
if err != nil {
return nil, err
}
f.numClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_query_frontend_connected_clients",
Help: "Number of worker clients currently connected to the frontend.",
}, f.requestQueue.GetConnectedQuerierWorkersMetric)
f.Service = services.NewBasicService(f.starting, f.running, f.stopping)
return f, nil
}
func (f *Frontend) starting(ctx context.Context) error {
f.subservicesWatcher.WatchManager(f.subservices)
if err := services.StartManagerAndAwaitHealthy(ctx, f.subservices); err != nil {
return errors.Wrap(err, "unable to start frontend subservices")
}
return nil
}
func (f *Frontend) running(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case err := <-f.subservicesWatcher.Chan():
return errors.Wrap(err, "frontend subservice failed")
}
}
}
func (f *Frontend) stopping(_ error) error {
// This will also stop the requests queue, which stops accepting new requests and errors out any pending requests.
return services.StopManagerAndAwaitStopped(context.Background(), f.subservices)
}
func (f *Frontend) cleanupInactiveUserMetrics(user string) {
f.queueLength.DeleteLabelValues(user)
f.discardedRequests.DeleteLabelValues(user)
}
// RoundTripGRPC round trips a proto (instead of an HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
if tracer != nil && span != nil {
carrier := (*lokigrpc.HeadersCarrier)(req)
err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier)
if err != nil {
return nil, err
}
}
request := request{
request: req,
originalCtx: ctx,
// Buffer of 1 to ensure response can be written by the server side
// of the Process stream, even if this goroutine goes away due to
// client context cancellation.
err: make(chan error, 1),
response: make(chan *httpgrpc.HTTPResponse, 1),
}
if err := f.queueRequest(ctx, &request); err != nil {
return nil, err
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-request.response:
return resp, nil
case err := <-request.err:
return nil, err
}
}
// Process allows backends to pull requests from the frontend.
func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
querierID, err := getQuerierID(server)
if err != nil {
return err
}
f.requestQueue.RegisterQuerierConnection(querierID)
defer f.requestQueue.UnregisterQuerierConnection(querierID)
// If the downstream request (from querier -> frontend) is cancelled,
// we need to ping the condition variable to unblock getNextRequestForQuerier.
// Ideally we'd have ctx aware condition variables...
go func() {
<-server.Context().Done()
f.requestQueue.QuerierDisconnecting()
}()
lastUserIndex := queue.FirstUser()
for {
reqWrapper, idx, err := f.requestQueue.GetNextRequestForQuerier(server.Context(), lastUserIndex, querierID)
if err != nil {
return err
}
lastUserIndex = idx
req := reqWrapper.(*request)
f.queueDuration.Observe(time.Since(req.enqueueTime).Seconds())
req.queueSpan.Finish()
/*
We want to dequeue the next unexpired request from the chosen tenant queue.
The chance of choosing a particular tenant for dequeueing is (1/active_tenants).
This is problematic under load, especially with other middleware enabled such as
querier.split-by-interval, where one request may fan out into many.
If expired requests aren't exhausted before checking another tenant, it would take
n_active_tenants * n_expired_requests_at_front_of_queue requests being processed
before an active request was handled for the tenant in question.
If this tenant meanwhile continued to queue requests,
it's possible that its own queue would perpetually contain only expired requests.
*/
if req.originalCtx.Err() != nil {
lastUserIndex = lastUserIndex.ReuseLastUser()
continue
}
// Handle the stream sending & receiving on a goroutine so we can
// monitor the contexts in a select and cancel things appropriately.
resps := make(chan *frontendv1pb.ClientToFrontend, 1)
errs := make(chan error, 1)
go func() {
err = server.Send(&frontendv1pb.FrontendToClient{
Type: frontendv1pb.HTTP_REQUEST,
HttpRequest: req.request,
StatsEnabled: stats.IsEnabled(req.originalCtx),
})
if err != nil {
errs <- err
return
}
resp, err := server.Recv()
if err != nil {
errs <- err
return
}
resps <- resp
}()
select {
// If the upstream request is cancelled, we need to cancel the
// downstream req. Only way we can do that is to close the stream.
// The worker client is expecting this semantics.
case <-req.originalCtx.Done():
return req.originalCtx.Err()
// If there was an error handling this request due to network IO,
// then error out this upstream request _and_ stream.
case err := <-errs:
req.err <- err
return err
// Happy path: merge the stats and propagate the response.
case resp := <-resps:
if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
stats := stats.FromContext(req.originalCtx)
stats.Merge(resp.Stats) // Safe if stats is nil.
}
req.response <- resp.HttpResponse
}
}
}
func (f *Frontend) NotifyClientShutdown(_ context.Context, req *frontendv1pb.NotifyClientShutdownRequest) (*frontendv1pb.NotifyClientShutdownResponse, error) {
level.Info(f.log).Log("msg", "received shutdown notification from querier", "querier", req.GetClientID())
f.requestQueue.NotifyQuerierShutdown(req.GetClientID())
return &frontendv1pb.NotifyClientShutdownResponse{}, nil
}
func getQuerierID(server frontendv1pb.Frontend_ProcessServer) (string, error) {
err := server.Send(&frontendv1pb.FrontendToClient{
Type: frontendv1pb.GET_ID,
// Old queriers don't support GET_ID, and will try to use the request.
// To avoid confusing them, include dummy request.
HttpRequest: &httpgrpc.HTTPRequest{
Method: "GET",
Url: "/invalid_request_sent_by_frontend",
},
})
if err != nil {
return "", err
}
resp, err := server.Recv()
// Old queriers will return empty string, which is fine. All old queriers will be
// treated as a single querier with a lot of connections.
// (Note: if resp is nil, GetClientID() returns "")
return resp.GetClientID(), err
}
func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return err
}
now := time.Now()
req.enqueueTime = now
req.queueSpan, _ = opentracing.StartSpanFromContext(ctx, "queued")
// aggregate the max queriers limit in the case of a multi tenant query
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueriersPerUser)
joinedTenantID := tenant.JoinTenantIDs(tenantIDs)
f.activeUsers.UpdateUserTimestamp(joinedTenantID, now)
err = f.requestQueue.EnqueueRequest(joinedTenantID, req, maxQueriers, nil)
if err == queue.ErrTooManyRequests {
return errTooManyRequest
}
return err
}
// CheckReady determines if the query frontend is ready. Function parameters/return
// chosen to match the same method in the ingester
func (f *Frontend) CheckReady(_ context.Context) error {
// if we have at least one querier connected we will consider ourselves ready
connectedClients := f.requestQueue.GetConnectedQuerierWorkersMetric()
if connectedClients > 0 {
return nil
}
msg := fmt.Sprintf("not ready: number of queriers connected to query-frontend is %d", int64(connectedClients))
level.Info(f.log).Log("msg", msg)
return errors.New(msg)
}

@@ -0,0 +1,320 @@
package v2
import (
"context"
"flag"
"fmt"
"math/rand"
"net/http"
"sync"
"time"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
)
// Config for a Frontend.
type Config struct {
SchedulerAddress string `yaml:"scheduler_address"`
DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"`
WorkerConcurrency int `yaml:"scheduler_worker_concurrency"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
// Used to find local IP address, that is sent to scheduler and querier-worker.
InfNames []string `yaml:"instance_interface_names"`
// If set, address is not computed from interfaces.
Addr string `yaml:"address" doc:"hidden"`
Port int `doc:"hidden"`
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.SchedulerAddress, "frontend.scheduler-address", "", "DNS hostname used for finding query-schedulers.")
f.DurationVar(&cfg.DNSLookupPeriod, "frontend.scheduler-dns-lookup-period", 10*time.Second, "How often to resolve the scheduler-address, in order to look for new query-scheduler instances. Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.")
f.IntVar(&cfg.WorkerConcurrency, "frontend.scheduler-worker-concurrency", 5, "Number of concurrent workers forwarding queries to single query-scheduler.")
cfg.InfNames = []string{"eth0", "en0"}
f.Var((*flagext.StringSlice)(&cfg.InfNames), "frontend.instance-interface-names", "Name of network interface to read address from. This address is sent to query-scheduler and querier, which uses it to send the query response back to query-frontend.")
f.StringVar(&cfg.Addr, "frontend.instance-addr", "", "IP address to advertise to querier (via scheduler) (resolved via interfaces by default).")
f.IntVar(&cfg.Port, "frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).")
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("frontend.grpc-client-config", f)
}
// Frontend implements GrpcRoundTripper. It queues HTTP requests,
// dispatches them to backends via gRPC, and handles retries for requests which failed.
type Frontend struct {
services.Service
cfg Config
log log.Logger
lastQueryID atomic.Uint64
// frontend workers will read from this channel, and send request to scheduler.
requestsCh chan *frontendRequest
schedulerWorkers *frontendSchedulerWorkers
requests *requestsInProgress
}
type frontendRequest struct {
queryID uint64
request *httpgrpc.HTTPRequest
userID string
statsEnabled bool
cancel context.CancelFunc
enqueue chan enqueueResult
response chan *frontendv2pb.QueryResultRequest
}
type enqueueStatus int
const (
// Sent to scheduler successfully, and frontend should wait for response now.
waitForResponse enqueueStatus = iota
// Failed to forward request to scheduler, frontend will try again.
failed
)
type enqueueResult struct {
status enqueueStatus
cancelCh chan<- uint64 // Channel that can be used for request cancellation. If nil, cancellation is not possible.
}
// NewFrontend creates a new frontend.
func NewFrontend(cfg Config, ring ring.ReadRing, log log.Logger, reg prometheus.Registerer) (*Frontend, error) {
requestsCh := make(chan *frontendRequest)
schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), ring, requestsCh, log)
if err != nil {
return nil, err
}
f := &Frontend{
cfg: cfg,
log: log,
requestsCh: requestsCh,
schedulerWorkers: schedulerWorkers,
requests: newRequestsInProgress(),
}
// Randomize to avoid getting responses from queries sent before restart, which could lead to mixing results
// between different queries. Note that frontend verifies the user, so it cannot leak results between tenants.
// This isn't perfect, but better than nothing.
f.lastQueryID.Store(rand.Uint64())
promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_query_frontend_queries_in_progress",
Help: "Number of queries in progress handled by this frontend.",
}, func() float64 {
return float64(f.requests.count())
})
promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_query_frontend_connected_schedulers",
Help: "Number of schedulers this frontend is connected to.",
}, func() float64 {
return float64(f.schedulerWorkers.getWorkersCount())
})
f.Service = services.NewIdleService(f.starting, f.stopping)
return f, nil
}
func (f *Frontend) starting(ctx context.Context) error {
return errors.Wrap(services.StartAndAwaitRunning(ctx, f.schedulerWorkers), "failed to start frontend scheduler workers")
}
func (f *Frontend) stopping(_ error) error {
return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), f.schedulerWorkers), "failed to stop frontend scheduler workers")
}
// RoundTripGRPC round trips a proto (instead of an HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
if s := f.State(); s != services.Running {
return nil, fmt.Errorf("frontend not running: %v", s)
}
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
userID := tenant.JoinTenantIDs(tenantIDs)
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
if tracer != nil && span != nil {
carrier := (*lokigrpc.HeadersCarrier)(req)
if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil {
return nil, err
}
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
freq := &frontendRequest{
queryID: f.lastQueryID.Inc(),
request: req,
userID: userID,
statsEnabled: stats.IsEnabled(ctx),
cancel: cancel,
// Buffer of 1 to ensure response or error can be written to the channel
// even if this goroutine goes away due to client context cancellation.
enqueue: make(chan enqueueResult, 1),
response: make(chan *frontendv2pb.QueryResultRequest, 1),
}
f.requests.put(freq)
defer f.requests.delete(freq.queryID)
retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers.
enqueueAgain:
select {
case <-ctx.Done():
return nil, ctx.Err()
case f.requestsCh <- freq:
// Enqueued, let's wait for response.
}
var cancelCh chan<- uint64
select {
case <-ctx.Done():
return nil, ctx.Err()
case enqRes := <-freq.enqueue:
if enqRes.status == waitForResponse {
cancelCh = enqRes.cancelCh
break // go wait for response.
} else if enqRes.status == failed {
retries--
if retries > 0 {
goto enqueueAgain
}
}
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request")
}
select {
case <-ctx.Done():
if cancelCh != nil {
select {
case cancelCh <- freq.queryID:
// cancellation sent.
default:
// failed to cancel, ignore.
}
}
return nil, ctx.Err()
case resp := <-freq.response:
if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
stats := stats.FromContext(ctx)
stats.Merge(resp.Stats) // Safe if stats is nil.
}
return resp.HttpResponse, nil
}
}
func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
userID := tenant.JoinTenantIDs(tenantIDs)
req := f.requests.get(qrReq.QueryID)
// It is possible that some old response belonging to different user was received, if frontend has restarted.
// To avoid leaking query results between users, we verify the user here.
// To avoid mixing results from different queries, we randomize queryID counter on start.
if req != nil && req.userID == userID {
select {
case req.response <- qrReq:
// Should always be possible, unless QueryResult is called multiple times with the same queryID.
default:
level.Warn(f.log).Log("msg", "failed to write query result to the response channel", "queryID", qrReq.QueryID, "user", userID)
}
}
return &frontendv2pb.QueryResultResponse{}, nil
}
// CheckReady determines if the query frontend is ready. Function parameters/return
// chosen to match the same method in the ingester
func (f *Frontend) CheckReady(_ context.Context) error {
workers := f.schedulerWorkers.getWorkersCount()
// If frontend is connected to at least one scheduler, we are ready.
if workers > 0 {
return nil
}
msg := fmt.Sprintf("not ready: number of schedulers this worker is connected to is %d", workers)
level.Info(f.log).Log("msg", msg)
return errors.New(msg)
}
type requestsInProgress struct {
mu sync.Mutex
requests map[uint64]*frontendRequest
}
func newRequestsInProgress() *requestsInProgress {
return &requestsInProgress{
requests: map[uint64]*frontendRequest{},
}
}
func (r *requestsInProgress) count() int {
r.mu.Lock()
defer r.mu.Unlock()
return len(r.requests)
}
func (r *requestsInProgress) put(req *frontendRequest) {
r.mu.Lock()
defer r.mu.Unlock()
r.requests[req.queryID] = req
}
func (r *requestsInProgress) delete(queryID uint64) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.requests, queryID)
}
func (r *requestsInProgress) get(queryID uint64) *frontendRequest {
r.mu.Lock()
defer r.mu.Unlock()
return r.requests[queryID]
}

@@ -0,0 +1,342 @@
package v2
import (
"context"
"net/http"
"sync"
"time"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc"
lokiutil "github.com/grafana/loki/pkg/util"
)
type frontendSchedulerWorkers struct {
services.Service
cfg Config
logger log.Logger
frontendAddress string
// Channel with requests that should be forwarded to the scheduler.
requestsCh <-chan *frontendRequest
watcher services.Service
mu sync.Mutex
// Set to nil when stop is called... no more workers are created afterwards.
workers map[string]*frontendSchedulerWorker
}
func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, ring ring.ReadRing, requestsCh <-chan *frontendRequest, logger log.Logger) (*frontendSchedulerWorkers, error) {
f := &frontendSchedulerWorkers{
cfg: cfg,
logger: logger,
frontendAddress: frontendAddress,
requestsCh: requestsCh,
workers: map[string]*frontendSchedulerWorker{},
}
switch {
case ring != nil:
// Use the scheduler ring and RingWatcher to find schedulers.
w, err := lokiutil.NewRingWatcher(log.With(logger, "component", "frontend-scheduler-worker"), ring, cfg.DNSLookupPeriod, f)
if err != nil {
return nil, err
}
f.watcher = w
default:
// If there is no ring config, fall back to using DNS for the frontend scheduler worker to find the schedulers.
w, err := util.NewDNSWatcher(cfg.SchedulerAddress, cfg.DNSLookupPeriod, f)
if err != nil {
return nil, err
}
f.watcher = w
}
f.Service = services.NewIdleService(f.starting, f.stopping)
return f, nil
}
func (f *frontendSchedulerWorkers) starting(ctx context.Context) error {
return services.StartAndAwaitRunning(ctx, f.watcher)
}
func (f *frontendSchedulerWorkers) stopping(_ error) error {
err := services.StopAndAwaitTerminated(context.Background(), f.watcher)
f.mu.Lock()
defer f.mu.Unlock()
for _, w := range f.workers {
w.stop()
}
f.workers = nil
return err
}
func (f *frontendSchedulerWorkers) AddressAdded(address string) {
f.mu.Lock()
ws := f.workers
w := f.workers[address]
f.mu.Unlock()
// Already stopped or we already have worker for this address.
if ws == nil || w != nil {
return
}
level.Info(f.logger).Log("msg", "adding connection to scheduler", "addr", address)
conn, err := f.connectToScheduler(context.Background(), address)
if err != nil {
level.Error(f.logger).Log("msg", "error connecting to scheduler", "addr", address, "err", err)
return
}
// No worker for this address yet, start a new one.
w = newFrontendSchedulerWorker(conn, address, f.frontendAddress, f.requestsCh, f.cfg.WorkerConcurrency, f.logger)
f.mu.Lock()
defer f.mu.Unlock()
// Can be nil if stopping has been called already.
if f.workers != nil {
f.workers[address] = w
w.start()
}
}
func (f *frontendSchedulerWorkers) AddressRemoved(address string) {
level.Info(f.logger).Log("msg", "removing connection to scheduler", "addr", address)
f.mu.Lock()
// This works fine if f.workers is nil already.
w := f.workers[address]
delete(f.workers, address)
f.mu.Unlock()
if w != nil {
w.stop()
}
}
// Get number of workers.
func (f *frontendSchedulerWorkers) getWorkersCount() int {
f.mu.Lock()
defer f.mu.Unlock()
return len(f.workers)
}
func (f *frontendSchedulerWorkers) connectToScheduler(ctx context.Context, address string) (*grpc.ClientConn, error) {
// Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics.
opts, err := f.cfg.GRPCClientConfig.DialOption(nil, nil)
if err != nil {
return nil, err
}
conn, err := grpc.DialContext(ctx, address, opts...)
if err != nil {
return nil, err
}
return conn, nil
}
// Worker managing single gRPC connection to Scheduler. Each worker starts multiple goroutines for forwarding
// requests and cancellations to scheduler.
type frontendSchedulerWorker struct {
log log.Logger
conn *grpc.ClientConn
concurrency int
schedulerAddr string
frontendAddr string
// Context and cancellation used by individual goroutines.
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// Shared between all frontend workers.
requestCh <-chan *frontendRequest
// Cancellation requests for this scheduler are received via this channel. It is passed to frontend after
// query has been enqueued to scheduler.
cancelCh chan uint64
}
func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, frontendAddr string, requestCh <-chan *frontendRequest, concurrency int, log log.Logger) *frontendSchedulerWorker {
w := &frontendSchedulerWorker{
log: log,
conn: conn,
concurrency: concurrency,
schedulerAddr: schedulerAddr,
frontendAddr: frontendAddr,
requestCh: requestCh,
cancelCh: make(chan uint64),
}
w.ctx, w.cancel = context.WithCancel(context.Background())
return w
}
func (w *frontendSchedulerWorker) start() {
client := schedulerpb.NewSchedulerForFrontendClient(w.conn)
for i := 0; i < w.concurrency; i++ {
w.wg.Add(1)
go func() {
defer w.wg.Done()
w.runOne(w.ctx, client)
}()
}
}
func (w *frontendSchedulerWorker) stop() {
w.cancel()
w.wg.Wait()
if err := w.conn.Close(); err != nil {
level.Error(w.log).Log("msg", "error while closing connection to scheduler", "err", err)
}
}
func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb.SchedulerForFrontendClient) {
backoffConfig := backoff.Config{
MinBackoff: 500 * time.Millisecond,
MaxBackoff: 5 * time.Second,
}
backoff := backoff.New(ctx, backoffConfig)
for backoff.Ongoing() {
loop, loopErr := client.FrontendLoop(ctx)
if loopErr != nil {
level.Error(w.log).Log("msg", "error contacting scheduler", "err", loopErr, "addr", w.schedulerAddr)
backoff.Wait()
continue
}
loopErr = w.schedulerLoop(loop)
if closeErr := loop.CloseSend(); closeErr != nil {
level.Debug(w.log).Log("msg", "failed to close frontend loop", "err", loopErr, "addr", w.schedulerAddr)
}
if loopErr != nil {
level.Error(w.log).Log("msg", "error sending requests to scheduler", "err", loopErr, "addr", w.schedulerAddr)
backoff.Wait()
continue
}
backoff.Reset()
}
}
func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient) error {
if err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.INIT,
FrontendAddress: w.frontendAddr,
}); err != nil {
return err
}
if resp, err := loop.Recv(); err != nil || resp.Status != schedulerpb.OK {
if err != nil {
return err
}
return errors.Errorf("unexpected status received for init: %v", resp.Status)
}
ctx := loop.Context()
for {
select {
case <-ctx.Done():
// No need to report error if our internal context is canceled. This can happen during shutdown,
// or when scheduler is no longer resolvable. (It would be nice if this context reported "done" also when
// the scheduler stops the call, but that doesn't seem to be the case).
//
// Reporting error here would delay reopening the stream (if the worker context is not done yet).
level.Debug(w.log).Log("msg", "stream context finished", "err", ctx.Err())
return nil
case req := <-w.requestCh:
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.ENQUEUE,
QueryID: req.queryID,
UserID: req.userID,
HttpRequest: req.request,
FrontendAddress: w.frontendAddr,
StatsEnabled: req.statsEnabled,
})
if err != nil {
req.enqueue <- enqueueResult{status: failed}
return err
}
resp, err := loop.Recv()
if err != nil {
req.enqueue <- enqueueResult{status: failed}
return err
}
switch resp.Status {
case schedulerpb.OK:
req.enqueue <- enqueueResult{status: waitForResponse, cancelCh: w.cancelCh}
// Response will come from querier.
case schedulerpb.SHUTTING_DOWN:
// Scheduler is shutting down, report failure to enqueue and stop this loop.
req.enqueue <- enqueueResult{status: failed}
return errors.New("scheduler is shutting down")
case schedulerpb.ERROR:
req.enqueue <- enqueueResult{status: waitForResponse}
req.response <- &frontendv2pb.QueryResultRequest{
HttpResponse: &httpgrpc.HTTPResponse{
Code: http.StatusInternalServerError,
Body: []byte(resp.Error),
},
}
case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT:
req.enqueue <- enqueueResult{status: waitForResponse}
req.response <- &frontendv2pb.QueryResultRequest{
HttpResponse: &httpgrpc.HTTPResponse{
Code: http.StatusTooManyRequests,
Body: []byte("too many outstanding requests"),
},
}
}
case reqID := <-w.cancelCh:
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.CANCEL,
QueryID: reqID,
})
if err != nil {
return err
}
resp, err := loop.Recv()
if err != nil {
return err
}
// Scheduler may be shutting down, report that.
if resp.Status != schedulerpb.OK {
return errors.Errorf("unexpected status received for cancellation: %v", resp.Status)
}
}
}
}
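For orientation, here is a minimal sketch of how one of these workers could be wired up by hand. The `exampleStartWorker` name and the surrounding values (`ctx`, `requestCh`, `logger`, the addresses, and the `Config` type carrying `GRPCClientConfig`) are hypothetical stand-ins for what the frontend-side worker pool normally supplies; the dial options come from the same client config used by connectToScheduler above.

// Hypothetical wiring, not part of this change: one worker per resolved scheduler address,
// all sharing the frontend-wide request channel.
func exampleStartWorker(ctx context.Context, cfg Config, schedulerAddr, frontendAddr string,
	requestCh chan *frontendRequest, logger log.Logger) (*frontendSchedulerWorker, error) {
	opts, err := cfg.GRPCClientConfig.DialOption(nil, nil)
	if err != nil {
		return nil, err
	}
	conn, err := grpc.DialContext(ctx, schedulerAddr, opts...)
	if err != nil {
		return nil, err
	}
	w := newFrontendSchedulerWorker(conn, schedulerAddr, frontendAddr, requestCh, 4, logger)
	w.start() // starts `concurrency` goroutines, each running runOne against this scheduler
	return w, nil // the caller is responsible for calling w.stop() on shutdown
}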

@@ -0,0 +1,147 @@
package worker
import (
"context"
"fmt"
"net/http"
"time"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/stats"
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc"
)
var (
processorBackoffConfig = backoff.Config{
MinBackoff: 500 * time.Millisecond,
MaxBackoff: 5 * time.Second,
}
)
func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger) processor {
return &frontendProcessor{
log: log,
handler: handler,
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
querierID: cfg.QuerierID,
}
}
// Handles incoming queries from frontend.
type frontendProcessor struct {
handler RequestHandler
maxMessageSize int
querierID string
log log.Logger
}
// notifyShutdown implements processor.
func (fp *frontendProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) {
client := frontendv1pb.NewFrontendClient(conn)
req := &frontendv1pb.NotifyClientShutdownRequest{ClientID: fp.querierID}
if _, err := client.NotifyClientShutdown(ctx, req); err != nil {
// Since we're shutting down there's nothing we can do except logging it.
level.Warn(fp.log).Log("msg", "failed to notify querier shutdown to query-frontend", "address", address, "err", err)
}
}
// processQueriesOnSingleStream loops, trying to establish a stream to the frontend to begin request processing.
func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
client := frontendv1pb.NewFrontendClient(conn)
backoff := backoff.New(ctx, processorBackoffConfig)
for backoff.Ongoing() {
c, err := client.Process(ctx)
if err != nil {
level.Error(fp.log).Log("msg", "error contacting frontend", "address", address, "err", err)
backoff.Wait()
continue
}
if err := fp.process(c); err != nil {
level.Error(fp.log).Log("msg", "error processing requests", "address", address, "err", err)
backoff.Wait()
continue
}
backoff.Reset()
}
}
// process loops processing requests on an established stream.
func (fp *frontendProcessor) process(c frontendv1pb.Frontend_ProcessClient) error {
// Build a child context so we can cancel a query when the stream is closed.
ctx, cancel := context.WithCancel(c.Context())
defer cancel()
for {
request, err := c.Recv()
if err != nil {
return err
}
switch request.Type {
case frontendv1pb.HTTP_REQUEST:
// Handle the request on a "background" goroutine, so we go back to
// blocking on c.Recv(). This allows us to detect the stream closing
// and cancel the query. We don't actually handle queries in parallel
// here, as we're running in lock step with the server - each Recv is
// paired with a Send.
go fp.runRequest(ctx, request.HttpRequest, request.StatsEnabled, func(response *httpgrpc.HTTPResponse, stats *stats.Stats) error {
return c.Send(&frontendv1pb.ClientToFrontend{
HttpResponse: response,
Stats: stats,
})
})
case frontendv1pb.GET_ID:
err := c.Send(&frontendv1pb.ClientToFrontend{ClientID: fp.querierID})
if err != nil {
return err
}
default:
return fmt.Errorf("unknown request type: %v", request.Type)
}
}
}
func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.HTTPRequest, statsEnabled bool, sendHTTPResponse func(response *httpgrpc.HTTPResponse, stats *stats.Stats) error) {
var stats *querier_stats.Stats
if statsEnabled {
stats, ctx = querier_stats.ContextWithEmptyStats(ctx)
}
response, err := fp.handler.Handle(ctx, request)
if err != nil {
var ok bool
response, ok = httpgrpc.HTTPResponseFromError(err)
if !ok {
response = &httpgrpc.HTTPResponse{
Code: http.StatusInternalServerError,
Body: []byte(err.Error()),
}
}
}
// Ensure responses that are too big are not retried.
if len(response.Body) >= fp.maxMessageSize {
errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), fp.maxMessageSize)
response = &httpgrpc.HTTPResponse{
Code: http.StatusRequestEntityTooLarge,
Body: []byte(errMsg),
}
level.Error(fp.log).Log("msg", "error processing query", "err", errMsg)
}
if err := sendHTTPResponse(response, stats); err != nil {
level.Error(fp.log).Log("msg", "error processing requests", "err", err)
}
}
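The handler the processor drives is anything satisfying the worker package's RequestHandler interface (defined further down in this diff); in Loki it is the querier's HTTP handler wrapped by httpgrpc_server.NewServer in worker_service.go. A hypothetical stand-in makes the contract clear:

// Hypothetical handler, for illustration only: echoes the request URL back.
type echoHandler struct{}

func (echoHandler) Handle(_ context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
	return &httpgrpc.HTTPResponse{Code: http.StatusOK, Body: []byte(req.Url)}, nil
}

// fp := newFrontendProcessor(cfg, echoHandler{}, logger)
// fp.processQueriesOnSingleStream(ctx, conn, cfg.FrontendAddress)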

@@ -0,0 +1,86 @@
package worker
import (
"context"
"sync"
"time"
"go.uber.org/atomic"
"google.golang.org/grpc"
)
const (
notifyShutdownTimeout = 5 * time.Second
)
// Manages processor goroutines for single grpc connection.
type processorManager struct {
p processor
conn *grpc.ClientConn
address string
// Main context to control all goroutines.
ctx context.Context
wg sync.WaitGroup
// Cancel functions for individual goroutines.
cancelsMu sync.Mutex
cancels []context.CancelFunc
currentProcessors *atomic.Int32
}
func newProcessorManager(ctx context.Context, p processor, conn *grpc.ClientConn, address string) *processorManager {
return &processorManager{
p: p,
ctx: ctx,
conn: conn,
address: address,
currentProcessors: atomic.NewInt32(0),
}
}
func (pm *processorManager) stop() {
// Notify the remote query-frontend or query-scheduler we're shutting down.
// We use a new context to make sure it's not cancelled.
notifyCtx, cancel := context.WithTimeout(context.Background(), notifyShutdownTimeout)
defer cancel()
pm.p.notifyShutdown(notifyCtx, pm.conn, pm.address)
// Stop all goroutines.
pm.concurrency(0)
// Wait until they finish.
pm.wg.Wait()
_ = pm.conn.Close()
}
func (pm *processorManager) concurrency(n int) {
pm.cancelsMu.Lock()
defer pm.cancelsMu.Unlock()
if n < 0 {
n = 0
}
for len(pm.cancels) < n {
ctx, cancel := context.WithCancel(pm.ctx)
pm.cancels = append(pm.cancels, cancel)
pm.wg.Add(1)
go func() {
defer pm.wg.Done()
pm.currentProcessors.Inc()
defer pm.currentProcessors.Dec()
pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address)
}()
}
for len(pm.cancels) > n {
pm.cancels[0]()
pm.cancels = pm.cancels[1:]
}
}
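A short usage sketch of the scale-up/scale-down semantics above; `proc`, `conn` and `addr` are assumed to exist:

pm := newProcessorManager(ctx, proc, conn, addr)
pm.concurrency(4) // starts 4 goroutines, each calling processQueriesOnSingleStream on this connection
pm.concurrency(1) // cancels the 3 oldest goroutines and keeps 1 running
pm.stop()         // notifies the remote side, drops concurrency to 0, waits, closes the connection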

@@ -0,0 +1,234 @@
package worker
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/grpcclient"
dskit_middleware "github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
)
func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) {
p := &schedulerProcessor{
log: log,
handler: handler,
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
querierID: cfg.QuerierID,
grpcConfig: cfg.GRPCClientConfig,
frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_querier_query_frontend_request_duration_seconds",
Help: "Time spend doing requests to frontend.",
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
}, []string{"operation", "status_code"}),
}
frontendClientsGauge := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_querier_query_frontend_clients",
Help: "The current number of clients connected to query-frontend.",
})
poolConfig := client.PoolConfig{
CheckInterval: 5 * time.Second,
HealthCheckEnabled: true,
HealthCheckTimeout: 1 * time.Second,
}
p.frontendPool = client.NewPool("frontend", poolConfig, nil, p.createFrontendClient, frontendClientsGauge, log)
return p, []services.Service{p.frontendPool}
}
// Handles incoming queries from query-scheduler.
type schedulerProcessor struct {
log log.Logger
handler RequestHandler
grpcConfig grpcclient.Config
maxMessageSize int
querierID string
frontendPool *client.Pool
frontendClientRequestDuration *prometheus.HistogramVec
}
// notifyShutdown implements processor.
func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) {
client := schedulerpb.NewSchedulerForQuerierClient(conn)
req := &schedulerpb.NotifyQuerierShutdownRequest{QuerierID: sp.querierID}
if _, err := client.NotifyQuerierShutdown(ctx, req); err != nil {
// Since we're shutting down there's nothing we can do except logging it.
level.Warn(sp.log).Log("msg", "failed to notify querier shutdown to query-scheduler", "address", address, "err", err)
}
}
func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
schedulerClient := schedulerpb.NewSchedulerForQuerierClient(conn)
backoff := backoff.New(ctx, processorBackoffConfig)
for backoff.Ongoing() {
c, err := schedulerClient.QuerierLoop(ctx)
if err == nil {
err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID})
}
if err != nil {
level.Error(sp.log).Log("msg", "error contacting scheduler", "err", err, "addr", address)
backoff.Wait()
continue
}
if err := sp.querierLoop(c, address); err != nil {
// E.Welch I don't know how to do this any better, but context cancellations seem common,
// likely because an underlying connection was closed;
// they are noisy and I don't think they communicate anything useful.
if !strings.Contains(err.Error(), "context canceled") {
level.Error(sp.log).Log("msg", "error processing requests from scheduler", "err", err, "addr", address)
}
backoff.Wait()
continue
}
backoff.Reset()
}
}
// querierLoop processes requests on an established stream.
func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string) error {
// Build a child context so we can cancel a query when the stream is closed.
ctx, cancel := context.WithCancel(c.Context())
defer cancel()
for {
request, err := c.Recv()
if err != nil {
return err
}
// Handle the request on a "background" goroutine, so we go back to
// blocking on c.Recv(). This allows us to detect the stream closing
// and cancel the query. We don't actually handle queries in parallel
// here, as we're running in lock step with the server - each Recv is
// paired with a Send.
go func() {
// We need to inject user into context for sending response back.
ctx := user.InjectOrgID(ctx, request.UserID)
tracer := opentracing.GlobalTracer()
// Ignore errors here. If we cannot get parent span, we just don't create new one.
parentSpanContext, _ := lokigrpc.GetParentSpanForRequest(tracer, request.HttpRequest)
if parentSpanContext != nil {
queueSpan, spanCtx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, "querier_processor_runRequest", opentracing.ChildOf(parentSpanContext))
defer queueSpan.Finish()
ctx = spanCtx
}
logger := util_log.WithContext(ctx, sp.log)
sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest)
// Report back to scheduler that processing of the query has finished.
if err := c.Send(&schedulerpb.QuerierToScheduler{}); err != nil {
level.Error(logger).Log("msg", "error notifying scheduler about finished query", "err", err, "addr", address)
}
}()
}
}
func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, queryID uint64, frontendAddress string, statsEnabled bool, request *httpgrpc.HTTPRequest) {
var stats *querier_stats.Stats
if statsEnabled {
stats, ctx = querier_stats.ContextWithEmptyStats(ctx)
}
response, err := sp.handler.Handle(ctx, request)
if err != nil {
var ok bool
response, ok = httpgrpc.HTTPResponseFromError(err)
if !ok {
response = &httpgrpc.HTTPResponse{
Code: http.StatusInternalServerError,
Body: []byte(err.Error()),
}
}
}
// Ensure responses that are too big are not retried.
if len(response.Body) >= sp.maxMessageSize {
level.Error(logger).Log("msg", "response larger than max message size", "size", len(response.Body), "maxMessageSize", sp.maxMessageSize)
errMsg := fmt.Sprintf("response larger than the max message size (%d vs %d)", len(response.Body), sp.maxMessageSize)
response = &httpgrpc.HTTPResponse{
Code: http.StatusRequestEntityTooLarge,
Body: []byte(errMsg),
}
}
c, err := sp.frontendPool.GetClientFor(frontendAddress)
if err == nil {
// Response is empty and uninteresting.
_, err = c.(frontendv2pb.FrontendForQuerierClient).QueryResult(ctx, &frontendv2pb.QueryResultRequest{
QueryID: queryID,
HttpResponse: response,
Stats: stats,
})
}
if err != nil {
level.Error(logger).Log("msg", "error notifying frontend about finished query", "err", err, "frontend", frontendAddress)
}
}
func (sp *schedulerProcessor) createFrontendClient(addr string) (client.PoolClient, error) {
opts, err := sp.grpcConfig.DialOption([]grpc.UnaryClientInterceptor{
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
dskit_middleware.PrometheusGRPCUnaryInstrumentation(sp.frontendClientRequestDuration),
}, nil)
if err != nil {
return nil, err
}
conn, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
}
return &frontendClient{
FrontendForQuerierClient: frontendv2pb.NewFrontendForQuerierClient(conn),
HealthClient: grpc_health_v1.NewHealthClient(conn),
conn: conn,
}, nil
}
type frontendClient struct {
frontendv2pb.FrontendForQuerierClient
grpc_health_v1.HealthClient
conn *grpc.ClientConn
}
func (fc *frontendClient) Close() error {
return fc.conn.Close()
}

@@ -0,0 +1,284 @@
package worker
import (
"context"
"flag"
"os"
"sync"
"time"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc"
lokiutil "github.com/grafana/loki/pkg/util"
)
type Config struct {
FrontendAddress string `yaml:"frontend_address"`
SchedulerAddress string `yaml:"scheduler_address"`
DNSLookupPeriod time.Duration `yaml:"dns_lookup_duration"`
Parallelism int `yaml:"parallelism"`
MatchMaxConcurrency bool `yaml:"match_max_concurrent"`
MaxConcurrentRequests int `yaml:"-"` // Must be same as passed to PromQL Engine.
QuerierID string `yaml:"id"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.SchedulerAddress, "querier.scheduler-address", "", "Hostname (and port) of scheduler that querier will periodically resolve, connect to and receive queries from. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.")
f.StringVar(&cfg.FrontendAddress, "querier.frontend-address", "", "Address of query frontend service, in host:port format. If -querier.scheduler-address is set as well, querier will use scheduler instead. Only one of -querier.frontend-address or -querier.scheduler-address can be set. If neither is set, queries are only received via HTTP endpoint.")
f.DurationVar(&cfg.DNSLookupPeriod, "querier.dns-lookup-period", 3*time.Second, "How often to query DNS for query-frontend or query-scheduler address. Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.")
f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process per query-frontend or query-scheduler.")
f.BoolVar(&cfg.MatchMaxConcurrency, "querier.worker-match-max-concurrent", false, "Force worker concurrency to match the -querier.max-concurrent option. Overrides querier.worker-parallelism.")
f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.")
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
}
func (cfg *Config) Validate(log log.Logger) error {
if cfg.FrontendAddress != "" && cfg.SchedulerAddress != "" {
return errors.New("frontend address and scheduler address are mutually exclusive, please use only one")
}
return cfg.GRPCClientConfig.Validate(log)
}
// Handler for HTTP requests wrapped in protobuf messages.
type RequestHandler interface {
Handle(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
}
// Single processor handles all streaming operations to query-frontend or query-scheduler to fetch queries
// and process them.
type processor interface {
// Each invocation of processQueriesOnSingleStream starts new streaming operation to query-frontend
// or query-scheduler to fetch queries and execute them.
//
// This method must react on context being finished, and stop when that happens.
//
// processorManager (not processor) is responsible for starting as many goroutines as needed for each connection.
processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string)
// notifyShutdown notifies the remote query-frontend or query-scheduler that the querier is
// shutting down.
notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string)
}
type querierWorker struct {
*services.BasicService
cfg Config
logger log.Logger
processor processor
subservices *services.Manager
mu sync.Mutex
// Set to nil when stop is called... no more managers are created afterwards.
managers map[string]*processorManager
}
func NewQuerierWorker(cfg Config, rng ring.ReadRing, handler RequestHandler, logger log.Logger, reg prometheus.Registerer) (services.Service, error) {
if cfg.QuerierID == "" {
hostname, err := os.Hostname()
if err != nil {
return nil, errors.Wrap(err, "failed to get hostname for configuring querier ID")
}
cfg.QuerierID = hostname
}
var processor processor
var servs []services.Service
var address string
switch {
case rng != nil:
level.Info(logger).Log("msg", "Starting querier worker using query-scheduler and scheduler ring for addresses")
processor, servs = newSchedulerProcessor(cfg, handler, logger, reg)
case cfg.SchedulerAddress != "":
level.Info(logger).Log("msg", "Starting querier worker connected to query-scheduler", "scheduler", cfg.SchedulerAddress)
address = cfg.SchedulerAddress
processor, servs = newSchedulerProcessor(cfg, handler, logger, reg)
case cfg.FrontendAddress != "":
level.Info(logger).Log("msg", "Starting querier worker connected to query-frontend", "frontend", cfg.FrontendAddress)
address = cfg.FrontendAddress
processor = newFrontendProcessor(cfg, handler, logger)
default:
return nil, errors.New("unable to start the querier worker, need to configure one of frontend_address, scheduler_address, or a ring config in the query_scheduler config block")
}
return newQuerierWorkerWithProcessor(cfg, logger, processor, address, rng, servs)
}
func newQuerierWorkerWithProcessor(cfg Config, logger log.Logger, processor processor, address string, ring ring.ReadRing, servs []services.Service) (*querierWorker, error) {
f := &querierWorker{
cfg: cfg,
logger: logger,
managers: map[string]*processorManager{},
processor: processor,
}
// Empty address is only used in tests, where individual targets are added manually.
if address != "" {
w, err := util.NewDNSWatcher(address, cfg.DNSLookupPeriod, f)
if err != nil {
return nil, err
}
servs = append(servs, w)
}
if ring != nil {
w, err := lokiutil.NewRingWatcher(log.With(logger, "component", "querier-scheduler-worker"), ring, cfg.DNSLookupPeriod, f)
if err != nil {
return nil, err
}
servs = append(servs, w)
}
if len(servs) > 0 {
subservices, err := services.NewManager(servs...)
if err != nil {
return nil, errors.Wrap(err, "querier worker subservices")
}
f.subservices = subservices
}
f.BasicService = services.NewIdleService(f.starting, f.stopping)
return f, nil
}
func (w *querierWorker) starting(ctx context.Context) error {
if w.subservices == nil {
return nil
}
return services.StartManagerAndAwaitHealthy(ctx, w.subservices)
}
func (w *querierWorker) stopping(_ error) error {
// Stop all goroutines fetching queries. Note that in Stopping state,
// worker no longer creates new managers in AddressAdded method.
w.mu.Lock()
for _, m := range w.managers {
m.stop()
}
w.mu.Unlock()
if w.subservices == nil {
return nil
}
// Stop DNS watcher and services used by processor.
return services.StopManagerAndAwaitStopped(context.Background(), w.subservices)
}
func (w *querierWorker) AddressAdded(address string) {
ctx := w.ServiceContext()
if ctx == nil || ctx.Err() != nil {
return
}
w.mu.Lock()
defer w.mu.Unlock()
if m := w.managers[address]; m != nil {
return
}
level.Info(w.logger).Log("msg", "adding connection", "addr", address)
conn, err := w.connect(context.Background(), address)
if err != nil {
level.Error(w.logger).Log("msg", "error connecting", "addr", address, "err", err)
return
}
w.managers[address] = newProcessorManager(ctx, w.processor, conn, address)
// Called with lock.
w.resetConcurrency()
}
func (w *querierWorker) AddressRemoved(address string) {
level.Info(w.logger).Log("msg", "removing connection", "addr", address)
w.mu.Lock()
p := w.managers[address]
delete(w.managers, address)
// Called with lock.
w.resetConcurrency()
w.mu.Unlock()
if p != nil {
p.stop()
}
}
// Must be called with lock.
func (w *querierWorker) resetConcurrency() {
totalConcurrency := 0
index := 0
for _, m := range w.managers {
concurrency := 0
if w.cfg.MatchMaxConcurrency {
concurrency = w.cfg.MaxConcurrentRequests / len(w.managers)
// If max concurrency does not evenly divide into our targets, a subset will be chosen
// to receive an extra connection. Since Go map iteration order is randomized, this will be a
// random selection of targets.
if index < w.cfg.MaxConcurrentRequests%len(w.managers) {
level.Warn(w.logger).Log("msg", "max concurrency is not evenly divisible across targets, adding an extra connection", "addr", m.address)
concurrency++
}
} else {
concurrency = w.cfg.Parallelism
}
// If concurrency is 0 then MaxConcurrentRequests is less than the total number of
// frontends/schedulers. In order to prevent accidentally starving a frontend or scheduler we are just going to
// always connect once to every target. This is dangerous b/c we may start exceeding PromQL
// max concurrency.
if concurrency == 0 {
concurrency = 1
}
totalConcurrency += concurrency
m.concurrency(concurrency)
index++
}
if totalConcurrency > w.cfg.MaxConcurrentRequests {
level.Warn(w.logger).Log("msg", "total worker concurrency is greater than promql max concurrency. Queries may be queued in the querier which reduces QOS")
}
}
func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) {
// Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics.
opts, err := w.cfg.GRPCClientConfig.DialOption(nil, nil)
if err != nil {
return nil, err
}
conn, err := grpc.DialContext(ctx, address, opts...)
if err != nil {
return nil, err
}
return conn, nil
}
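To summarize the three discovery modes NewQuerierWorker supports, a minimal sketch; `schedulerRing`, `handler` and `logger` are assumptions standing in for what Loki's module wiring passes in:

// 1. Ring-based discovery (new in this change): pass the scheduler ring and leave both addresses empty.
w, err := NewQuerierWorker(cfg, schedulerRing, handler, logger, prometheus.DefaultRegisterer)

// 2. Static query-scheduler address: set cfg.SchedulerAddress and pass a nil ring.
// 3. Static query-frontend address: set cfg.FrontendAddress and pass a nil ring.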

@@ -4,7 +4,7 @@ import (
"fmt"
"net/http"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/ring"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
@@ -15,6 +15,7 @@ import (
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
querier_worker "github.com/grafana/loki/pkg/querier/worker"
serverutil "github.com/grafana/loki/pkg/util/server"
)
@@ -25,6 +26,7 @@ type WorkerServiceConfig struct {
QuerierWorkerConfig *querier_worker.Config
QueryFrontendEnabled bool
QuerySchedulerEnabled bool
SchedulerRing ring.ReadRing
}
// InitWorkerService takes a config object, a map of routes to handlers, an external http router and external
@@ -78,12 +80,16 @@ func InitWorkerService(
// If a frontend or scheduler address has been configured, return a querier worker service that uses
// the external Loki Server HTTP server, which has now has the internal handler's routes registered with it
return querier_worker.NewQuerierWorker(
*(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(externalHandler), util_log.Logger, prometheus.DefaultRegisterer)
*(cfg.QuerierWorkerConfig),
cfg.SchedulerRing,
httpgrpc_server.NewServer(externalHandler),
util_log.Logger,
prometheus.DefaultRegisterer)
}
// Since we must be running a querier with either a frontend and/or scheduler at this point, if no frontend or scheduler address
// Since we must be running a querier with either a frontend and/or scheduler at this point, if no scheduler ring, frontend, or scheduler address
// is configured, Loki will default to using the frontend on localhost on its own GRPC listening port.
if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
if cfg.SchedulerRing == nil && (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
address := fmt.Sprintf("127.0.0.1:%d", cfg.GrpcListenPort)
level.Warn(util_log.Logger).Log(
"msg", "Worker address is empty, attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.",
@@ -117,7 +123,11 @@ func InitWorkerService(
//Return a querier worker pointed to the internal querier HTTP handler so there is not a conflict in routes between the querier
//and the query frontend
return querier_worker.NewQuerierWorker(
*(cfg.QuerierWorkerConfig), httpgrpc_server.NewServer(internalHandler), util_log.Logger, prometheus.DefaultRegisterer)
*(cfg.QuerierWorkerConfig),
cfg.SchedulerRing,
httpgrpc_server.NewServer(internalHandler),
util_log.Logger,
prometheus.DefaultRegisterer)
}
func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) {

@@ -5,12 +5,13 @@ import (
"net/http/httptest"
"testing"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/gorilla/mux"
"github.com/grafana/dskit/services"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/middleware"
querier_worker "github.com/grafana/loki/pkg/querier/worker"
)
func Test_InitQuerierService(t *testing.T) {

@@ -0,0 +1,673 @@
package scheduler
import (
"context"
"flag"
"io"
"net/http"
"sync"
"time"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/services"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
)
var (
errSchedulerIsNotRunning = errors.New("scheduler is not running")
)
const (
// ringAutoForgetUnhealthyPeriods is how many consecutive heartbeat timeout periods an unhealthy instance
// in the ring is tolerated before being automatically removed.
ringAutoForgetUnhealthyPeriods = 10
)
// Scheduler is responsible for queueing and dispatching queries to Queriers.
type Scheduler struct {
services.Service
cfg Config
log log.Logger
limits Limits
connectedFrontendsMu sync.Mutex
connectedFrontends map[string]*connectedFrontend
requestQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
pendingRequestsMu sync.Mutex
pendingRequests map[requestKey]*schedulerRequest // Request is kept in this map even after being dispatched to querier. It can still be canceled at that time.
// Subservices manager.
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
// Metrics.
queueLength *prometheus.GaugeVec
discardedRequests *prometheus.CounterVec
connectedQuerierClients prometheus.GaugeFunc
connectedFrontendClients prometheus.GaugeFunc
queueDuration prometheus.Histogram
// Ring used for finding schedulers
ringLifecycler *ring.BasicLifecycler
ring *ring.Ring
}
type requestKey struct {
frontendAddr string
queryID uint64
}
type connectedFrontend struct {
connections int
// This context is used for running all queries from the same frontend.
// When last frontend connection is closed, context is canceled.
ctx context.Context
cancel context.CancelFunc
}
type Config struct {
MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant"`
QuerierForgetDelay time.Duration `yaml:"-"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."`
// Schedulers ring
UseSchedulerRing bool `yaml:"use_scheduler_ring"`
SchedulerRing RingConfig `yaml:"scheduler_ring,omitempty"`
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query scheduler. In-flight requests above this limit will fail with HTTP response status code 429.")
// Loki doesn't have query shuffle sharding yet, which is what this config is intended for;
// use the default value of 0 until someday when this config may be needed.
cfg.QuerierForgetDelay = 0
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f)
f.BoolVar(&cfg.UseSchedulerRing, "query-scheduler.use-scheduler-ring", false, "Set to true to have the query scheduler create a ring and the frontend and frontend_worker use this ring to get the addresses of the query schedulers. If frontend_address and scheduler_address are not present in the config, this value will be toggled by Loki to true")
cfg.SchedulerRing.RegisterFlags(f)
}
// NewScheduler creates a new Scheduler.
func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Scheduler, error) {
s := &Scheduler{
cfg: cfg,
log: log,
limits: limits,
pendingRequests: map[requestKey]*schedulerRequest{},
connectedFrontends: map[string]*connectedFrontend{},
}
s.queueLength = promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_query_scheduler_queue_length",
Help: "Number of queries in the queue.",
}, []string{"user"})
s.discardedRequests = promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_scheduler_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"})
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests)
s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_scheduler_queue_duration_seconds",
Help: "Time spend by requests in queue before getting picked up by a querier.",
Buckets: prometheus.DefBuckets,
})
s.connectedQuerierClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_query_scheduler_connected_querier_clients",
Help: "Number of querier worker clients currently connected to the query-scheduler.",
}, s.requestQueue.GetConnectedQuerierWorkersMetric)
s.connectedFrontendClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_query_scheduler_connected_frontend_clients",
Help: "Number of query-frontend worker clients currently connected to the query-scheduler.",
}, s.getConnectedFrontendClientsMetric)
s.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(s.cleanupMetricsForInactiveUser)
svcs := []services.Service{s.requestQueue, s.activeUsers}
if cfg.UseSchedulerRing {
ringStore, err := kv.NewClient(
cfg.SchedulerRing.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "scheduler"),
log,
)
if err != nil {
return nil, errors.Wrap(err, "create KV store client")
}
lifecyclerCfg, err := cfg.SchedulerRing.ToLifecyclerConfig()
if err != nil {
return nil, errors.Wrap(err, "invalid ring lifecycler config")
}
// Define lifecycler delegates in reverse order (last to be called defined first because they're
// chained via "next delegate").
delegate := ring.BasicLifecyclerDelegate(s)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, log)
delegate = ring.NewTokensPersistencyDelegate(cfg.SchedulerRing.TokensFilePath, ring.JOINING, delegate, log)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.SchedulerRing.HeartbeatTimeout, delegate, log)
s.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, RingNameForServer, RingKey, ringStore, delegate, log, registerer)
if err != nil {
return nil, errors.Wrap(err, "create ring lifecycler")
}
ringCfg := cfg.SchedulerRing.ToRingConfig()
s.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, RingNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
if err != nil {
return nil, errors.Wrap(err, "create ring client")
}
if registerer != nil {
registerer.MustRegister(s.ring)
}
svcs = append(svcs, s.ringLifecycler, s.ring)
}
var err error
s.subservices, err = services.NewManager(svcs...)
if err != nil {
return nil, err
}
s.Service = services.NewBasicService(s.starting, s.running, s.stopping)
return s, nil
}
// Limits needed for the Query Scheduler - interface used for decoupling.
type Limits interface {
// MaxQueriersPerUser returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
MaxQueriersPerUser(user string) int
}
type schedulerRequest struct {
frontendAddress string
userID string
queryID uint64
request *httpgrpc.HTTPRequest
statsEnabled bool
enqueueTime time.Time
ctx context.Context
ctxCancel context.CancelFunc
queueSpan opentracing.Span
// This is only used for testing.
parentSpanContext opentracing.SpanContext
}
// FrontendLoop handles connection from frontend.
func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) error {
frontendAddress, frontendCtx, err := s.frontendConnected(frontend)
if err != nil {
return err
}
defer s.frontendDisconnected(frontendAddress)
// Response to INIT. If scheduler is not running, we skip for-loop, send SHUTTING_DOWN and exit this method.
if s.State() == services.Running {
if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil {
return err
}
}
// We stop accepting new queries in Stopping state. By returning quickly, we disconnect frontends, which in turn
// cancels all their queries.
for s.State() == services.Running {
msg, err := frontend.Recv()
if err != nil {
// No need to report this as an error, it is expected when the query-frontend performs CloseSend() (as frontendSchedulerWorker does).
if err == io.EOF {
return nil
}
return err
}
if s.State() != services.Running {
break // break out of the loop, and send SHUTTING_DOWN message.
}
var resp *schedulerpb.SchedulerToFrontend
switch msg.GetType() {
case schedulerpb.ENQUEUE:
err = s.enqueueRequest(frontendCtx, frontendAddress, msg)
switch {
case err == nil:
resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
case err == queue.ErrTooManyRequests:
resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.TOO_MANY_REQUESTS_PER_TENANT}
default:
resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.ERROR, Error: err.Error()}
}
case schedulerpb.CANCEL:
s.cancelRequestAndRemoveFromPending(frontendAddress, msg.QueryID)
resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
default:
level.Error(s.log).Log("msg", "unknown request type from frontend", "addr", frontendAddress, "type", msg.GetType())
return errors.New("unknown request type")
}
err = frontend.Send(resp)
// Failure to send response results in ending this connection.
if err != nil {
return err
}
}
// Report shutdown back to frontend, so that it can retry with different scheduler. Also stop the frontend loop.
return frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN})
}
func (s *Scheduler) frontendConnected(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) (string, context.Context, error) {
msg, err := frontend.Recv()
if err != nil {
return "", nil, err
}
if msg.Type != schedulerpb.INIT || msg.FrontendAddress == "" {
return "", nil, errors.New("no frontend address")
}
level.Debug(s.log).Log("msg", "frontend connected", "address", msg.FrontendAddress)
s.connectedFrontendsMu.Lock()
defer s.connectedFrontendsMu.Unlock()
cf := s.connectedFrontends[msg.FrontendAddress]
if cf == nil {
cf = &connectedFrontend{
connections: 0,
}
cf.ctx, cf.cancel = context.WithCancel(context.Background())
s.connectedFrontends[msg.FrontendAddress] = cf
}
cf.connections++
return msg.FrontendAddress, cf.ctx, nil
}
func (s *Scheduler) frontendDisconnected(frontendAddress string) {
s.connectedFrontendsMu.Lock()
defer s.connectedFrontendsMu.Unlock()
level.Debug(s.log).Log("msg", "frontend disconnected", "address", frontendAddress)
cf := s.connectedFrontends[frontendAddress]
cf.connections--
if cf.connections == 0 {
delete(s.connectedFrontends, frontendAddress)
cf.cancel()
}
}
func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr string, msg *schedulerpb.FrontendToScheduler) error {
// Create new context for this request, to support cancellation.
ctx, cancel := context.WithCancel(frontendContext)
shouldCancel := true
defer func() {
if shouldCancel {
cancel()
}
}()
// Extract tracing information from headers in HTTP request. FrontendContext doesn't have the correct tracing
// information, since that is a long-running request.
tracer := opentracing.GlobalTracer()
parentSpanContext, err := lokigrpc.GetParentSpanForRequest(tracer, msg.HttpRequest)
if err != nil {
return err
}
userID := msg.GetUserID()
req := &schedulerRequest{
frontendAddress: frontendAddr,
userID: msg.UserID,
queryID: msg.QueryID,
request: msg.HttpRequest,
statsEnabled: msg.StatsEnabled,
}
now := time.Now()
req.parentSpanContext = parentSpanContext
req.queueSpan, req.ctx = opentracing.StartSpanFromContextWithTracer(ctx, tracer, "queued", opentracing.ChildOf(parentSpanContext))
req.enqueueTime = now
req.ctxCancel = cancel
// aggregate the max queriers limit in the case of a multi tenant query
tenantIDs, err := tenant.TenantIDsFromOrgID(userID)
if err != nil {
return err
}
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, s.limits.MaxQueriersPerUser)
s.activeUsers.UpdateUserTimestamp(userID, now)
return s.requestQueue.EnqueueRequest(userID, req, maxQueriers, func() {
shouldCancel = false
s.pendingRequestsMu.Lock()
defer s.pendingRequestsMu.Unlock()
s.pendingRequests[requestKey{frontendAddr: frontendAddr, queryID: msg.QueryID}] = req
})
}
// This method doesn't do removal from the queue.
func (s *Scheduler) cancelRequestAndRemoveFromPending(frontendAddr string, queryID uint64) {
s.pendingRequestsMu.Lock()
defer s.pendingRequestsMu.Unlock()
key := requestKey{frontendAddr: frontendAddr, queryID: queryID}
req := s.pendingRequests[key]
if req != nil {
req.ctxCancel()
}
delete(s.pendingRequests, key)
}
// QuerierLoop is started by querier to receive queries from scheduler.
func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer) error {
resp, err := querier.Recv()
if err != nil {
return err
}
querierID := resp.GetQuerierID()
level.Debug(s.log).Log("msg", "querier connected", "querier", querierID)
s.requestQueue.RegisterQuerierConnection(querierID)
defer s.requestQueue.UnregisterQuerierConnection(querierID)
// If the downstream connection to querier is cancelled,
// we need to ping the condition variable to unblock getNextRequestForQuerier.
// Ideally we'd have ctx aware condition variables...
go func() {
<-querier.Context().Done()
s.requestQueue.QuerierDisconnecting()
}()
lastUserIndex := queue.FirstUser()
// In stopping state scheduler is not accepting new queries, but still dispatching queries in the queues.
for s.isRunningOrStopping() {
req, idx, err := s.requestQueue.GetNextRequestForQuerier(querier.Context(), lastUserIndex, querierID)
if err != nil {
return err
}
lastUserIndex = idx
r := req.(*schedulerRequest)
s.queueDuration.Observe(time.Since(r.enqueueTime).Seconds())
r.queueSpan.Finish()
/*
We want to dequeue the next unexpired request from the chosen tenant queue.
The chance of choosing a particular tenant for dequeueing is (1/active_tenants).
This is problematic under load, especially with other middleware enabled such as
querier.split-by-interval, where one request may fan out into many.
If expired requests aren't exhausted before checking another tenant, it would take
n_active_tenants * n_expired_requests_at_front_of_queue requests being processed
before an active request was handled for the tenant in question.
If this tenant meanwhile continued to queue requests,
it's possible that its own queue would perpetually contain only expired requests.
*/
if r.ctx.Err() != nil {
// Remove from pending requests.
s.cancelRequestAndRemoveFromPending(r.frontendAddress, r.queryID)
lastUserIndex = lastUserIndex.ReuseLastUser()
continue
}
if err := s.forwardRequestToQuerier(querier, r); err != nil {
return err
}
}
return errSchedulerIsNotRunning
}
func (s *Scheduler) NotifyQuerierShutdown(_ context.Context, req *schedulerpb.NotifyQuerierShutdownRequest) (*schedulerpb.NotifyQuerierShutdownResponse, error) {
level.Debug(s.log).Log("msg", "received shutdown notification from querier", "querier", req.GetQuerierID())
s.requestQueue.NotifyQuerierShutdown(req.GetQuerierID())
return &schedulerpb.NotifyQuerierShutdownResponse{}, nil
}
func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *schedulerRequest) error {
// Make sure to cancel request at the end to cleanup resources.
defer s.cancelRequestAndRemoveFromPending(req.frontendAddress, req.queryID)
// Handle the stream sending & receiving on a goroutine so we can
// monitor the contexts in a select and cancel things appropriately.
errCh := make(chan error, 1)
go func() {
err := querier.Send(&schedulerpb.SchedulerToQuerier{
UserID: req.userID,
QueryID: req.queryID,
FrontendAddress: req.frontendAddress,
HttpRequest: req.request,
StatsEnabled: req.statsEnabled,
})
if err != nil {
errCh <- err
return
}
_, err = querier.Recv()
errCh <- err
}()
select {
case <-req.ctx.Done():
// If the upstream request is cancelled (eg. frontend issued CANCEL or closed connection),
// we need to cancel the downstream req. Only way we can do that is to close the stream (by returning error here).
// The querier expects these semantics.
return req.ctx.Err()
case err := <-errCh:
// If there was an error handling this request due to network IO,
// then error out this upstream request _and_ stream.
if err != nil {
s.forwardErrorToFrontend(req.ctx, req, err)
}
return err
}
}
func (s *Scheduler) forwardErrorToFrontend(ctx context.Context, req *schedulerRequest, requestErr error) {
opts, err := s.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor},
nil)
if err != nil {
level.Warn(s.log).Log("msg", "failed to create gRPC options for the connection to frontend to report error", "frontend", req.frontendAddress, "err", err, "requestErr", requestErr)
return
}
conn, err := grpc.DialContext(ctx, req.frontendAddress, opts...)
if err != nil {
level.Warn(s.log).Log("msg", "failed to create gRPC connection to frontend to report error", "frontend", req.frontendAddress, "err", err, "requestErr", requestErr)
return
}
defer func() {
_ = conn.Close()
}()
client := frontendv2pb.NewFrontendForQuerierClient(conn)
userCtx := user.InjectOrgID(ctx, req.userID)
_, err = client.QueryResult(userCtx, &frontendv2pb.QueryResultRequest{
QueryID: req.queryID,
HttpResponse: &httpgrpc.HTTPResponse{
Code: http.StatusInternalServerError,
Body: []byte(requestErr.Error()),
},
})
if err != nil {
level.Warn(s.log).Log("msg", "failed to forward error to frontend", "frontend", req.frontendAddress, "err", err, "requestErr", requestErr)
return
}
}
func (s *Scheduler) isRunningOrStopping() bool {
st := s.State()
return st == services.Running || st == services.Stopping
}
func (s *Scheduler) starting(ctx context.Context) (err error) {
// In case this function will return error we want to unregister the instance
// from the ring. We do it ensuring dependencies are gracefully stopped if they
// were already started.
defer func() {
if err == nil || s.subservices == nil {
return
}
if stopErr := services.StopManagerAndAwaitStopped(context.Background(), s.subservices); stopErr != nil {
level.Error(s.log).Log("msg", "failed to gracefully stop scheduler dependencies", "err", stopErr)
}
}()
s.subservicesWatcher.WatchManager(s.subservices)
if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
return errors.Wrap(err, "unable to start scheduler subservices")
}
if s.cfg.UseSchedulerRing {
// The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that
// someone wants to do can be done before becoming ACTIVE. For the query scheduler we don't currently
// have any additional work so we can become ACTIVE right away.
// Wait until the ring client detected this instance in the JOINING state to
// make sure that when we'll run the initial sync we already know the tokens
// assigned to this instance.
level.Info(s.log).Log("msg", "waiting until scheduler is JOINING in the ring")
if err := ring.WaitInstanceState(ctx, s.ring, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
return err
}
level.Info(s.log).Log("msg", "scheduler is JOINING in the ring")
// Change ring state to ACTIVE
if err = s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE)
}
// Wait until the ring client detected this instance in the ACTIVE state to
// make sure that when we'll run the loop it won't be detected as a ring
// topology change.
level.Info(s.log).Log("msg", "waiting until scheduler is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, s.ring, s.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
return err
}
level.Info(s.log).Log("msg", "scheduler is ACTIVE in the ring")
}
return nil
}
func (s *Scheduler) running(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case err := <-s.subservicesWatcher.Chan():
return errors.Wrap(err, "scheduler subservice failed")
}
}
}
// Close the Scheduler.
func (s *Scheduler) stopping(_ error) error {
// This will also stop the requests queue, which stop accepting new requests and errors out any pending requests.
return services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
}
func (s *Scheduler) cleanupMetricsForInactiveUser(user string) {
s.queueLength.DeleteLabelValues(user)
s.discardedRequests.DeleteLabelValues(user)
}
func (s *Scheduler) getConnectedFrontendClientsMetric() float64 {
s.connectedFrontendsMu.Lock()
defer s.connectedFrontendsMu.Unlock()
count := 0
for _, workers := range s.connectedFrontends {
count += workers.connections
}
return float64(count)
}
// SafeReadRing does a nil check on the Scheduler before attempting to return its ring.
// This is necessary as many callers of this function will only have a valid Scheduler
// reference if the QueryScheduler target has been specified, which is not guaranteed.
func (s *Scheduler) SafeReadRing() ring.ReadRing {
if s == nil || s.ring == nil || !s.cfg.UseSchedulerRing {
return nil
}
return s.ring
}
func (s *Scheduler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the scheduler instance in the ring we want to start from
// a clean situation, so whatever the previous state was, we set it to JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
var tokens []uint32
if instanceExists {
tokens = instanceDesc.GetTokens()
}
takenTokens := ringDesc.GetTokens()
newTokens := ring.GenerateTokens(RingNumTokens-len(tokens), takenTokens)
// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)
return ring.JOINING, tokens
}
func (s *Scheduler) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {}
func (s *Scheduler) OnRingInstanceStopping(_ *ring.BasicLifecycler) {}
func (s *Scheduler) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) {
}
func (s *Scheduler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.ring.ServeHTTP(w, req)
}
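The nil-receiver behaviour of SafeReadRing is what lets callers pass the result straight through without checking whether the scheduler target is enabled; a minimal sketch:

// Safe even when the query-scheduler target is not running on this instance:
// a nil *Scheduler, a nil ring, or use_scheduler_ring=false all return nil.
var qs *Scheduler
if rng := qs.SafeReadRing(); rng != nil {
	// hand rng to the frontend worker / querier worker for ring-based discovery
}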

@@ -0,0 +1,110 @@
package scheduler
import (
"flag"
"fmt"
"os"
"time"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/cortexproject/cortex/pkg/ring"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)
const (
// RingKey is the key under which we store the schedulers ring in the KVStore.
RingKey = "scheduler"
// RingNameForServer is the name of the ring used by the scheduler server.
RingNameForServer = "scheduler"
// RingNameForClient is the name of the ring used by the scheduler client (we need
// a different name to avoid clashing Prometheus metrics when running in single-binary).
RingNameForClient = "scheduler-client"
// We use a safe default instead of exposing a config option to the user
// in order to simplify the config.
RingNumTokens = 512
)
// RingConfig masks the ring lifecycler config which contains
// many options not really required by the schedulers ring. This config
// is used to strip down the config to the minimum, and avoid confusion
// for the user.
type RingConfig struct {
KVStore kv.Config `yaml:"kvstore"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
TokensFilePath string `yaml:"tokens_file_path"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
// Instance details
InstanceID string `yaml:"instance_id"`
InstanceInterfaceNames []string `yaml:"instance_interface_names"`
InstancePort int `yaml:"instance_port"`
InstanceAddr string `yaml:"instance_addr"`
InstanceZone string `yaml:"instance_availability_zone"`
// Injected internally
ListenPort int `yaml:"-"`
ObservePeriod time.Duration `yaml:"-"`
}
// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
hostname, err := os.Hostname()
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err)
os.Exit(1)
}
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix("scheduler.ring.", "schedulers/", f)
f.DurationVar(&cfg.HeartbeatPeriod, "scheduler.ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
f.DurationVar(&cfg.HeartbeatTimeout, "scheduler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which schedulers are considered unhealthy within the ring. 0 = never (timeout disabled).")
f.StringVar(&cfg.TokensFilePath, "scheduler.ring.tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, "scheduler.ring.zone-awareness-enabled", false, "True to enable zone-awareness and spread scheduler instances across different availability zones.")
// Instance flags
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "scheduler.ring.instance-interface-names", "Name of network interface to read address from.")
f.StringVar(&cfg.InstanceAddr, "scheduler.ring.instance-addr", "", "IP address to advertise in the ring.")
f.IntVar(&cfg.InstancePort, "scheduler.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
f.StringVar(&cfg.InstanceID, "scheduler.ring.instance-id", hostname, "Instance ID to register in the ring.")
f.StringVar(&cfg.InstanceZone, "scheduler.ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.")
}
// ToLifecyclerConfig returns a LifecyclerConfig based on the scheduler ring config.
func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error) {
instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames)
if err != nil {
return ring.BasicLifecyclerConfig{}, err
}
instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort)
return ring.BasicLifecyclerConfig{
ID: cfg.InstanceID,
Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort),
Zone: cfg.InstanceZone,
HeartbeatPeriod: cfg.HeartbeatPeriod,
TokensObservePeriod: 0,
NumTokens: RingNumTokens,
}, nil
}
func (cfg *RingConfig) ToRingConfig() ring.Config {
rc := ring.Config{}
flagext.DefaultValues(&rc)
rc.KVStore = cfg.KVStore
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled
rc.ReplicationFactor = 2
return rc
}

@@ -0,0 +1,40 @@
package httpgrpc
import (
"github.com/opentracing/opentracing-go"
weaveworks_httpgrpc "github.com/weaveworks/common/httpgrpc"
)
// Used to transfer trace information from/to HTTP request.
type HeadersCarrier weaveworks_httpgrpc.HTTPRequest
func (c *HeadersCarrier) Set(key, val string) {
c.Headers = append(c.Headers, &weaveworks_httpgrpc.Header{
Key: key,
Values: []string{val},
})
}
func (c *HeadersCarrier) ForeachKey(handler func(key, val string) error) error {
for _, h := range c.Headers {
for _, v := range h.Values {
if err := handler(h.Key, v); err != nil {
return err
}
}
}
return nil
}
func GetParentSpanForRequest(tracer opentracing.Tracer, req *weaveworks_httpgrpc.HTTPRequest) (opentracing.SpanContext, error) {
if tracer == nil {
return nil, nil
}
carrier := (*HeadersCarrier)(req)
extracted, err := tracer.Extract(opentracing.HTTPHeaders, carrier)
if err == opentracing.ErrSpanContextNotFound {
err = nil
}
return extracted, err
}
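GetParentSpanForRequest covers the extract half; the inject half is symmetric and uses the same carrier's Set method. A minimal sketch, with a hypothetical helper name, assuming the caller already has a started span:

func injectSpanIntoRequest(tracer opentracing.Tracer, span opentracing.Span, req *weaveworks_httpgrpc.HTTPRequest) error {
	if tracer == nil || span == nil {
		return nil
	}
	carrier := (*HeadersCarrier)(req)
	return tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier)
}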

@@ -0,0 +1,126 @@
package util
import (
"context"
"fmt"
"time"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
)
const (
RingKeyOfLeader = 0
)
type ringWatcher struct {
log log.Logger
ring ring.ReadRing
notifications util.DNSNotifications
lookupPeriod time.Duration
addresses []string
}
// NewRingWatcher creates a new ring watcher and returns a service that wraps it.
func NewRingWatcher(log log.Logger, ring ring.ReadRing, lookupPeriod time.Duration, notifications util.DNSNotifications) (services.Service, error) {
w := &ringWatcher{
log: log,
ring: ring,
notifications: notifications,
lookupPeriod: lookupPeriod,
}
return services.NewBasicService(nil, w.watchLoop, nil), nil
}
// watchLoop watches for changes in the ring and sends notifications.
func (w *ringWatcher) watchLoop(servCtx context.Context) error {
syncTicker := time.NewTicker(w.lookupPeriod)
defer syncTicker.Stop()
for {
select {
case <-servCtx.Done():
return nil
case <-syncTicker.C:
w.lookupAddresses()
}
}
}
func (w *ringWatcher) lookupAddresses() {
addrs, err := w.getAddresses()
if err != nil {
level.Error(w.log).Log("msg", "error getting addresses from ring", "err", err)
}
if len(addrs) == 0 {
return
}
toAdd := make([]string, 0, len(addrs))
for i, newAddr := range addrs {
alreadyExists := false
for _, currAddr := range w.addresses {
if currAddr == newAddr {
alreadyExists = true
}
}
if !alreadyExists {
toAdd = append(toAdd, addrs[i])
}
}
toRemove := make([]string, 0, len(w.addresses))
for i, existingAddr := range w.addresses {
stillExists := false
for _, newAddr := range addrs {
if newAddr == existingAddr {
stillExists = true
}
}
if !stillExists {
toRemove = append(toRemove, w.addresses[i])
}
}
for _, ta := range toAdd {
level.Debug(w.log).Log("msg", fmt.Sprintf("adding connection to scheduler at address: %s", ta))
w.notifications.AddressAdded(ta)
}
for _, tr := range toRemove {
level.Debug(w.log).Log("msg", fmt.Sprintf("removing connection to scheduler at address: %s", tr))
w.notifications.AddressRemoved(tr)
}
w.addresses = addrs
}
func (w *ringWatcher) getAddresses() ([]string, error) {
var addrs []string
// If there are fewer than 2 existing addresses, odds are we are running just a single instance,
// so just get the first healthy address and use it. If the call returns more than one address,
// continue on to check for the actual replicaset instances.
if len(w.addresses) < 2 {
rs, err := w.ring.GetAllHealthy(ring.WriteNoExtend)
if err != nil {
return nil, err
}
addrs = rs.GetAddresses()
if len(addrs) == 1 {
return addrs, nil
}
}
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
rs, err := w.ring.Get(RingKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, err
}
return rs.GetAddresses(), nil
}
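The watcher reuses the same notifications interface as the DNS watcher (AddressAdded/AddressRemoved); the real consumer is the querierWorker shown earlier in this diff. A hypothetical stand-in, e.g. for tests, shows the minimal contract:

// Hypothetical notifications receiver, for illustration only.
type logNotifications struct{ log log.Logger }

func (n logNotifications) AddressAdded(addr string) {
	level.Info(n.log).Log("msg", "scheduler address added", "addr", addr)
}

func (n logNotifications) AddressRemoved(addr string) {
	level.Info(n.log).Log("msg", "scheduler address removed", "addr", addr)
}

// watcher, _ := NewRingWatcher(logger, schedulerRing, 3*time.Second, logNotifications{log: logger})
// _ = services.StartAndAwaitRunning(ctx, watcher)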