Shut down query frontend gracefully (#7978)

**What this PR does / why we need it**:

This PR fixes a bug in the shutdown process of the query frontend. In
order to shut down the frontend gracefully without cancelling any
in-flight operations/sub-requests, the frontend service has to wait
until all in-flight operations are finished. Only then its sub-services
may be stopped.

In order to control the deterministic order of the shutdown of the
service and its child services, child services need to be started with
their own context (not with their parent context). Otherwise, if the
context of the frontend is cancelled, also the contexts of the child
services are cancelled, leading to a parallel stopping of the parent and
its child services.

During the time of the shutdown, the `RoundTripGRPC` function still
needs to accept round-tripping requests. This is because the
frontend round tripper is wrapped into a query range tripper ware, which
splits incoming (HTTP) query requests into multiple (gRPC) query
requests if needed.

**Which issue(s) this PR fixes**:

This fixes a race condition where queries failed because the query
frontend shut down before the queriers that processed in-flight requests
could send the query response to the frontend. This manifested in an
increased error rate of queries during restarts (rollouts) of the query
frontend.

**Special notes for your reviewer**:

**Checklist**
- [x] Reviewed the `CONTRIBUTING.md` guide
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Co-authored-by: Danny Kopping <danny.kopping@grafana.com>
pull/8035/head
Christian Haudum 3 years ago committed by GitHub
parent 1375fa0654
commit 17577b4d47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 6
      docs/sources/configuration/_index.md
  3. 54
      pkg/lokifrontend/frontend/v2/frontend.go
  4. 8
      pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go
  5. 89
      pkg/lokifrontend/frontend/v2/frontend_test.go
  6. 6
      pkg/scheduler/scheduler.go

@ -18,6 +18,7 @@
* [7916](https://github.com/grafana/loki/pull/7916) **ssncferreira**: Add `doc-generator` tool to generate configuration flags documentation.
* [7997](https://github.com/grafana/loki/pull/7997) **kavirajk**: fix(promtail): Fix cri tags extra new lines when joining partial lines
* [8027](https://github.com/grafana/loki/pull/8027) **kavirajk**: chore(promtail): Make `batchwait` and `batchsize` config explicit with yaml tags
* [7978](https://github.com/grafana/loki/pull/7978) **chaudum**: Shut down query frontend gracefully to allow inflight requests to complete.
##### Fixes

@ -666,6 +666,12 @@ The `frontend` block configures the Loki query-frontend.
# frontend.grpc-client-config
[grpc_client_config: <grpc_client>]
# Time to wait for inflight requests to finish before forcefully shutting down.
# This needs to be aligned with the query timeout and the graceful termination
# period of the process orchestrator.
# CLI flag: -frontend.graceful-shutdown-timeout
[graceful_shutdown_timeout: <duration> | default = 5m]
# Name of network interface to read address from. This address is sent to
# query-scheduler and querier, which uses it to send the query response back to
# query-frontend.

@ -33,10 +33,11 @@ import (
// Config for a Frontend.
type Config struct {
SchedulerAddress string `yaml:"scheduler_address"`
DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"`
WorkerConcurrency int `yaml:"scheduler_worker_concurrency"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
SchedulerAddress string `yaml:"scheduler_address"`
DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"`
WorkerConcurrency int `yaml:"scheduler_worker_concurrency"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
GracefulShutdownTimeout time.Duration `yaml:"graceful_shutdown_timeout"`
// Used to find local IP address, that is sent to scheduler and querier-worker.
InfNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
@ -50,6 +51,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.SchedulerAddress, "frontend.scheduler-address", "", "DNS hostname used for finding query-schedulers.")
f.DurationVar(&cfg.DNSLookupPeriod, "frontend.scheduler-dns-lookup-period", 10*time.Second, "How often to resolve the scheduler-address, in order to look for new query-scheduler instances. Also used to determine how often to poll the scheduler-ring for addresses if the scheduler-ring is configured.")
f.IntVar(&cfg.WorkerConcurrency, "frontend.scheduler-worker-concurrency", 5, "Number of concurrent workers forwarding queries to single query-scheduler.")
f.DurationVar(&cfg.GracefulShutdownTimeout, "frontend.graceful-shutdown-timeout", 5*time.Minute, "Time to wait for inflight requests to finish before forcefully shutting down. This needs to be aligned with the query timeout and the graceful termination period of the process orchestrator.")
cfg.InfNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util_log.Logger)
f.Var((*flagext.StringSlice)(&cfg.InfNames), "frontend.instance-interface-names", "Name of network interface to read address from. This address is sent to query-scheduler and querier, which uses it to send the query response back to query-frontend.")
@ -144,19 +146,44 @@ func NewFrontend(cfg Config, ring ring.ReadRing, log log.Logger, reg prometheus.
}
// starting implements the services.StartingFn of the frontend service.
// It starts the scheduler workers and waits until they are running.
func (f *Frontend) starting(ctx context.Context) error {
	// Instead of re-using `ctx` from the frontend service, `schedulerWorkers`
	// needs to use its own service context, because we want to control the
	// stopping process in the `stopping` function of the frontend. If we used
	// the same context, the child service would be stopped automatically as
	// soon as the context of the parent service is cancelled.
	return errors.Wrap(services.StartAndAwaitRunning(context.Background(), f.schedulerWorkers), "failed to start frontend scheduler workers")
}
// stopping implements the services.StoppingFn of the frontend service.
// It polls the inflight-request count until it drains to zero (or the
// configured graceful-shutdown timeout elapses) and only then stops the
// scheduler workers, so queriers can still deliver responses for
// requests that were already dispatched.
func (f *Frontend) stopping(_ error) error {
	tick := time.NewTicker(500 * time.Millisecond)
	defer tick.Stop()

	deadline := time.NewTimer(f.cfg.GracefulShutdownTimeout)
	defer deadline.Stop()

	begin := time.Now()

drain:
	for {
		select {
		case now := <-tick.C:
			pending := f.requests.count()
			if pending <= 0 {
				level.Debug(f.log).Log("msg", "inflight requests completed", "inflight", pending, "elapsed", now.Sub(begin))
				break drain
			}
			level.Debug(f.log).Log("msg", "waiting for inflight requests to complete", "inflight", pending, "elapsed", now.Sub(begin))
		case now := <-deadline.C:
			// Timed out: give up waiting and proceed with the shutdown anyway.
			pending := f.requests.count()
			level.Debug(f.log).Log("msg", "timed out waiting for inflight requests to complete", "inflight", pending, "elapsed", now.Sub(begin))
			break drain
		}
	}

	return errors.Wrap(services.StopAndAwaitTerminated(context.Background(), f.schedulerWorkers), "failed to stop frontend scheduler workers")
}
// RoundTripGRPC round trips a proto (instead of a HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
if s := f.State(); s != services.Running {
return nil, fmt.Errorf("frontend not running: %v", s)
}
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
@ -259,6 +286,11 @@ func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryRes
level.Warn(f.log).Log("msg", "failed to write query result to the response channel", "queryID", qrReq.QueryID, "user", userID)
}
}
// TODO(chaudum): In case the userIDs do not match, we do not send a
// response to the req.response channel.
// In that case, the RoundTripGRPC method waits until the request context
// deadline is exceeded. Only then does the function finish and the request
// get removed from the requests map.
return &frontendv2pb.QueryResultResponse{}, nil
}
@ -266,6 +298,10 @@ func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryRes
// CheckReady determines if the query frontend is ready. Function parameters/return
// chosen to match the same method in the ingester
func (f *Frontend) CheckReady(_ context.Context) error {
if s := f.State(); s != services.Running {
return fmt.Errorf("%v", s)
}
workers := f.schedulerWorkers.getWorkersCount()
// If frontend is connected to at least one scheduler, we are ready.

@ -69,7 +69,13 @@ func newFrontendSchedulerWorkers(cfg Config, frontendAddress string, ring ring.R
}
// starting implements the services.StartingFn of the frontendSchedulerWorkers
// service. It starts the scheduler-address watcher and waits until it is
// running.
func (f *frontendSchedulerWorkers) starting(ctx context.Context) error {
	// Instead of re-using `ctx` from the frontendSchedulerWorkers service,
	// `watcher` needs to use its own service context, because we want to
	// control the stopping process in the `stopping` function of the
	// frontendSchedulerWorkers. If we used the same context, the child
	// service would be stopped automatically as soon as the context of the
	// parent service is cancelled.
	return services.StartAndAwaitRunning(context.Background(), f.watcher)
}
func (f *frontendSchedulerWorkers) stopping(_ error) error {

@ -45,7 +45,6 @@ func setupFrontend(t *testing.T, schedulerReplyFunc func(f *Frontend, msg *sched
cfg.Addr = h
cfg.Port = grpcPort
// logger := log.NewLogfmtLogger(os.Stdout)
logger := log.NewNopLogger()
f, err := NewFrontend(cfg, nil, logger, nil)
require.NoError(t, err)
@ -252,6 +251,94 @@ func TestFrontendFailedCancellation(t *testing.T) {
})
}
// TestFrontendStoppingWaitsForEmptyInflightRequests verifies that stopping()
// blocks until all inflight requests have been completed and removed from the
// requests map.
func TestFrontendStoppingWaitsForEmptyInflightRequests(t *testing.T) {
	delayResponse := 10 * time.Millisecond
	f, _ := setupFrontend(t, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
		// We cannot call QueryResult directly, as Frontend is not yet waiting for the response.
		// It first needs to be told that enqueuing has succeeded.
		go sendResponseWithDelay(f, 2*delayResponse, "test", msg.QueryID, &httpgrpc.HTTPResponse{
			Code: 200,
			Body: []byte("ok"),
		})

		// give enough time to fill up inflight requests
		time.Sleep(delayResponse)

		return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
	})

	inflightRequests := 10

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for i := 0; i < inflightRequests; i++ {
		go func() {
			// Note: require.NoError must not be used off the test goroutine,
			// because its FailNow only works when called from the goroutine
			// running the test; t.Errorf is safe for concurrent use.
			if _, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}); err != nil {
				t.Errorf("RoundTripGRPC: %v", err)
			}
		}()
	}

	require.Eventually(t, func() bool {
		return f.requests.count() == inflightRequests
	},
		3*delayResponse, // wait at least 3*delayResponse for queries to be finished and removed from the inflight requests map
		5*time.Millisecond,
	)

	// blocks until all inflight requests are done
	_ = f.stopping(nil)

	require.Equal(t, 0, f.requests.count())
}
// TestFrontendShuttingDownLetsSubRequestsPass verifies that RoundTripGRPC
// still accepts requests while the frontend is in Stopping state, because
// sub-requests of an already-split query must be allowed to complete.
func TestFrontendShuttingDownLetsSubRequestsPass(t *testing.T) {
	delayResponse := 100 * time.Millisecond
	f, _ := setupFrontend(t, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
		// We cannot call QueryResult directly, as Frontend is not yet waiting for the response.
		// It first needs to be told that enqueuing has succeeded.
		go sendResponseWithDelay(f, delayResponse, "test", msg.QueryID, &httpgrpc.HTTPResponse{
			Code: 200,
			Body: []byte("ok"),
		})
		return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	require.Equal(t, services.Running, f.State())

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Note: require.NoError must not be used off the test goroutine,
		// because its FailNow only works when called from the goroutine
		// running the test; t.Errorf is safe for concurrent use.
		if _, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}); err != nil {
			t.Errorf("RoundTripGRPC: %v", err)
		}
	}()

	// Wait less than delayResponse to make sure we have an inflight request that
	// already was sent to the scheduler and the service stays in Stopping state
	// for some time.
	time.Sleep(delayResponse / 10)
	f.StopAsync()

	// wait for Stopping state
	require.Eventually(t, func() bool {
		t.Log(f.State())
		return f.State() == services.Stopping
	}, delayResponse/2, 2*time.Millisecond)

	// send (sub-)request
	// This request still needs to be able to pass the RoundTripGRPC function,
	// because it may be a sub-request of a query request that was split earlier
	// into multiple sub-requests.
	_, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
	require.NoError(t, err)

	wg.Wait()
}
type mockScheduler struct {
t *testing.T
f *Frontend

@ -297,10 +297,10 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front
switch msg.GetType() {
case schedulerpb.ENQUEUE:
err = s.enqueueRequest(frontendCtx, frontendAddress, msg)
switch {
case err == nil:
switch err {
case nil:
resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
case err == queue.ErrTooManyRequests:
case queue.ErrTooManyRequests:
resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.TOO_MANY_REQUESTS_PER_TENANT}
default:
resp = &schedulerpb.SchedulerToFrontend{Status: schedulerpb.ERROR, Error: err.Error()}

Loading…
Cancel
Save