Loki: fix handling of tail requests when using target `all` or `read` (#4642)

* better handling of tail requests when the frontend and querier are running in the same process

* adding function to determine if a module is enable based on the target supplied to Loki

* fix tests and incorrect registering of tail routes

* renamed function and made check for active recursive
k70
Ed Welch 4 years ago committed by GitHub
parent 2d24e2ea64
commit 89ee022c21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      pkg/loki/loki.go
  2. 34
      pkg/loki/loki_test.go
  3. 32
      pkg/loki/modules.go
  4. 48
      pkg/querier/worker_service.go
  5. 22
      pkg/querier/worker_service_test.go

@ -209,6 +209,7 @@ type Loki struct {
// set during initialization
ModuleManager *modules.Manager
serviceMap map[string]services.Service
deps map[string][]string
Server *server.Server
ring *ring.Ring
@ -481,7 +482,34 @@ func (t *Loki) setupModuleManager() error {
}
}
t.deps = deps
t.ModuleManager = mm
return nil
}
func (t *Loki) isModuleActive(m string) bool {
for _, target := range t.Cfg.Target {
if target == m {
return true
}
if t.recursiveIsModuleActive(target, m) {
return true
}
}
return false
}
func (t *Loki) recursiveIsModuleActive(target, m string) bool {
if targetDeps, ok := t.deps[target]; ok {
for _, dep := range targetDeps {
if dep == m {
return true
}
if t.recursiveIsModuleActive(dep, m) {
return true
}
}
}
return false
}

@ -8,6 +8,8 @@ import (
"testing"
"time"
"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -52,3 +54,35 @@ func TestFlagDefaults(t *testing.T) {
require.Equal(t, c.Server.GRPCServerPingWithoutStreamAllowed, true)
require.Contains(t, gotFlags[flagToCheck], "(default true)")
}
func TestLoki_isModuleEnabled(t1 *testing.T) {
tests := []struct {
name string
target flagext.StringSliceCSV
module string
want bool
}{
{name: "Target All includes Querier", target: flagext.StringSliceCSV{"all"}, module: Querier, want: true},
{name: "Target Querier does not include Distributor", target: flagext.StringSliceCSV{"querier"}, module: Distributor, want: false},
{name: "Target Read includes Query Frontend", target: flagext.StringSliceCSV{"read"}, module: QueryFrontend, want: true},
{name: "Target Querier does not include Query Frontend", target: flagext.StringSliceCSV{"querier"}, module: QueryFrontend, want: false},
{name: "Target Query Frontend does not include Querier", target: flagext.StringSliceCSV{"query-frontend"}, module: Querier, want: false},
{name: "Multi target includes querier", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Querier, want: true},
{name: "Multi target does not include distributor", target: flagext.StringSliceCSV{"query-frontend", "query-scheduler", "querier"}, module: Distributor, want: false},
{name: "Test recursive dep, Ingester -> TenantConfigs -> RuntimeConfig", target: flagext.StringSliceCSV{"ingester"}, module: RuntimeConfig, want: true},
}
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := &Loki{
Cfg: Config{
Target: tt.target,
},
}
err := t.setupModuleManager()
assert.NoError(t1, err)
if got := t.isModuleActive(tt.module); got != tt.want {
t1.Errorf("isModuleActive() = %v, want %v", got, tt.want)
}
})
}
}

@ -237,17 +237,30 @@ func (t *Loki) initQuerier() (services.Service, error) {
"/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler),
"/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler),
"/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler),
}
// We always want to register tail routes externally, tail requests are different from normal queries, they
// are HTTP requests that get upgraded to websocket requests and need to be handled/kept open by the Queriers.
// The frontend has code to proxy these requests, however when running in the same processes
// (such as target=All or target=Read) we don't want the frontend to proxy and instead we want the Queriers
// to directly register these routes.
// In practice this means we always want the queriers to register the tail routes externally, when a querier
// is standalone ALL routes are registered externally, and when it's in the same process as a frontend,
// we disable the proxying of the tail routes in initQueryFrontend() and we still want these routes regiestered
// on the external router.
var alwaysExternalHandlers = map[string]http.Handler{
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
}
return querier.InitWorkerService(
querierWorkerServiceConfig, queryHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
querierWorkerServiceConfig, queryHandlers, alwaysExternalHandlers, t.Server.HTTP, t.Server.HTTPServer.Handler, t.HTTPAuthMiddleware,
)
}
@ -479,7 +492,8 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
).Wrap(frontendHandler)
var defaultHandler http.Handler
if t.Cfg.Frontend.TailProxyURL != "" {
// If this process also acts as a Querier we don't do any proxying of tail requests
if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleActive(Querier) {
httpMiddleware := middleware.Merge(
t.HTTPAuthMiddleware,
queryrange.StatsHTTPMiddleware,
@ -511,9 +525,13 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.Server.HTTP.Path("/api/prom/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/api/prom/series").Methods("GET", "POST").Handler(frontendHandler)
// defer tail endpoints to the default handler
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler)
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
// Only register tailing requests if this process does not act as a Querier
// If this process is also a Querier the Querier will register the tail endpoints.
if !t.isModuleActive(Querier) {
// defer tail endpoints to the default handler
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(defaultHandler)
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
}
if t.frontend == nil {
return services.NewIdleService(nil, func(_ error) error {

@ -46,16 +46,35 @@ type WorkerServiceConfig struct {
func InitWorkerService(
cfg WorkerServiceConfig,
queryRoutesToHandlers map[string]http.Handler,
alwaysExternalRoutesToHandlers map[string]http.Handler,
externalRouter *mux.Router,
externalHandler http.Handler,
authMiddleware middleware.Interface,
) (serve services.Service, err error) {
// Create a couple Middlewares used to handle panics, perform auth, and parse Form's in http request
internalMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
)
// External middleware also needs to set JSON content type headers
externalMiddleware := middleware.Merge(
internalMiddleware,
serverutil.ResponseJSONMiddleware(),
)
internalRouter := mux.NewRouter()
for route, handler := range queryRoutesToHandlers {
internalRouter.Path(route).Methods("GET", "POST").Handler(handler)
}
// There are some routes which are always registered on the external router, add them now and
// wrap them with the externalMiddleware
for route, handler := range alwaysExternalRoutesToHandlers {
externalRouter.Path(route).Methods("GET", "POST").Handler(externalMiddleware.Wrap(handler))
}
// If the querier is running standalone without the query-frontend or query-scheduler, we must register the internal
// HTTP handler externally (as it's the only handler that needs to register on querier routes) and provide the
// external Loki Server HTTP handler to the frontend worker to ensure requests it processes use the default
@ -70,7 +89,10 @@ func InitWorkerService(
idx++
}
registerRoutesExternally(routes, externalRouter, internalRouter, authMiddleware)
// Register routes externally
for _, route := range routes {
externalRouter.Path(route).Methods("GET", "POST").Handler(externalMiddleware.Wrap(internalRouter))
}
//If no frontend or scheduler address has been configured, then there is no place for the
//querier worker to request work from, so no need to start a worker service
@ -107,16 +129,7 @@ func InitWorkerService(
return "internalQuerier"
}))
// If queries are processed using the external HTTP Server, we need wrap the internal querier with
// HTTP router with middleware to parse the tenant ID from the HTTP header and inject it into the
// request context, as well as make sure any x-www-url-formencoded params are correctly parsed
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
)
internalHandler = httpMiddleware.Wrap(internalHandler)
internalHandler = internalMiddleware.Wrap(internalHandler)
//Querier worker's max concurrent requests must be the same as the querier setting
(*cfg.QuerierWorkerConfig).MaxConcurrentRequests = cfg.QuerierMaxConcurrent
@ -131,19 +144,6 @@ func InitWorkerService(
prometheus.DefaultRegisterer)
}
func registerRoutesExternally(routes []string, externalRouter *mux.Router, internalHandler http.Handler, authMiddleware middleware.Interface) {
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
authMiddleware,
serverutil.NewPrepopulateMiddleware(),
serverutil.ResponseJSONMiddleware(),
)
for _, route := range routes {
externalRouter.Path(route).Methods("GET", "POST").Handler(httpMiddleware.Wrap(internalHandler))
}
}
func querierRunningStandalone(cfg WorkerServiceConfig) bool {
runningStandalone := !cfg.QueryFrontendEnabled && !cfg.QuerySchedulerEnabled && !cfg.ReadEnabled && !cfg.AllEnabled
level.Debug(util_log.Logger).Log(

@ -22,6 +22,13 @@ func Test_InitQuerierService(t *testing.T) {
}),
}
var alwaysExternalHandlers = map[string]http.Handler{
"/loki/api/v1/tail": http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
_, err := res.Write([]byte("test tail handler"))
require.NoError(t, err)
}),
}
testContext := func(config WorkerServiceConfig, authMiddleware middleware.Interface) (*mux.Router, services.Service) {
externalRouter := mux.NewRouter()
@ -32,6 +39,7 @@ func Test_InitQuerierService(t *testing.T) {
querierWorkerService, err := InitWorkerService(
config,
mockQueryHandlers,
alwaysExternalHandlers,
externalRouter,
http.HandlerFunc(externalRouter.ServeHTTP),
authMiddleware,
@ -57,6 +65,13 @@ func Test_InitQuerierService(t *testing.T) {
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test handler", recorder.Body.String())
// Tail endpoints always external
recorder = httptest.NewRecorder()
request = httptest.NewRequest("GET", "/loki/api/v1/tail", nil)
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test tail handler", recorder.Body.String())
})
t.Run("wrap external handler with auth middleware", func(t *testing.T) {
@ -187,6 +202,13 @@ func Test_InitQuerierService(t *testing.T) {
request := httptest.NewRequest("GET", "/loki/api/v1/query", nil)
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 404, recorder.Code)
// Tail endpoints always external
recorder = httptest.NewRecorder()
request = httptest.NewRequest("GET", "/loki/api/v1/tail", nil)
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test tail handler", recorder.Body.String())
}
})

Loading…
Cancel
Save