Loki: query scheduler should send shutdown to frontends when ReplicationSet changes (#4614)

* Make sure the scheduler sends a shutdown message to frontends when the ReplicationSet changes, so that inflight queries are canceled and retried in the frontends (see the sketch after these notes).

* Use the ring Write operation so we still find an instance in case one is unhealthy.
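The diff below only covers the scheduler side of this protocol. As a rough, hypothetical sketch (not part of this commit), a frontend worker that receives the SHUTTING_DOWN status would surface an error so the connection is torn down and its queries are retried against another scheduler; handleSchedulerResponse is an illustrative name, only schedulerpb.SchedulerToFrontend and its statuses come from the real code:

package example

import (
	"errors"
	"fmt"

	"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
)

// handleSchedulerResponse shows one way a frontend worker loop could react to
// the statuses the scheduler sends in this change (hypothetical helper).
func handleSchedulerResponse(resp *schedulerpb.SchedulerToFrontend) error {
	switch resp.Status {
	case schedulerpb.OK:
		// Scheduler accepted the request; keep the loop running.
		return nil
	case schedulerpb.SHUTTING_DOWN:
		// Scheduler left the ReplicationSet; returning an error tears down the
		// connection so inflight queries are canceled and retried elsewhere.
		return errors.New("scheduler is shutting down")
	default:
		return fmt.Errorf("unexpected scheduler status: %v", resp.Status)
	}
}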
Ed Welch authored 4 years ago, committed by GitHub
commit fe8bc9144f (parent 308af871cf)
  1. pkg/scheduler/scheduler.go (63 changes)
  2. pkg/scheduler/scheduler_test.go (100 changes)
  3. pkg/util/ring.go (24 changes)

@@ -28,6 +28,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
"google.golang.org/grpc"
lokiutil "github.com/grafana/loki/pkg/util"
@@ -55,6 +56,10 @@ const (
// ringNumTokens sets our single token in the ring,
// we only need to insert 1 token to be used for leader election purposes.
ringNumTokens = 1
// ringCheckPeriod is how often we check the ring to see if this instance is still in
// the replicaset of instances to act as schedulers.
ringCheckPeriod = 3 * time.Second
)
// Scheduler is responsible for queueing and dispatching queries to Queriers.
@@ -85,10 +90,14 @@ type Scheduler struct {
connectedQuerierClients prometheus.GaugeFunc
connectedFrontendClients prometheus.GaugeFunc
queueDuration prometheus.Histogram
schedulerRunning prometheus.Gauge
// Ring used for finding schedulers
ringLifecycler *ring.BasicLifecycler
ring *ring.Ring
// Controls for this being a chosen scheduler
shouldRun atomic.Bool
}
type requestKey struct {
@@ -98,6 +107,7 @@ type requestKey struct {
type connectedFrontend struct {
connections int
frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer
// This context is used for running all queries from the same frontend.
// When last frontend connection is closed, context is canceled.
@@ -159,12 +169,17 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
Name: "cortex_query_scheduler_connected_frontend_clients",
Help: "Number of query-frontend worker clients currently connected to the query-scheduler.",
}, s.getConnectedFrontendClientsMetric)
s.schedulerRunning = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Name: "cortex_query_scheduler_running",
Help: "Value will be 1 if the scheduler is in the ReplicationSet and actively receiving/processing requests",
})
s.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(s.cleanupMetricsForInactiveUser)
svcs := []services.Service{s.requestQueue, s.activeUsers}
if cfg.UseSchedulerRing {
s.shouldRun.Store(false)
ringStore, err := kv.NewClient(
cfg.SchedulerRing.KVStore,
ring.GetCodec(),
@@ -201,6 +216,9 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
registerer.MustRegister(s.ring)
}
svcs = append(svcs, s.ringLifecycler, s.ring)
} else {
// Always run if no scheduler ring is being used.
s.shouldRun.Store(true)
}
var err error
@@ -247,7 +265,7 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front
defer s.frontendDisconnected(frontendAddress)
// Response to INIT. If scheduler is not running, we skip for-loop, send SHUTTING_DOWN and exit this method.
if s.State() == services.Running {
if s.State() == services.Running && s.shouldRun.Load() {
if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil {
return err
}
@@ -321,6 +339,7 @@ func (s *Scheduler) frontendConnected(frontend schedulerpb.SchedulerForFrontend_
if cf == nil {
cf = &connectedFrontend{
connections: 0,
frontend: frontend,
}
cf.ctx, cf.cancel = context.WithCancel(context.Background())
s.connectedFrontends[msg.FrontendAddress] = cf
@@ -615,12 +634,54 @@ func (s *Scheduler) starting(ctx context.Context) (err error) {
}
func (s *Scheduler) running(ctx context.Context) error {
t := time.NewTicker(ringCheckPeriod)
defer t.Stop()
for {
select {
case <-ctx.Done():
return nil
case err := <-s.subservicesWatcher.Chan():
return errors.Wrap(err, "scheduler subservice failed")
case <-t.C:
if !s.cfg.UseSchedulerRing {
continue
}
isInSet, err := lokiutil.IsInReplicationSet(s.ring, lokiutil.RingKeyOfLeader, s.ringLifecycler.GetInstanceAddr())
if err != nil {
level.Error(s.log).Log("msg", "failed to query the ring to see if scheduler instance is in ReplicationSet, will try again", "err", err)
continue
}
s.setRunState(isInSet)
}
}
}
func (s *Scheduler) setRunState(isInSet bool) {
if isInSet {
if s.shouldRun.CAS(false, true) {
// Value was swapped, meaning this was a state change from stopped to running.
level.Info(s.log).Log("msg", "this scheduler is in the ReplicationSet, will now accept requests.")
s.schedulerRunning.Set(1)
}
} else {
if s.shouldRun.CAS(true, false) {
// Value was swapped, meaning this was a state change from running to stopped,
// we need to send shutdown to all the connected frontends.
level.Info(s.log).Log("msg", "this scheduler is no longer in the ReplicationSet, disconnecting frontends, canceling queries and no longer accepting requests.")
// Send a shutdown message to the connected frontends, there is no way to break the blocking Recv() in the FrontendLoop()
// so we send a message to the frontend telling them we are shutting down so they will disconnect.
// When FrontendLoop() exits for the connected querier all the inflight queries and queued queries will be canceled.
s.connectedFrontendsMu.Lock()
defer s.connectedFrontendsMu.Unlock()
for _, f := range s.connectedFrontends {
// We ignore any errors here because there isn't really an action to take and because
// the frontends are also discovering the ring changes and may already be disconnected
// or have disconnected.
_ = f.frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN})
}
s.schedulerRunning.Set(0)
}
}
}

@@ -0,0 +1,100 @@
package scheduler
import (
"context"
"testing"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/metadata"
)
func TestScheduler_setRunState(t *testing.T) {
// This test is a bit crude, the method is not the most directly testable but
// this covers us to make sure we don't accidentally change the behavior of
// the little bit of logic which runs/stops the scheduler and makes sure we
// send a shutdown message to disconnect frontends.
// To avoid the much more complicated test setup of calling NewScheduler, we
// instead build a Scheduler directly with just the fields required to avoid nil pointers.
s := Scheduler{
log: util_log.Logger,
schedulerRunning: promauto.With(prometheus.DefaultRegisterer).NewGauge(prometheus.GaugeOpts{
Name: "cortex_query_scheduler_running",
Help: "Value will be 1 if the scheduler is in the ReplicationSet and actively receiving/processing requests",
}),
}
mock := &mockSchedulerForFrontendFrontendLoopServer{}
s.connectedFrontends = map[string]*connectedFrontend{
"127.0.0.1:9095": {
connections: 0,
frontend: mock,
ctx: nil,
cancel: nil,
},
}
// not_running, shouldRun == false
assert.False(t, s.shouldRun.Load())
// not_running -> running, shouldRun == true
s.setRunState(true)
assert.True(t, s.shouldRun.Load())
// running -> running, shouldRun == true
s.setRunState(true)
assert.True(t, s.shouldRun.Load())
// running -> not_running, shouldRun == false, shutdown message sent
s.setRunState(false)
assert.False(t, s.shouldRun.Load())
assert.Equal(t, schedulerpb.SHUTTING_DOWN, mock.msg.Status)
mock.msg = nil
// not_running -> not_running, shouldRun == false, no shutdown message sent
s.setRunState(false)
assert.Nil(t, mock.msg)
}
type mockSchedulerForFrontendFrontendLoopServer struct {
msg *schedulerpb.SchedulerToFrontend
}
func (m *mockSchedulerForFrontendFrontendLoopServer) Send(frontend *schedulerpb.SchedulerToFrontend) error {
m.msg = frontend
return nil
}
func (m mockSchedulerForFrontendFrontendLoopServer) Recv() (*schedulerpb.FrontendToScheduler, error) {
panic("implement me")
}
func (m mockSchedulerForFrontendFrontendLoopServer) SetHeader(md metadata.MD) error {
panic("implement me")
}
func (m mockSchedulerForFrontendFrontendLoopServer) SendHeader(md metadata.MD) error {
panic("implement me")
}
func (m mockSchedulerForFrontendFrontendLoopServer) SetTrailer(md metadata.MD) {
panic("implement me")
}
func (m mockSchedulerForFrontendFrontendLoopServer) Context() context.Context {
panic("implement me")
}
func (m mockSchedulerForFrontendFrontendLoopServer) SendMsg(msg interface{}) error {
panic("implement me")
}
func (m mockSchedulerForFrontendFrontendLoopServer) RecvMsg(msg interface{}) error {
panic("implement me")
}

@@ -1,6 +1,10 @@
package util
import "hash/fnv"
import (
"hash/fnv"
"github.com/cortexproject/cortex/pkg/ring"
)
// TokenFor generates a token used for finding ingesters from ring
func TokenFor(userID, labels string) uint32 {
@@ -9,3 +13,21 @@ func TokenFor(userID, labels string) uint32 {
_, _ = h.Write([]byte(labels))
return h.Sum32()
}
// IsInReplicationSet will query the provided ring for the provided key
// and see if the provided address is in the resulting ReplicationSet
func IsInReplicationSet(r *ring.Ring, ringKey uint32, address string) (bool, error) {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
rs, err := r.Get(ringKey, ring.Write, bufDescs, bufHosts, bufZones)
if err != nil {
return false, err
}
addrs := rs.GetAddresses()
for _, a := range addrs {
if a == address {
return true, nil
}
}
return false, nil
}
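For context, a minimal usage sketch mirroring the periodic check in the scheduler's running() loop above; shouldActAsScheduler is a hypothetical wrapper, while IsInReplicationSet, RingKeyOfLeader and the ring types are the identifiers used in this change:

package example

import (
	"github.com/cortexproject/cortex/pkg/ring"

	"github.com/grafana/loki/pkg/util"
)

// shouldActAsScheduler is an illustrative wrapper: it asks the ring whether this
// instance's address is in the ReplicationSet for the leader-election key.
func shouldActAsScheduler(r *ring.Ring, instanceAddr string) (bool, error) {
	inSet, err := util.IsInReplicationSet(r, util.RingKeyOfLeader, instanceAddr)
	if err != nil {
		// The scheduler logs and keeps its previous state on error; callers can
		// choose the same behavior rather than flapping between states.
		return false, err
	}
	return inSet, nil
}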
