From 0a5e149ea540d9b034ff7023ca6f95ce09805080 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Tue, 6 Jun 2023 12:06:50 +0530 Subject: [PATCH] query-scheduler: fix query distribution in SSD mode (#9471) **What this PR does / why we need it**: When we run the `query-scheduler` in `ring` mode, `queriers` and `query-frontend` discover the available `query-scheduler` instances using the ring. However, we have a problem when `query-schedulers` are not running in the same process as queriers and query-frontend since [we try to get the ring client interface from the scheduler instance](https://github.com/grafana/loki/blob/abd6131bba18db7f3575241c5e6dc4eed879fbc0/pkg/loki/modules.go#L358). This causes queries not to be spread across all the available queriers when running in SSD mode because [we point querier workers to query frontend when there is no ring client and scheduler address configured](https://github.com/grafana/loki/blob/b05f4fced305800b32641ae84e3bed5f1794fa7d/pkg/querier/worker_service.go#L115). I have fixed this issue by adding a new hidden target to initialize the ring client in `reader`/`member` mode based on which service is initializing it. `reader` mode will be used by `queriers` and `query-frontend` for discovering `query-scheduler` instances from the ring. `member` mode will be used by `query-schedulers` for registering themselves in the ring. I have also made a couple of changes not directly related to the issue but it fixes some problems: * [reset metric registry for each integration test](https://github.com/grafana/loki/commit/18c4fe59078b649ad6a788a48765b101d0b97618) - Previously we were reusing the same registry for all the tests and just [ignored the attempts to register same metrics](https://github.com/grafana/loki/blob/01f0ded7fcb57e3a7b26ffc1e8e3abf04a403825/integration/cluster/cluster.go#L113). This causes the registry to have metrics registered only from the first test so any updates from subsequent tests won't reflect in the metrics. metrics was the only reliable way for me to verify that `query-schedulers` were connected to `queriers` and `query-frontend` when running in ring mode in the integration test that I added to test my changes. This should also help with other tests where earlier it was hard to reliably check the metrics. * [load config from cli as well before applying dynamic config](https://github.com/grafana/loki/commit/f9e2448fc7e718db107165cd908054c806b84337) - Previously we were applying dynamic config considering just the config from config file. This results in unexpected config changes, for example, [this config change](https://github.com/grafana/loki/blob/4148dd2c51cb827ec3889298508b95ec7731e7fd/integration/loki_micro_services_test.go#L66) was getting ignored and [dynamic config tuning was unexpectedly turning on ring mode](https://github.com/grafana/loki/blob/52cd0a39b8266564352c61ab9b845ab597008770/pkg/loki/config_wrapper.go#L94) in the config. It is better to do any config tuning based on both file and cli args configs. **Which issue(s) this PR fixes**: Fixes #9195 --- CHANGELOG.md | 1 + integration/cluster/cluster.go | 16 +- .../loki_micro_services_delete_test.go | 7 +- integration/loki_micro_services_test.go | 118 ++++++++ integration/loki_rule_eval_test.go | 1 + pkg/loki/loki.go | 65 ++--- pkg/loki/modules.go | 42 ++- pkg/querier/worker_service.go | 4 +- pkg/scheduler/lifecycle.go | 28 ++ pkg/scheduler/ringmanager.go | 252 ++++++++++++++++++ pkg/scheduler/scheduler.go | 146 ++-------- pkg/util/cfg/dynamic.go | 2 + 12 files changed, 499 insertions(+), 183 deletions(-) create mode 100644 pkg/scheduler/lifecycle.go create mode 100644 pkg/scheduler/ringmanager.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bed7e54de..b8fced8864 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ * [9252](https://github.com/grafana/loki/pull/9252) **jeschkies**: Use un-escaped regex literal for string matching. * [9176](https://github.com/grafana/loki/pull/9176) **DylanGuedes**: Fix incorrect association of per-stream rate limit when sharding is enabled. * [9463](https://github.com/grafana/loki/pull/9463) **Totalus**: Fix OpenStack Swift client object listing to fetch all the objects properly. +* [9471](https://github.com/grafana/loki/pull/9471) **sandeepsukhani**: query-scheduler: fix query distribution in SSD mode. * [9495](https://github.com/grafana/loki/pull/9495) **thampiotr**: Promtail: Fix potential goroutine leak in file tailer. * [9629](https://github.com/grafana/loki/pull/9629) **periklis**: Fix duplicate label values from ingester streams. diff --git a/integration/cluster/cluster.go b/integration/cluster/cluster.go index 754df3eba8..56020f81be 100644 --- a/integration/cluster/cluster.go +++ b/integration/cluster/cluster.go @@ -33,8 +33,6 @@ import ( ) var ( - wrapRegistryOnce sync.Once - configTemplate = template.Must(template.New("").Parse(` auth_enabled: true @@ -108,18 +106,18 @@ ruler: `)) ) -func wrapRegistry() { - wrapRegistryOnce.Do(func() { - prometheus.DefaultRegisterer = &wrappedRegisterer{Registerer: prometheus.DefaultRegisterer} - }) +func resetMetricRegistry() { + registry := &wrappedRegisterer{Registry: prometheus.NewRegistry()} + prometheus.DefaultRegisterer = registry + prometheus.DefaultGatherer = registry } type wrappedRegisterer struct { - prometheus.Registerer + *prometheus.Registry } func (w *wrappedRegisterer) Register(collector prometheus.Collector) error { - if err := w.Registerer.Register(collector); err != nil { + if err := w.Registry.Register(collector); err != nil { var aErr prometheus.AlreadyRegisteredError if errors.As(err, &aErr) { return nil @@ -151,7 +149,7 @@ func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster { util_log.Logger = level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.Allow(logLevel)) } - wrapRegistry() + resetMetricRegistry() sharedPath, err := os.MkdirTemp("", "loki-shared-data") if err != nil { panic(err.Error()) diff --git a/integration/loki_micro_services_delete_test.go b/integration/loki_micro_services_delete_test.go index 4dce910e28..1ba0e3e2c2 100644 --- a/integration/loki_micro_services_delete_test.go +++ b/integration/loki_micro_services_delete_test.go @@ -310,8 +310,13 @@ func checkUserLabelAndMetricValue(t *testing.T, metricName, metrics, tenantID st } func checkMetricValue(t *testing.T, metricName, metrics string, expectedValue float64) { + t.Helper() + require.Equal(t, expectedValue, getMetricValue(t, metricName, metrics)) +} + +func getMetricValue(t *testing.T, metricName, metrics string) float64 { t.Helper() val, _, err := extractMetric(metricName, metrics) require.NoError(t, err) - require.Equal(t, expectedValue, val) + return val } diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go index dbc6364426..359e904b5b 100644 --- a/integration/loki_micro_services_test.go +++ b/integration/loki_micro_services_test.go @@ -259,3 +259,121 @@ func TestMicroServicesMultipleBucketSingleProvider(t *testing.T) { }) } } + +func TestSchedulerRing(t *testing.T) { + clu := cluster.New(nil) + defer func() { + assert.NoError(t, clu.Cleanup()) + }() + + // run initially the compactor, indexgateway, and distributor. + var ( + tCompactor = clu.AddComponent( + "compactor", + "-target=compactor", + "-boltdb.shipper.compactor.compaction-interval=1s", + "-boltdb.shipper.compactor.retention-delete-delay=1s", + // By default, a minute is added to the delete request start time. This compensates for that. + "-boltdb.shipper.compactor.delete-request-cancel-period=-60s", + "-compactor.deletion-mode=filter-and-delete", + ) + tIndexGateway = clu.AddComponent( + "index-gateway", + "-target=index-gateway", + ) + tDistributor = clu.AddComponent( + "distributor", + "-target=distributor", + ) + ) + require.NoError(t, clu.Run()) + + // then, run only the ingester and query scheduler. + var ( + tIngester = clu.AddComponent( + "ingester", + "-target=ingester", + "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + ) + tQueryScheduler = clu.AddComponent( + "query-scheduler", + "-target=query-scheduler", + "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + "-query-scheduler.use-scheduler-ring=true", + ) + ) + require.NoError(t, clu.Run()) + + // finally, run the query-frontend and querier. + var ( + tQueryFrontend = clu.AddComponent( + "query-frontend", + "-target=query-frontend", + "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + "-common.compactor-address="+tCompactor.HTTPURL(), + "-querier.per-request-limits-enabled=true", + "-query-scheduler.use-scheduler-ring=true", + "-frontend.scheduler-worker-concurrency=5", + ) + _ = clu.AddComponent( + "querier", + "-target=querier", + "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + "-common.compactor-address="+tCompactor.HTTPURL(), + "-query-scheduler.use-scheduler-ring=true", + "-querier.max-concurrent=4", + ) + ) + require.NoError(t, clu.Run()) + + tenantID := randStringRunes() + + now := time.Now() + cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL()) + cliDistributor.Now = now + cliIngester := client.New(tenantID, "", tIngester.HTTPURL()) + cliIngester.Now = now + cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL()) + cliQueryFrontend.Now = now + cliQueryScheduler := client.New(tenantID, "", tQueryScheduler.HTTPURL()) + cliQueryScheduler.Now = now + + t.Run("verify-scheduler-connections", func(t *testing.T) { + require.Eventually(t, func() bool { + // Check metrics to see if query scheduler is connected with query-frontend + metrics, err := cliQueryScheduler.Metrics() + require.NoError(t, err) + return getMetricValue(t, "cortex_query_scheduler_connected_frontend_clients", metrics) == 5 + }, 5*time.Second, 500*time.Millisecond) + + require.Eventually(t, func() bool { + // Check metrics to see if query scheduler is connected with query-frontend + metrics, err := cliQueryScheduler.Metrics() + require.NoError(t, err) + return getMetricValue(t, "cortex_query_scheduler_connected_querier_clients", metrics) == 4 + }, 5*time.Second, 500*time.Millisecond) + }) + + t.Run("ingest-logs", func(t *testing.T) { + // ingest some log lines + require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", now.Add(-45*time.Minute), map[string]string{"job": "fake"})) + require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", now.Add(-45*time.Minute), map[string]string{"job": "fake"})) + + require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"})) + require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"})) + }) + + t.Run("query", func(t *testing.T) { + resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`) + require.NoError(t, err) + assert.Equal(t, "streams", resp.Data.ResultType) + + var lines []string + for _, stream := range resp.Data.Stream { + for _, val := range stream.Values { + lines = append(lines, val[1]) + } + } + assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines) + }) +} diff --git a/integration/loki_rule_eval_test.go b/integration/loki_rule_eval_test.go index 01b711d0b4..dc840f939f 100644 --- a/integration/loki_rule_eval_test.go +++ b/integration/loki_rule_eval_test.go @@ -71,6 +71,7 @@ func testRuleEval(t *testing.T, mode string) { // and we have a circular dependency with the backend "-common.compactor-address=http://fake", "-legacy-read-mode=false", + "-query-scheduler.use-scheduler-ring=false", ) require.NoError(t, clu.Run()) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index ccd8417e0e..d70034ac0d 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -342,33 +342,34 @@ type Loki struct { deps map[string][]string SignalHandler *signals.Handler - Server *server.Server - InternalServer *server.Server - ring *ring.Ring - Overrides limiter.CombinedLimits - tenantConfigs *runtime.TenantConfigs - TenantLimits validation.TenantLimits - distributor *distributor.Distributor - Ingester ingester.Interface - Querier querier.Querier - cacheGenerationLoader queryrangebase.CacheGenNumberLoader - querierAPI *querier.QuerierAPI - ingesterQuerier *querier.IngesterQuerier - Store storage.Store - tableManager *index.TableManager - frontend Frontend - ruler *base_ruler.Ruler - ruleEvaluator ruler.Evaluator - RulerStorage rulestore.RuleStore - rulerAPI *base_ruler.API - stopper queryrange.Stopper - runtimeConfig *runtimeconfig.Manager - MemberlistKV *memberlist.KVInitService - compactor *compactor.Compactor - QueryFrontEndTripperware basetripper.Tripperware - queryScheduler *scheduler.Scheduler - usageReport *analytics.Reporter - indexGatewayRingManager *indexgateway.RingManager + Server *server.Server + InternalServer *server.Server + ring *ring.Ring + Overrides limiter.CombinedLimits + tenantConfigs *runtime.TenantConfigs + TenantLimits validation.TenantLimits + distributor *distributor.Distributor + Ingester ingester.Interface + Querier querier.Querier + cacheGenerationLoader queryrangebase.CacheGenNumberLoader + querierAPI *querier.QuerierAPI + ingesterQuerier *querier.IngesterQuerier + Store storage.Store + tableManager *index.TableManager + frontend Frontend + ruler *base_ruler.Ruler + ruleEvaluator ruler.Evaluator + RulerStorage rulestore.RuleStore + rulerAPI *base_ruler.API + stopper queryrange.Stopper + runtimeConfig *runtimeconfig.Manager + MemberlistKV *memberlist.KVInitService + compactor *compactor.Compactor + QueryFrontEndTripperware basetripper.Tripperware + queryScheduler *scheduler.Scheduler + querySchedulerRingManager *scheduler.RingManager + usageReport *analytics.Reporter + indexGatewayRingManager *indexgateway.RingManager clientMetrics storage.ClientMetrics deleteClientMetrics *deletion.DeleteRequestClientMetrics @@ -634,8 +635,9 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(TableManager, t.initTableManager) mm.RegisterModule(Compactor, t.initCompactor) mm.RegisterModule(IndexGateway, t.initIndexGateway) - mm.RegisterModule(QueryScheduler, t.initQueryScheduler) mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule) + mm.RegisterModule(QueryScheduler, t.initQueryScheduler) + mm.RegisterModule(QuerySchedulerRing, t.initQuerySchedulerRing, modules.UserInvisibleModule) mm.RegisterModule(Analytics, t.initAnalytics) mm.RegisterModule(CacheGenerationLoader, t.initCacheGenerationLoader) @@ -654,16 +656,17 @@ func (t *Loki) setupModuleManager() error { Distributor: {Ring, Server, Overrides, TenantConfigs, Analytics}, Store: {Overrides, IndexGatewayRing, IngesterQuerier}, Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics}, - Querier: {Store, Ring, Server, Overrides, Analytics, CacheGenerationLoader}, + Querier: {Store, Ring, Server, Overrides, Analytics, CacheGenerationLoader, QuerySchedulerRing}, QueryFrontendTripperware: {Server, Overrides, TenantConfigs}, - QueryFrontend: {QueryFrontendTripperware, Analytics, CacheGenerationLoader}, - QueryScheduler: {Server, Overrides, MemberlistKV, Analytics}, + QueryFrontend: {QueryFrontendTripperware, Analytics, CacheGenerationLoader, QuerySchedulerRing}, + QueryScheduler: {Server, Overrides, MemberlistKV, Analytics, QuerySchedulerRing}, Ruler: {Ring, Server, RulerStorage, RuleEvaluator, Overrides, TenantConfigs, Analytics}, RuleEvaluator: {Ring, Server, Store, Overrides, TenantConfigs, Analytics}, TableManager: {Server, Analytics}, Compactor: {Server, Overrides, MemberlistKV, Analytics}, IndexGateway: {Server, Store, Overrides, Analytics, MemberlistKV, IndexGatewayRing}, IngesterQuerier: {Ring}, + QuerySchedulerRing: {RuntimeConfig, Server, MemberlistKV}, IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, Read: {QueryFrontend, Querier}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 20dcc8d9c7..0c97aaf93b 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -105,6 +105,7 @@ const ( IndexGateway string = "index-gateway" IndexGatewayRing string = "index-gateway-ring" QueryScheduler string = "query-scheduler" + QuerySchedulerRing string = "query-scheduler-ring" All string = "all" Read string = "read" Write string = "write" @@ -361,7 +362,7 @@ func (t *Loki) initQuerier() (services.Service, error) { QuerierWorkerConfig: &t.Cfg.Worker, QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend), QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler), - SchedulerRing: scheduler.SafeReadRing(t.queryScheduler), + SchedulerRing: scheduler.SafeReadRing(t.querySchedulerRingManager), } toMerge := []middleware.Interface{ @@ -781,7 +782,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { } roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend( combinedCfg, - scheduler.SafeReadRing(t.queryScheduler), + scheduler.SafeReadRing(t.querySchedulerRingManager), disabledShuffleShardingLimits{}, t.Cfg.Server.GRPCListenPort, util_log.Logger, @@ -1227,24 +1228,45 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) { } func (t *Loki) initQueryScheduler() (services.Service, error) { - // Set some config sections from other config sections in the config struct - t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort - - s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, t.querySchedulerRingManager, prometheus.DefaultRegisterer) if err != nil { return nil, err } schedulerpb.RegisterSchedulerForFrontendServer(t.Server.GRPC, s) schedulerpb.RegisterSchedulerForQuerierServer(t.Server.GRPC, s) - t.Server.HTTP.Path("/scheduler/ring").Methods("GET", "POST").Handler(s) + + t.queryScheduler = s + return s, nil +} + +func (t *Loki) initQuerySchedulerRing() (_ services.Service, err error) { + if !t.Cfg.QueryScheduler.UseSchedulerRing { + return + } + + // Set some config sections from other config sections in the config struct + t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort + + managerMode := scheduler.RingManagerModeReader + if t.Cfg.isModuleEnabled(QueryScheduler) || t.Cfg.isModuleEnabled(Backend) || t.Cfg.isModuleEnabled(All) || (t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)) { + managerMode = scheduler.RingManagerModeMember + } + rm, err := scheduler.NewRingManager(managerMode, t.Cfg.QueryScheduler, util_log.Logger, prometheus.DefaultRegisterer) + + if err != nil { + return nil, gerrors.Wrap(err, "new scheduler ring manager") + } + + t.querySchedulerRingManager = rm + + t.Server.HTTP.Path("/scheduler/ring").Methods("GET", "POST").Handler(t.querySchedulerRingManager) if t.Cfg.InternalServer.Enable { - t.InternalServer.HTTP.Path("/scheduler/ring").Methods("GET").Handler(s) + t.InternalServer.HTTP.Path("/scheduler/ring").Methods("GET").Handler(t.querySchedulerRingManager) } - t.queryScheduler = s - return s, nil + return t.querySchedulerRingManager, nil } func (t *Loki) initQueryLimiter() (services.Service, error) { diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go index b9d06bf835..6ecb42ec6c 100644 --- a/pkg/querier/worker_service.go +++ b/pkg/querier/worker_service.go @@ -93,9 +93,9 @@ func InitWorkerService( externalRouter.Path(route).Methods("GET", "POST").Handler(handlerMiddleware.Wrap(internalRouter)) } - //If no frontend or scheduler address has been configured, then there is no place for the + //If no scheduler ring or frontend or scheduler address has been configured, then there is no place for the //querier worker to request work from, so no need to start a worker service - if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" { + if cfg.SchedulerRing == nil && (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" { return nil, nil } diff --git a/pkg/scheduler/lifecycle.go b/pkg/scheduler/lifecycle.go new file mode 100644 index 0000000000..b29b96788f --- /dev/null +++ b/pkg/scheduler/lifecycle.go @@ -0,0 +1,28 @@ +package scheduler + +import ( + "github.com/grafana/dskit/ring" +) + +func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { + // When we initialize the scheduler instance in the ring we want to start from + // a clean situation, so whatever is the state we set it JOINING, while we keep existing + // tokens (if any) or the ones loaded from file. + var tokens []uint32 + if instanceExists { + tokens = instanceDesc.GetTokens() + } + + takenTokens := ringDesc.GetTokens() + newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens) + + // Tokens sorting will be enforced by the parent caller. + tokens = append(tokens, newTokens...) + + return ring.JOINING, tokens +} + +func (rm *RingManager) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {} +func (rm *RingManager) OnRingInstanceStopping(_ *ring.BasicLifecycler) {} +func (rm *RingManager) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) { +} diff --git a/pkg/scheduler/ringmanager.go b/pkg/scheduler/ringmanager.go new file mode 100644 index 0000000000..b7c8272439 --- /dev/null +++ b/pkg/scheduler/ringmanager.go @@ -0,0 +1,252 @@ +package scheduler + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/services" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance + // in the ring will be automatically removed. + ringAutoForgetUnhealthyPeriods = 10 + + // ringKey is the key under which we store the store gateways ring in the KVStore. + ringKey = "scheduler" + + // ringNameForServer is the name of the ring used by the compactor server. + ringNameForServer = "scheduler" + + // ringReplicationFactor should be 2 because we want 2 schedulers. + ringReplicationFactor = 2 + + // ringNumTokens sets our single token in the ring, + // we only need to insert 1 token to be used for leader election purposes. + ringNumTokens = 1 + + // ringCheckPeriod is how often we check the ring to see if this instance is still in + // the replicaset of instances to act as schedulers. + ringCheckPeriod = 3 * time.Second +) + +// RingManagerMode defines the different modes for the RingManager to execute. +// +// The RingManager and its modes are only relevant if the Scheduler discovery is done using ring. +type RingManagerMode int + +const ( + // RingManagerModeReader is the RingManager mode executed by Loki components that want to discover Scheduler instances. + // The RingManager in reader mode will have its own ring key-value store client, but it won't try to register itself in the ring. + RingManagerModeReader RingManagerMode = iota + + // RingManagerModeMember is the RingManager mode execute by the Schedulers to register themselves in the ring. + RingManagerModeMember +) + +// RingManager is a component instantiated before all the others and is responsible for the ring setup. +// +// All Loki components that are involved with the Schedulers (including the Schedulers itself) will +// require a RingManager. However, the components that are clients of the Schedulers will run it in reader +// mode while the Schedulers itself will run the manager in member mode. +type RingManager struct { + services.Service + + subservices *services.Manager + subservicesWatcher *services.FailureWatcher + + RingLifecycler *ring.BasicLifecycler + Ring *ring.Ring + managerMode RingManagerMode + + cfg Config + + log log.Logger +} + +// NewRingManager is the recommended way of instantiating a RingManager. +// +// The other functions will assume the RingManager was instantiated through this function. +func NewRingManager(managerMode RingManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer) (*RingManager, error) { + rm := &RingManager{ + cfg: cfg, log: log, managerMode: managerMode, + } + + if !cfg.UseSchedulerRing { + return nil, fmt.Errorf("ring manager shouldn't be invoked when ring is not used for discovering schedulers") + } + + // instantiate kv store for both modes. + ringStore, err := kv.NewClient( + rm.cfg.SchedulerRing.KVStore, + ring.GetCodec(), + kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "scheduler"), + rm.log, + ) + if err != nil { + return nil, errors.Wrap(err, "scheduler ring manager create KV store client") + } + + // instantiate ring for both mode modes. + ringCfg := rm.cfg.SchedulerRing.ToRingConfig(ringReplicationFactor) + rm.Ring, err = ring.NewWithStoreClientAndStrategy( + ringCfg, + ringNameForServer, + ringKey, + ringStore, + ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), + prometheus.WrapRegistererWithPrefix("cortex_", registerer), + rm.log, + ) + if err != nil { + return nil, errors.Wrap(err, "failed to create ring client for scheduler ring manager") + } + + if managerMode == RingManagerModeMember { + if err := rm.startMemberMode(ringStore, registerer); err != nil { + return nil, err + } + return rm, nil + } + + if err := rm.startReaderMode(); err != nil { + return nil, err + } + return rm, nil +} + +func (rm *RingManager) startMemberMode(ringStore kv.Client, registerer prometheus.Registerer) error { + lifecyclerCfg, err := rm.cfg.SchedulerRing.ToLifecyclerConfig(ringNumTokens, rm.log) + if err != nil { + return errors.Wrap(err, "invalid ring lifecycler config") + } + + delegate := ring.BasicLifecyclerDelegate(rm) + delegate = ring.NewLeaveOnStoppingDelegate(delegate, rm.log) + delegate = ring.NewTokensPersistencyDelegate(rm.cfg.SchedulerRing.TokensFilePath, ring.JOINING, delegate, rm.log) + delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*rm.cfg.SchedulerRing.HeartbeatTimeout, delegate, rm.log) + + rm.RingLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, ringKey, ringStore, delegate, rm.log, registerer) + if err != nil { + return errors.Wrap(err, "failed to create ring lifecycler for scheduler ring manager") + } + + svcs := []services.Service{rm.RingLifecycler, rm.Ring} + rm.subservices, err = services.NewManager(svcs...) + if err != nil { + return errors.Wrap(err, "failed to create services manager for scheduler ring manager in member mode") + } + + rm.subservicesWatcher = services.NewFailureWatcher() + rm.subservicesWatcher.WatchManager(rm.subservices) + rm.Service = services.NewBasicService(rm.starting, rm.running, rm.stopping) + + return nil +} + +func (rm *RingManager) startReaderMode() error { + var err error + + svcs := []services.Service{rm.Ring} + rm.subservices, err = services.NewManager(svcs...) + if err != nil { + return errors.Wrap(err, "failed to create services manager for scheduler ring manager in reader mode") + } + + rm.subservicesWatcher = services.NewFailureWatcher() + rm.subservicesWatcher.WatchManager(rm.subservices) + + rm.Service = services.NewIdleService(func(ctx context.Context) error { + return services.StartManagerAndAwaitHealthy(ctx, rm.subservices) + }, func(failureCase error) error { + return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices) + }) + + return nil +} + +// starting implements the Lifecycler interface and is one of the lifecycle hooks. +func (rm *RingManager) starting(ctx context.Context) (err error) { + // In case this function will return error we want to unregister the instance + // from the ring. We do it ensuring dependencies are gracefully stopped if they + // were already started. + defer func() { + if err == nil || rm.subservices == nil { + return + } + + if stopErr := services.StopManagerAndAwaitStopped(context.Background(), rm.subservices); stopErr != nil { + level.Error(rm.log).Log("msg", "failed to gracefully stop scheduler ring manager dependencies", "err", stopErr) + } + }() + + if err := services.StartManagerAndAwaitHealthy(ctx, rm.subservices); err != nil { + return errors.Wrap(err, "unable to start scheduler ring manager subservices") + } + + // The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that + // someone wants to do can be done before becoming ACTIVE. For the schedulers we don't currently + // have any additional work so we can become ACTIVE right away. + // Wait until the ring client detected this instance in the JOINING + // state to make sure that when we'll run the initial sync we already + // know the tokens assigned to this instance. + level.Info(rm.log).Log("msg", "waiting until scheduler is JOINING in the ring") + if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.JOINING); err != nil { + return err + } + level.Info(rm.log).Log("msg", "scheduler is JOINING in the ring") + + if err = rm.RingLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil { + return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE) + } + + // Wait until the ring client detected this instance in the ACTIVE state to + // make sure that when we'll run the loop it won't be detected as a ring + // topology change. + level.Info(rm.log).Log("msg", "waiting until scheduler is ACTIVE in the ring") + if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { + return err + } + level.Info(rm.log).Log("msg", "scheduler is ACTIVE in the ring") + + return nil +} + +// running implements the Lifecycler interface and is one of the lifecycle hooks. +func (rm *RingManager) running(ctx context.Context) error { + t := time.NewTicker(ringCheckPeriod) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return nil + case err := <-rm.subservicesWatcher.Chan(): + return errors.Wrap(err, "running scheduler ring manager subservice failed") + case <-t.C: + continue + } + } +} + +// stopping implements the Lifecycler interface and is one of the lifecycle hooks. +func (rm *RingManager) stopping(_ error) error { + level.Debug(rm.log).Log("msg", "stopping scheduler ring manager") + return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices) +} + +// ServeHTTP serves the HTTP route /scheduler/ring. +func (rm *RingManager) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if rm.cfg.UseSchedulerRing { + rm.Ring.ServeHTTP(w, req) + } else { + _, _ = w.Write([]byte("QueryScheduler running with '-query-scheduler.use-scheduler-ring' set to false.")) + } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index f7eb444836..de5ab19599 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -14,7 +14,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/grpcclient" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" otgrpc "github.com/opentracing-contrib/go-grpc" @@ -36,35 +35,11 @@ import ( "github.com/grafana/loki/pkg/util" lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" lokihttpreq "github.com/grafana/loki/pkg/util/httpreq" - util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/validation" ) var errSchedulerIsNotRunning = errors.New("scheduler is not running") -const ( - // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance - // in the ring will be automatically removed. - ringAutoForgetUnhealthyPeriods = 10 - - // ringKey is the key under which we store the store gateways ring in the KVStore. - ringKey = "scheduler" - - // ringNameForServer is the name of the ring used by the compactor server. - ringNameForServer = "scheduler" - - // ringReplicationFactor should be 2 because we want 2 schedulers. - ringReplicationFactor = 2 - - // ringNumTokens sets our single token in the ring, - // we only need to insert 1 token to be used for leader election purposes. - ringNumTokens = 1 - - // ringCheckPeriod is how often we check the ring to see if this instance is still in - // the replicaset of instances to act as schedulers. - ringCheckPeriod = 3 * time.Second -) - // Scheduler is responsible for queueing and dispatching queries to Queriers. type Scheduler struct { services.Service @@ -98,8 +73,7 @@ type Scheduler struct { inflightRequests prometheus.Summary // Ring used for finding schedulers - ringLifecycler *ring.BasicLifecycler - ring *ring.Ring + ringManager *RingManager // Controls for this being a chosen scheduler shouldRun atomic.Bool @@ -140,7 +114,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { } // NewScheduler creates a new Scheduler. -func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Scheduler, error) { +func NewScheduler(cfg Config, limits Limits, log log.Logger, ringManager *RingManager, registerer prometheus.Registerer) (*Scheduler, error) { + if cfg.UseSchedulerRing { + if ringManager == nil { + return nil, errors.New("ring manager can't be empty when use_scheduler_ring is true") + } else if ringManager.managerMode != RingManagerModeMember { + return nil, errors.New("ring manager must be initialized in RingManagerModeMember for query schedulers") + } + } + queueMetrics := queue.NewMetrics("query_scheduler", registerer) s := &Scheduler{ cfg: cfg, @@ -150,8 +132,8 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe pendingRequests: map[requestKey]*schedulerRequest{}, connectedFrontends: map[string]*connectedFrontend{}, queueMetrics: queueMetrics, - - requestQueue: queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, queueMetrics), + ringManager: ringManager, + requestQueue: queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, queueMetrics), } s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ @@ -185,39 +167,6 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe if cfg.UseSchedulerRing { s.shouldRun.Store(false) - ringStore, err := kv.NewClient( - cfg.SchedulerRing.KVStore, - ring.GetCodec(), - kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "scheduler"), - log, - ) - if err != nil { - return nil, errors.Wrap(err, "create KV store client") - } - lifecyclerCfg, err := cfg.SchedulerRing.ToLifecyclerConfig(ringNumTokens, log) - if err != nil { - return nil, errors.Wrap(err, "invalid ring lifecycler config") - } - - // Define lifecycler delegates in reverse order (last to be called defined first because they're - // chained via "next delegate"). - delegate := ring.BasicLifecyclerDelegate(s) - delegate = ring.NewLeaveOnStoppingDelegate(delegate, log) - delegate = ring.NewTokensPersistencyDelegate(cfg.SchedulerRing.TokensFilePath, ring.JOINING, delegate, log) - delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.SchedulerRing.HeartbeatTimeout, delegate, log) - - s.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, ringKey, ringStore, delegate, log, registerer) - if err != nil { - return nil, errors.Wrap(err, "create ring lifecycler") - } - - ringCfg := cfg.SchedulerRing.ToRingConfig(ringReplicationFactor) - s.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, ringKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("cortex_", registerer), util_log.Logger) - if err != nil { - return nil, errors.Wrap(err, "create ring client") - } - - svcs = append(svcs, s.ringLifecycler, s.ring) } else { // Always run if no scheduler ring is being used. s.shouldRun.Store(true) @@ -605,9 +554,6 @@ func (s *Scheduler) isRunningOrStopping() bool { } func (s *Scheduler) starting(ctx context.Context) (err error) { - // In case this function will return error we want to unregister the instance - // from the ring. We do it ensuring dependencies are gracefully stopped if they - // were already started. defer func() { if err == nil || s.subservices == nil { return @@ -622,35 +568,6 @@ func (s *Scheduler) starting(ctx context.Context) (err error) { return errors.Wrap(err, "unable to start scheduler subservices") } - if s.cfg.UseSchedulerRing { - // The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that - // someone wants to do can be done before becoming ACTIVE. For the query scheduler we don't currently - // have any additional work so we can become ACTIVE right away. - - // Wait until the ring client detected this instance in the JOINING state to - // make sure that when we'll run the initial sync we already know the tokens - // assigned to this instance. - level.Info(s.log).Log("msg", "waiting until scheduler is JOINING in the ring") - if err := ring.WaitInstanceState(ctx, s.ring, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil { - return err - } - level.Info(s.log).Log("msg", "scheduler is JOINING in the ring") - - // Change ring state to ACTIVE - if err = s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil { - return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE) - } - - // Wait until the ring client detected this instance in the ACTIVE state to - // make sure that when we'll run the loop it won't be detected as a ring - // topology change. - level.Info(s.log).Log("msg", "waiting until scheduler is ACTIVE in the ring") - if err := ring.WaitInstanceState(ctx, s.ring, s.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { - return err - } - level.Info(s.log).Log("msg", "scheduler is ACTIVE in the ring") - } - return nil } @@ -675,7 +592,7 @@ func (s *Scheduler) running(ctx context.Context) error { if !s.cfg.UseSchedulerRing { continue } - isInSet, err := util.IsInReplicationSet(s.ring, util.RingKeyOfLeader, s.ringLifecycler.GetInstanceAddr()) + isInSet, err := util.IsInReplicationSet(s.ringManager.Ring, util.RingKeyOfLeader, s.ringManager.RingLifecycler.GetInstanceAddr()) if err != nil { level.Error(s.log).Log("msg", "failed to query the ring to see if scheduler instance is in ReplicatonSet, will try again", "err", err) continue @@ -745,41 +662,10 @@ func (s *Scheduler) getConnectedFrontendClientsMetric() float64 { // SafeReadRing does a nil check on the Scheduler before attempting to return it's ring // this is necessary as many callers of this function will only have a valid Scheduler // reference if the QueryScheduler target has been specified, which is not guaranteed -func SafeReadRing(s *Scheduler) ring.ReadRing { - if s == nil || s.ring == nil || !s.cfg.UseSchedulerRing { +func SafeReadRing(s *RingManager) ring.ReadRing { + if s == nil || s.Ring == nil || !s.cfg.UseSchedulerRing { return nil } - return s.ring -} - -func (s *Scheduler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { - // When we initialize the scheduler instance in the ring we want to start from - // a clean situation, so whatever is the state we set it JOINING, while we keep existing - // tokens (if any) or the ones loaded from file. - var tokens []uint32 - if instanceExists { - tokens = instanceDesc.GetTokens() - } - - takenTokens := ringDesc.GetTokens() - newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens) - - // Tokens sorting will be enforced by the parent caller. - tokens = append(tokens, newTokens...) - - return ring.JOINING, tokens -} - -func (s *Scheduler) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {} -func (s *Scheduler) OnRingInstanceStopping(_ *ring.BasicLifecycler) {} -func (s *Scheduler) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) { -} - -func (s *Scheduler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if s.cfg.UseSchedulerRing { - s.ring.ServeHTTP(w, req) - } else { - _, _ = w.Write([]byte("QueryScheduler running with '-query-scheduler.use-scheduler-ring' set to false.")) - } + return s.Ring } diff --git a/pkg/util/cfg/dynamic.go b/pkg/util/cfg/dynamic.go index 08f8aee896..59498a31e3 100644 --- a/pkg/util/cfg/dynamic.go +++ b/pkg/util/cfg/dynamic.go @@ -23,6 +23,8 @@ func DynamicUnmarshal(dst DynamicCloneable, args []string, fs *flag.FlagSet) err // section of the config file by taking advantage of the code in ConfigFileLoader which will load // and process the config file. ConfigFileLoader(args, "config.file", true), + // Now load the flags again, this will supersede anything set from config file with flags from the command line. + Flags(args, fs), // Apply any dynamic logic to set other defaults in the config. This function is called after parsing the // config files so that values from a common, or shared, section can be used in // the dynamic evaluation