From fe8bc9144fe5082b7c117f98f6b6a437aed22a9c Mon Sep 17 00:00:00 2001
From: Ed Welch
Date: Tue, 2 Nov 2021 15:03:46 -0400
Subject: [PATCH] Loki: query scheduler should send shutdown to frontends when ReplicationSet changes (#4614)

* make sure the scheduler sends a shutdown message to connected frontends when the ReplicationSet changes, so that inflight queries are canceled and retried in the frontends.
* use Write so we still find an instance in case one is unhealthy
---
 pkg/scheduler/scheduler.go      |  63 +++++++++++++++++++-
 pkg/scheduler/scheduler_test.go | 100 ++++++++++++++++++++++++++++++++
 pkg/util/ring.go                |  24 +++++++-
 3 files changed, 185 insertions(+), 2 deletions(-)
 create mode 100644 pkg/scheduler/scheduler_test.go

diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 7f05131204..d5fa3bbe95 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -28,6 +28,7 @@ import (
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/middleware"
 	"github.com/weaveworks/common/user"
+	"go.uber.org/atomic"
 	"google.golang.org/grpc"
 
 	lokiutil "github.com/grafana/loki/pkg/util"
@@ -55,6 +56,10 @@ const (
 	// ringNumTokens sets our single token in the ring,
 	// we only need to insert 1 token to be used for leader election purposes.
 	ringNumTokens = 1
+
+	// ringCheckPeriod is how often we check the ring to see if this instance is still in
+	// the ReplicationSet of instances acting as schedulers.
+	ringCheckPeriod = 3 * time.Second
 )
 
 // Scheduler is responsible for queueing and dispatching queries to Queriers.
@@ -85,10 +90,14 @@ type Scheduler struct {
 	connectedQuerierClients  prometheus.GaugeFunc
 	connectedFrontendClients prometheus.GaugeFunc
 	queueDuration            prometheus.Histogram
+	schedulerRunning         prometheus.Gauge
 
 	// Ring used for finding schedulers
 	ringLifecycler *ring.BasicLifecycler
 	ring           *ring.Ring
+
+	// Controls for this being a chosen scheduler
+	shouldRun atomic.Bool
 }
 
 type requestKey struct {
@@ -98,6 +107,7 @@ type requestKey struct {
 
 type connectedFrontend struct {
 	connections int
+	frontend    schedulerpb.SchedulerForFrontend_FrontendLoopServer
 
 	// This context is used for running all queries from the same frontend.
 	// When last frontend connection is closed, context is canceled.
@@ -159,12 +169,17 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
 		Name: "cortex_query_scheduler_connected_frontend_clients",
 		Help: "Number of query-frontend worker clients currently connected to the query-scheduler.",
 	}, s.getConnectedFrontendClientsMetric)
+	s.schedulerRunning = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
+		Name: "cortex_query_scheduler_running",
+		Help: "Value will be 1 if the scheduler is in the ReplicationSet and actively receiving/processing requests",
+	})
 
 	s.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(s.cleanupMetricsForInactiveUser)
 
 	svcs := []services.Service{s.requestQueue, s.activeUsers}
 
 	if cfg.UseSchedulerRing {
+		s.shouldRun.Store(false)
 		ringStore, err := kv.NewClient(
 			cfg.SchedulerRing.KVStore,
 			ring.GetCodec(),
@@ -201,6 +216,9 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
 			registerer.MustRegister(s.ring)
 		}
 		svcs = append(svcs, s.ringLifecycler, s.ring)
+	} else {
+		// Always run if no scheduler ring is being used.
+		s.shouldRun.Store(true)
 	}
 
 	var err error
@@ -247,7 +265,7 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front
 	defer s.frontendDisconnected(frontendAddress)
 
 	// Response to INIT. If scheduler is not running, we skip for-loop, send SHUTTING_DOWN and exit this method.
-	if s.State() == services.Running {
+	if s.State() == services.Running && s.shouldRun.Load() {
 		if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil {
 			return err
 		}
@@ -321,6 +339,7 @@ func (s *Scheduler) frontendConnected(frontend schedulerpb.SchedulerForFrontend_
 	if cf == nil {
 		cf = &connectedFrontend{
 			connections: 0,
+			frontend:    frontend,
 		}
 		cf.ctx, cf.cancel = context.WithCancel(context.Background())
 		s.connectedFrontends[msg.FrontendAddress] = cf
@@ -615,12 +634,54 @@ func (s *Scheduler) starting(ctx context.Context) (err error) {
 }
 
 func (s *Scheduler) running(ctx context.Context) error {
+	t := time.NewTicker(ringCheckPeriod)
+	defer t.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			return nil
 		case err := <-s.subservicesWatcher.Chan():
 			return errors.Wrap(err, "scheduler subservice failed")
+		case <-t.C:
+			if !s.cfg.UseSchedulerRing {
+				continue
+			}
+			isInSet, err := lokiutil.IsInReplicationSet(s.ring, lokiutil.RingKeyOfLeader, s.ringLifecycler.GetInstanceAddr())
+			if err != nil {
+				level.Error(s.log).Log("msg", "failed to query the ring to see if scheduler instance is in ReplicationSet, will try again", "err", err)
+				continue
+			}
+			s.setRunState(isInSet)
+		}
+	}
+}
+
+func (s *Scheduler) setRunState(isInSet bool) {
+
+	if isInSet {
+		if s.shouldRun.CAS(false, true) {
+			// Value was swapped, meaning this was a state change from stopped to running.
+			level.Info(s.log).Log("msg", "this scheduler is in the ReplicationSet, will now accept requests.")
+			s.schedulerRunning.Set(1)
+		}
+	} else {
+		if s.shouldRun.CAS(true, false) {
+			// Value was swapped, meaning this was a state change from running to stopped,
+			// so we need to send a shutdown message to all the connected frontends.
+			level.Info(s.log).Log("msg", "this scheduler is no longer in the ReplicationSet, disconnecting frontends, canceling queries and no longer accepting requests.")
+
+			// Send a shutdown message to the connected frontends: there is no way to break the blocking Recv() in FrontendLoop(),
+			// so we tell each frontend we are shutting down so that it disconnects.
+			// When FrontendLoop() exits for a connected frontend, all of its inflight and queued queries are canceled.
+			s.connectedFrontendsMu.Lock()
+			defer s.connectedFrontendsMu.Unlock()
+			for _, f := range s.connectedFrontends {
+				// We ignore any errors here because there isn't really an action to take, and because
+				// the frontends are also discovering the ring changes and may have already
+				// disconnected.
+				_ = f.frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN})
+			}
+			s.schedulerRunning.Set(0)
 		}
 	}
 }
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
new file mode 100644
index 0000000000..6b0b442d2e
--- /dev/null
+++ b/pkg/scheduler/scheduler_test.go
@@ -0,0 +1,100 @@
+package scheduler
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
+	util_log "github.com/cortexproject/cortex/pkg/util/log"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc/metadata"
+)
+
+func TestScheduler_setRunState(t *testing.T) {
+
+	// This test is a bit crude because the method is not directly testable, but
+	// it ensures we don't accidentally change the behavior of the small piece of
+	// logic that starts/stops the scheduler and sends a shutdown message to
+	// disconnect frontends.
+
+	// To avoid the much more complicated setup of calling NewScheduler, we build
+	// a Scheduler directly with just the fields required to avoid nil pointers.
+	s := Scheduler{
+		log: util_log.Logger,
+		schedulerRunning: promauto.With(prometheus.DefaultRegisterer).NewGauge(prometheus.GaugeOpts{
+			Name: "cortex_query_scheduler_running",
+			Help: "Value will be 1 if the scheduler is in the ReplicationSet and actively receiving/processing requests",
+		}),
+	}
+	mock := &mockSchedulerForFrontendFrontendLoopServer{}
+	s.connectedFrontends = map[string]*connectedFrontend{
+		"127.0.0.1:9095": {
+			connections: 0,
+			frontend:    mock,
+			ctx:         nil,
+			cancel:      nil,
+		},
+	}
+
+	// not_running, shouldRun == false
+	assert.False(t, s.shouldRun.Load())
+
+	// not_running -> running, shouldRun == true
+	s.setRunState(true)
+	assert.True(t, s.shouldRun.Load())
+
+	// running -> running, shouldRun == true
+	s.setRunState(true)
+	assert.True(t, s.shouldRun.Load())
+
+	// running -> not_running, shouldRun == false, shutdown message sent
+	s.setRunState(false)
+	assert.False(t, s.shouldRun.Load())
+	assert.Equal(t, schedulerpb.SHUTTING_DOWN, mock.msg.Status)
+	mock.msg = nil
+
+	// not_running -> not_running, shouldRun == false, no shutdown message sent
+	s.setRunState(false)
+	assert.Nil(t, mock.msg)
+
+}
+
+type mockSchedulerForFrontendFrontendLoopServer struct {
+	msg *schedulerpb.SchedulerToFrontend
+}
+
+func (m *mockSchedulerForFrontendFrontendLoopServer) Send(frontend *schedulerpb.SchedulerToFrontend) error {
+	m.msg = frontend
+	return nil
+}
+
+func (m mockSchedulerForFrontendFrontendLoopServer) Recv() (*schedulerpb.FrontendToScheduler, error) {
+	panic("implement me")
+}
+
+func (m mockSchedulerForFrontendFrontendLoopServer) SetHeader(md metadata.MD) error {
+	panic("implement me")
+}
+
+func (m mockSchedulerForFrontendFrontendLoopServer) SendHeader(md metadata.MD) error {
+	panic("implement me")
+}
+
+func (m mockSchedulerForFrontendFrontendLoopServer) SetTrailer(md metadata.MD) {
+	panic("implement me")
+}
+
+func (m mockSchedulerForFrontendFrontendLoopServer) Context() context.Context {
+	panic("implement me")
+}
+
+func (m mockSchedulerForFrontendFrontendLoopServer) SendMsg(msg interface{}) error {
+	panic("implement me")
+}
+
+func (m mockSchedulerForFrontendFrontendLoopServer) RecvMsg(msg interface{}) error {
+	panic("implement me")
+}
diff --git a/pkg/util/ring.go b/pkg/util/ring.go
index 96f97e34fb..4f76b3522d 100644
--- a/pkg/util/ring.go
+++ b/pkg/util/ring.go
@@ -1,6 +1,10 @@
 package util
 
-import "hash/fnv"
+import (
+	"hash/fnv"
+
+	"github.com/cortexproject/cortex/pkg/ring"
+)
 
 // TokenFor generates a token used for finding ingesters from ring
 func TokenFor(userID, labels string) uint32 {
@@ -9,3 +13,21 @@
 	_, _ = h.Write([]byte(labels))
 	return h.Sum32()
 }
+
+// IsInReplicationSet will query the provided ring for the provided key
+// and see if the provided address is in the resulting ReplicationSet
+func IsInReplicationSet(r *ring.Ring, ringKey uint32, address string) (bool, error) {
+	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
+	rs, err := r.Get(ringKey, ring.Write, bufDescs, bufHosts, bufZones)
+	if err != nil {
+		return false, err
+	}
+
+	addrs := rs.GetAddresses()
+	for _, a := range addrs {
+		if a == address {
+			return true, nil
+		}
+	}
+	return false, nil
+}
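
Note (illustrative only, not part of the patch): any component that owns a *ring.Ring and a ring.BasicLifecycler can reuse the new IsInReplicationSet helper the same way the scheduler's running() loop above does. A minimal sketch, where the package name, function name, callback, and the 3 second period (borrowed from ringCheckPeriod) are assumptions for illustration:

package example // hypothetical package for this sketch

import (
	"context"
	"time"

	"github.com/cortexproject/cortex/pkg/ring"

	lokiutil "github.com/grafana/loki/pkg/util"
)

// watchReplicationSet periodically checks whether this instance is part of the
// ReplicationSet for ringKey and reports the result after every successful check.
func watchReplicationSet(ctx context.Context, r *ring.Ring, lifecycler *ring.BasicLifecycler, ringKey uint32, report func(inSet bool)) {
	t := time.NewTicker(3 * time.Second) // same cadence as the scheduler's ringCheckPeriod
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			inSet, err := lokiutil.IsInReplicationSet(r, ringKey, lifecycler.GetInstanceAddr())
			if err != nil {
				continue // transient ring error, try again on the next tick
			}
			report(inSet) // e.g. enable or disable request processing
		}
	}
}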