Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/querytee/handler_factory.go

126 lines
4.8 KiB

package querytee
import (
"net/http"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/middleware"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/querytee/comparator"
"github.com/grafana/loki/v3/pkg/querytee/goldfish"
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/server"
)
// HandlerFactory creates the appropriate handler based on configuration.
type HandlerFactory struct {
backends []*ProxyBackend
codec queryrangebase.Codec
goldfishManager goldfish.Manager
instrumentCompares bool
routingMode RoutingMode
logger log.Logger
metrics *ProxyMetrics
raceTolerance time.Duration
skipFanOutWhenNotSampling bool
splitStart time.Time
splitLag time.Duration
splitRetentionDays int64
addRoutingDecisionsToWarnings bool
}
// HandlerFactoryConfig holds configuration for creating a HandlerFactory.
type HandlerFactoryConfig struct {
Backends []*ProxyBackend
Codec queryrangebase.Codec
GoldfishManager goldfish.Manager
InstrumentCompares bool
RoutingMode RoutingMode
Logger log.Logger
Metrics *ProxyMetrics
RaceTolerance time.Duration
SkipFanOutWhenNotSampling bool
SplitStart flagext.Time
SplitLag time.Duration
SplitRetentionDays int64
AddRoutingDecisionsToWarnings bool
}
// NewHandlerFactory creates a new HandlerFactory.
func NewHandlerFactory(cfg HandlerFactoryConfig) *HandlerFactory {
return &HandlerFactory{
backends: cfg.Backends,
codec: cfg.Codec,
goldfishManager: cfg.GoldfishManager,
instrumentCompares: cfg.InstrumentCompares,
routingMode: cfg.RoutingMode,
logger: cfg.Logger,
metrics: cfg.Metrics,
raceTolerance: cfg.RaceTolerance,
skipFanOutWhenNotSampling: cfg.SkipFanOutWhenNotSampling,
splitStart: time.Time(cfg.SplitStart),
splitLag: cfg.SplitLag,
splitRetentionDays: cfg.SplitRetentionDays,
addRoutingDecisionsToWarnings: cfg.AddRoutingDecisionsToWarnings,
}
}
func (f *HandlerFactory) CreateHandler(routeName string, comp comparator.ResponsesComparator) (http.Handler, error) {
// Create the fan-out handler that sends requests to all backends
fanOutHandler := NewFanOutHandler(FanOutHandlerConfig{
Backends: f.backends,
Codec: f.codec,
Comparator: comp,
GoldfishManager: f.goldfishManager,
InstrumentCompares: f.instrumentCompares,
RoutingMode: f.routingMode,
Logger: f.logger,
Metrics: f.metrics,
RouteName: routeName,
RaceTolerance: f.raceTolerance,
AddRoutingDecisionsToWarnings: f.addRoutingDecisionsToWarnings,
})
// all routing modes (v1Preferred, v2Preferred, and race) are capable of splitting the request so that only time
// ranges valid for dataobjs are sent to v2. splitting needs to know which backend is v1 for routing the following:
// * time ranges before we have dataobjs, or within the dataobj lag are sent to v1
// * unsupported queries are sent to v1
// * unimplemented metadata queries are sent to v1
var v1Backend *ProxyBackend
for _, b := range f.backends {
if b.v1Preferred {
v1Backend = b
break
}
}
splittingHandler, err := NewSplittingHandler(SplittingHandlerConfig{
Codec: f.codec,
FanOutHandler: fanOutHandler,
GoldfishManager: f.goldfishManager,
V1Backend: v1Backend,
SkipFanoutWhenNotSampling: f.skipFanOutWhenNotSampling,
RoutingMode: f.routingMode,
SplitStart: f.splitStart,
SplitLag: f.splitLag,
SplitRetentionDays: f.splitRetentionDays,
AddRoutingDecisionsToWarnings: f.addRoutingDecisionsToWarnings,
}, f.logger)
if err != nil {
return nil, err
}
httpMiddlewares := []middleware.Interface{
// Propagate all headers except those that affect response encoding or security.
httpreq.PropagateAllHeadersMiddleware(httpreq.AuthorizationHeader, httpreq.AcceptHeader, httpreq.AcceptEncodingHeader),
httpreq.ExtractQueryTagsMiddleware(),
server.NewPrepopulateMiddleware(),
}
return middleware.Merge(httpMiddlewares...).Wrap(splittingHandler), nil
}