Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/tools/querytee/splitting_handler.go

234 lines
6.7 KiB

package querytee
import (
"context"
"io"
"net/http"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
"github.com/grafana/loki/v3/pkg/engine"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/lokifrontend/frontend"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/util/server"
"github.com/grafana/loki/v3/tools/querytee/goldfish"
"github.com/pkg/errors"
)
// SplittingHandler is the http.Handler that NewSplittingHandler returns when a
// preferred backend is configured. It decodes each request with codec, then
// either hands it to fanOutHandler or, in splitting mode, to one of the
// per-query-type handlers selected by serveSplits.
type SplittingHandler struct {
	codec           queryrangebase.Codec
	fanOutHandler   queryrangebase.Handler // used directly when splitting is off
	goldfishManager goldfish.Manager       // may be nil; nil disables splitting
	logger          log.Logger

	// Targets picked by serveSplits: log queries, metric (sample/variant)
	// queries, and every other request type, respectively.
	logsQueryHandler, metricsQueryHandler, defaultHandler queryrangebase.Handler
}
// NewSplittingHandler builds the query-tee's top-level HTTP handler. The
// returned handler is always wrapped by tenantHandler, so a request whose org
// ID cannot be extracted is answered with 401 before any backend is contacted.
//
// When preferredBackend is nil, requests are served by
// queryrange.NewSerializeHTTPHandler over fanOutHandler. Otherwise a downstream
// round tripper to the preferred backend is created. A *SplittingHandler is
// then returned whose metric- and log-query handlers wrap that round tripper
// with the engine router (see splitHandlerFactory.createSplittingHandler), and
// whose default handler is the round tripper itself.
//
// The only error returned comes from creating that round tripper.
func NewSplittingHandler(
	codec queryrangebase.Codec,
	fanOutHandler queryrangebase.Handler,
	goldfishManager goldfish.Manager,
	logger log.Logger,
	preferredBackend *ProxyBackend,
) (http.Handler, error) {
	if preferredBackend == nil {
		serialized := queryrange.NewSerializeHTTPHandler(fanOutHandler, codec)
		return tenantHandler(serialized, logger), nil
	}

	factory := &splitHandlerFactory{
		codec:            codec,
		fanOutHandler:    fanOutHandler,
		goldfishManager:  goldfishManager,
		logger:           logger,
		preferredBackend: preferredBackend,
	}

	// Pooled transport dedicated to the preferred backend.
	// NOTE(review): only idle-connection settings are configured. Dial/TLS
	// timeouts and proxy settings keep their http.Transport zero values;
	// confirm this is intended.
	transport := &http.Transport{
		MaxIdleConnsPerHost: 100,
		IdleConnTimeout:     90 * time.Second,
	}
	preferredRT, err := frontend.NewDownstreamRoundTripper(preferredBackend.endpoint.String(), transport, codec)
	if err != nil {
		return nil, err
	}

	handler := &SplittingHandler{
		codec:               codec,
		fanOutHandler:       fanOutHandler,
		goldfishManager:     goldfishManager,
		logger:              logger,
		metricsQueryHandler: factory.createSplittingHandler(true, preferredRT),
		logsQueryHandler:    factory.createSplittingHandler(false, preferredRT),
		defaultHandler:      preferredRT,
	}
	return tenantHandler(handler, logger), nil
}
// tenantHandler wraps next so that it only runs for requests from which
// user.ExtractOrgIDFromHTTPRequest can extract an org ID. On success, next
// receives the request carrying the context returned by that call. On failure,
// a warning is logged and the client gets 401 Unauthorized with the error text
// as the body.
func tenantHandler(next http.Handler, logger log.Logger) http.Handler {
	requireTenant := func(w http.ResponseWriter, r *http.Request) {
		_, ctx, err := user.ExtractOrgIDFromHTTPRequest(r)
		if err == nil {
			next.ServeHTTP(w, r.WithContext(ctx))
			return
		}

		level.Warn(logger).Log(
			"msg", "failed to extract tenant ID",
			"err", err,
			"req", r.URL.String(),
		)
		http.Error(w, err.Error(), http.StatusUnauthorized)
	}
	return http.HandlerFunc(requireTenant)
}
// splitHandlerFactory carries the dependencies shared by the routing handlers
// that createSplittingHandler builds for a SplittingHandler.
type splitHandlerFactory struct {
	codec            queryrangebase.Codec
	fanOutHandler    queryrangebase.Handler
	goldfishManager  goldfish.Manager // optional; when set, supplies router Start/Lag
	logger           log.Logger
	preferredBackend *ProxyBackend // nil makes createSplittingHandler return fanOutHandler
}
// createSplittingHandler returns a handler that wraps defaultHandler (the
// engine router's v1 path) with queryrange.NewEngineRouterMiddleware. The
// router's v2 path is the fan-out handler, and queries are validated with
// engine.IsQuerySupported. When a goldfish manager is configured, its
// ComparisonStartDate and ComparisonMinAge become the router's Start and Lag.
// For metric queries, StepAlignMiddleware is placed ahead of the router in the
// middleware chain.
//
// If the factory has no preferred backend, no router is built and the fan-out
// handler is returned unchanged.
func (f *splitHandlerFactory) createSplittingHandler(forMetricQuery bool, defaultHandler queryrangebase.Handler) queryrangebase.Handler {
	if f.preferredBackend == nil {
		return f.fanOutHandler
	}

	cfg := queryrange.RouterConfig{
		Enabled:  true,
		Validate: engine.IsQuerySupported,
		// v2 path: fan out to every backend so goldfish can compare results.
		Handler: f.fanOutHandler,
	}
	if gm := f.goldfishManager; gm != nil {
		cfg.Start = gm.ComparisonStartDate()
		cfg.Lag = gm.ComparisonMinAge()
	}

	router := queryrange.NewEngineRouterMiddleware(
		cfg,
		nil, // no v1 chain middleware
		f.codec,
		forMetricQuery,
		f.logger,
	)

	chain := make([]queryrangebase.Middleware, 0, 2)
	if forMetricQuery {
		chain = append(chain, queryrangebase.StepAlignMiddleware)
	}
	chain = append(chain, router)

	return queryrangebase.MergeMiddlewares(chain...).Wrap(defaultHandler)
}
// ServeHTTP implements http.Handler. It decodes the request with the handler's
// codec, dispatches it, and writes the encoded response back to w.
//
// Dispatch:
//   - Legacy mode: when the goldfish manager is nil or its ComparisonMinAge is
//     not positive, the request goes straight to the fan-out handler.
//   - Splitting mode: otherwise, serveSplits picks the handler for the query
//     type. Those handlers route between the preferred backend and the fan-out
//     handler by data age.
//
// Failure handling:
//   - Tenant extraction failure → 401.
//   - Decode, handler and encode errors → server.WriteError.
//   - A handler error accompanied by a *NonDecodableResponse → that response's
//     status code and raw body.
func (f *SplittingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	_, ctx, err := tenant.ExtractTenantIDFromHTTPRequest(r)
	if err != nil {
		level.Warn(f.logger).Log(
			"msg", "failed to extract tenant ID",
			"err", err,
			"req", r.URL.String(),
		)
		http.Error(w, err.Error(), http.StatusUnauthorized)
		return
	}

	// Custom headers do not survive the codec's decode/encode round trip, so a
	// copy is stashed on the context for downstream handlers.
	ctx = context.WithValue(ctx, originalHTTPHeadersKey, r.Header.Clone())

	req, err := f.codec.DecodeRequest(ctx, r, nil)
	if err != nil {
		level.Warn(f.logger).Log(
			"msg", "failed to decode request",
			"query", r.Form.Get("query"),
			"req", r.URL.String(),
			"err", err,
		)
		server.WriteError(err, w)
		return
	}

	var resp queryrangebase.Response
	switch mgr := f.goldfishManager; {
	case mgr != nil && mgr.ComparisonMinAge() > 0:
		resp, err = f.serveSplits(ctx, req)
	default:
		resp, err = f.fanOutHandler.Do(ctx, req)
	}

	if err != nil {
		if raw, ok := resp.(*NonDecodableResponse); ok {
			http.Error(w, string(raw.Body), raw.StatusCode)
			return
		}
		level.Warn(f.logger).Log("msg", "handler failed", "err", err)
		server.WriteError(err, w)
		return
	}

	httpResp, err := f.codec.EncodeResponse(ctx, r, resp)
	if err != nil {
		level.Warn(f.logger).Log("msg", "failed to encode response", "err", err)
		server.WriteError(err, w)
		return
	}
	defer httpResp.Body.Close()

	// Headers must be copied before WriteHeader commits the status line.
	for name, vals := range httpResp.Header {
		for _, v := range vals {
			w.Header().Add(name, v)
		}
	}
	w.WriteHeader(httpResp.StatusCode)

	if _, copyErr := io.Copy(w, httpResp.Body); copyErr != nil {
		level.Warn(f.logger).Log("msg", "unable to write response body", "err", copyErr)
	}
}
// serveSplits routes a decoded request in splitting mode.
//
// Routing rules:
//   - Any request that is not a *queryrange.LokiRequest goes to defaultHandler.
//   - A LokiRequest without a query plan is logged and rejected with an error.
//   - A LokiRequest whose plan AST is a syntax.VariantsExpr or
//     syntax.SampleExpr goes to metricsQueryHandler.
//   - Every other LokiRequest goes to logsQueryHandler.
func (f *SplittingHandler) serveSplits(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
	lokiReq, isLoki := req.(*queryrange.LokiRequest)
	if !isLoki {
		return f.defaultHandler.Do(ctx, req)
	}

	if lokiReq.Plan == nil {
		planErr := errors.New("query plan is empty")
		level.Warn(f.logger).Log("msg", "query plan is empty", "query", req.GetQuery(), "err", planErr)
		return nil, planErr
	}

	ast := lokiReq.Plan.AST
	_, isVariants := ast.(syntax.VariantsExpr)
	_, isSample := ast.(syntax.SampleExpr)
	if isVariants || isSample {
		return f.metricsQueryHandler.Do(ctx, req)
	}
	return f.logsQueryHandler.Do(ctx, req)
}