mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
514 lines
18 KiB
514 lines
18 KiB
package querytee
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"net/http/httputil"
|
|
"net/url"
|
|
"regexp"
|
|
"slices"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/go-kit/log/level"
|
|
"github.com/gorilla/mux"
|
|
"github.com/grafana/dskit/flagext"
|
|
"github.com/grafana/dskit/middleware"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/grafana/loki/v3/pkg/querier/queryrange"
|
|
"github.com/grafana/loki/v3/pkg/querytee/comparator"
|
|
"github.com/grafana/loki/v3/pkg/querytee/goldfish"
|
|
)
|
|
|
|
// errMinBackends is returned by NewProxy when -backend.endpoints yields no
// usable backend (empty, whitespace-only entries are skipped).
var errMinBackends = errors.New("at least 1 backend is required")
|
|
|
|
// RoutingMode selects how read queries are dispatched between the v1 and v2
// preferred backends. It is set with -routing.mode, checked by
// RoutingConfig.Validate, and handed to the handler factory for read routes;
// the per-mode dispatch logic itself lives outside this file.
type RoutingMode string

const (
	// RoutingModeRace races the backends; v2 gets a handicap of
	// RoutingConfig.RaceTolerance (per the -routing.race-tolerance help).
	RoutingModeRace RoutingMode = "race"

	// RoutingModeV1Preferred prefers the v1 (chunks) backend. It is the
	// default value of -routing.mode.
	RoutingModeV1Preferred RoutingMode = "v1-preferred"

	// RoutingModeV2Preferred prefers the v2 (dataobjs) backend.
	RoutingModeV2Preferred RoutingMode = "v2-preferred"
)
|
|
|
|
// RoutingConfig controls how read queries are routed between the preferred
// v1 (chunks) and v2 (dataobjs) backends. It is populated by RegisterFlags
// under the "routing." prefix and checked by Validate.
type RoutingConfig struct {
	// Mode is the routing mode (-routing.mode).
	Mode RoutingMode

	// V1Preferred is the hostname of the preferred v1 (chunks) backend. If the
	// value parses as an integer, NewProxy treats it as an index into the
	// backend endpoint list instead (used in tests where all backends share a
	// hostname).
	V1Preferred string

	// V2Preferred is the hostname of the preferred v2 (dataobjs) backend, with
	// the same integer-index fallback as V1Preferred.
	V2Preferred string

	// RaceTolerance is the handicap given to v2 in race mode
	// (-routing.race-tolerance).
	RaceTolerance time.Duration

	// SplitStart is the start date of data available in v2 (dataobjs) storage.
	// Queries for data before this date will only go to the v1 backend.
	// If not set, assume v2 data is always available.
	//
	// When splitting queries will be split into up to three parts:
	// 1. Data before SplitStart -> split goes to the v1 backend only
	// 2. Data between SplitStart and (now - SplitLag) -> split goes to both v1 and v2 backends
	// 3. Data after (now - SplitLag) -> split goes to the v1 backend only
	SplitStart flagext.Time

	// SplitLag is the minimum age of data to route to v2.
	// Data newer than (now - SplitLag) will only go to the v1 backend.
	// When set to 0, query splitting is disabled.
	SplitLag time.Duration

	// SplitRetentionDays is the lifecycle of data objects in days.
	// If set, data in v2 storage is considered available only for retention days.
	// Queries for data before the retention period will go to the v1 backend.
	// When both SplitStart and SplitRetentionDays are set, the more restrictive of the two will
	// determine v2 data availability.
	SplitRetentionDays int64

	// AddRoutingDecisionsToWarnings controls whether routing decisions are added
	// as warnings to query responses. When enabled, responses will include
	// warnings indicating which backend handled the query and how it was routed.
	AddRoutingDecisionsToWarnings bool
}
|
|
|
|
func (cfg *RoutingConfig) RegisterFlags(f *flag.FlagSet) {
|
|
f.StringVar((*string)(&cfg.Mode), "routing.mode", string(RoutingModeV1Preferred), "Routing mode: race, v1-preferred, or v2-preferred")
|
|
f.StringVar(&cfg.V1Preferred, "routing.v1-preferred", "", "The hostname of the preferred v1 (chunks) backend")
|
|
f.StringVar(&cfg.V2Preferred, "routing.v2-preferred", "", "The hostname of the preferred v2 (dataobjs) backend")
|
|
f.DurationVar(&cfg.RaceTolerance, "routing.race-tolerance", 100*time.Millisecond, "Race handicap for v2 in race mode")
|
|
f.Var(&cfg.SplitStart, "routing.split-start", "Start date when v2 data became available. Format YYYY-MM-DD. Queries before this date go only to v1.")
|
|
f.DurationVar(&cfg.SplitLag, "routing.split-lag", 0, "Minimum age of data to route to v2. Data newer than this goes only to v1. When 0 (default), splitting is disabled.")
|
|
f.Int64Var(&cfg.SplitRetentionDays, "routing.split-retention-days", 0, "Lifecycle of data objects in days. If set, data outside of retention period will not be available in v2 storage. When both split-start and split-retention-days are set, the more restrictive of the two will apply.")
|
|
f.BoolVar(&cfg.AddRoutingDecisionsToWarnings, "routing.add-routing-decisions-to-warnings", false, "Add routing decisions as warnings to query responses.")
|
|
}
|
|
|
|
func (cfg *RoutingConfig) Validate() error {
|
|
switch cfg.Mode {
|
|
case RoutingModeRace, RoutingModeV1Preferred, RoutingModeV2Preferred:
|
|
default:
|
|
return fmt.Errorf("invalid routing mode: %s", cfg.Mode)
|
|
}
|
|
|
|
if cfg.SplitLag < 0 {
|
|
return fmt.Errorf("split lag must be >= 0")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProxyConfig holds the query-tee proxy configuration. It is populated by
// RegisterFlags and consumed by NewProxy.
type ProxyConfig struct {
	// ServerServicePort is the TCP port the proxy listens on.
	ServerServicePort int
	// BackendEndpoints is a comma-separated list of backend URLs; each
	// non-empty entry becomes one backend.
	BackendEndpoints string
	// BackendReadTimeout is passed to every backend as its read timeout.
	BackendReadTimeout time.Duration
	// CompareResponses attaches each route's ResponseComparator. It requires
	// a v1 preferred backend and at least 2 backends.
	CompareResponses bool
	// DisableBackendReadProxy is a comma-separated list of backend hostnames
	// excluded from read routes. The v1 preferred backend cannot be listed.
	DisableBackendReadProxy string
	// ValueComparisonTolerance is the tolerance for floating point comparisons
	// of responses (0 requires an exact match).
	ValueComparisonTolerance float64
	// UseRelativeError selects relative instead of absolute error tolerance
	// for floating point comparisons.
	UseRelativeError bool
	// PassThroughNonRegisteredRoutes reverse-proxies requests that match no
	// registered route to the v1 preferred backend.
	PassThroughNonRegisteredRoutes bool
	// SkipRecentSamples is the window from now in which samples are not
	// compared (0 disables it).
	SkipRecentSamples time.Duration
	// SkipSamplesBefore excludes samples before this time from comparison.
	SkipSamplesBefore flagext.Time
	// RequestURLFilter, when set, restricts which requests are proxied to
	// non-preferred backends (per the -backend.filter flag help).
	RequestURLFilter *regexp.Regexp
	// InstrumentCompares reports comparison metrics. It requires
	// CompareResponses.
	InstrumentCompares bool
	// SkipFanOutWhenNotSampling skips fanning out to secondary backends when
	// Goldfish sampling is disabled (per the flag help).
	SkipFanOutWhenNotSampling bool
	// Goldfish configures query sampling and comparison.
	Goldfish goldfish.Config

	// Routing configures read routing between v1 and v2 backends.
	Routing RoutingConfig
}
|
|
|
|
func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
|
|
f.IntVar(&cfg.ServerServicePort, "server.service-port", 80, "The port where the query-tee service listens to.")
|
|
f.StringVar(&cfg.BackendEndpoints, "backend.endpoints", "", "Comma separated list of backend endpoints to query.")
|
|
|
|
cfg.Routing.RegisterFlags(f)
|
|
|
|
f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 90*time.Second, "The timeout when reading the response from a backend.")
|
|
f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.")
|
|
f.StringVar(&cfg.DisableBackendReadProxy, "proxy.disable-backend-read", "", "Comma separated list of non-primary backend hostnames to disable their read proxy. Typically used for temporarily not passing any read requests to specified backends.")
|
|
f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).")
|
|
f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.")
|
|
f.DurationVar(&cfg.SkipRecentSamples, "proxy.compare-skip-recent-samples", 60*time.Second, "The window from now to skip comparing samples. 0 to disable.")
|
|
f.Var(&cfg.SkipSamplesBefore, "proxy.compare-skip-samples-before", "Skip the samples before the given time for comparison. The time can be in RFC3339 format (or) RFC3339 without the timezone and seconds (or) date only.")
|
|
f.BoolVar(&cfg.PassThroughNonRegisteredRoutes, "proxy.passthrough-non-registered-routes", false, "Passthrough requests for non-registered routes to preferred backend.")
|
|
f.Func("backend.filter", "A request filter as a regular expression. Only matches are proxied to non-preferred backends.", func(raw string) error {
|
|
var err error
|
|
cfg.RequestURLFilter, err = regexp.Compile(raw)
|
|
return err
|
|
})
|
|
f.BoolVar(&cfg.InstrumentCompares, "proxy.compare-instrument", false, "Reports metrics on comparisons of responses between preferred and non-preferred endpoints for supported routes.")
|
|
f.BoolVar(&cfg.SkipFanOutWhenNotSampling, "proxy.skip-fanout-when-not-sampling", false, "When enabled, skip fanning out requests to secondary backends when goldfish sampling is disabled (default_rate=0 and no tenant rules). This reduces load on secondary backends when not doing comparisons.")
|
|
|
|
cfg.Goldfish.RegisterFlags(f)
|
|
}
|
|
|
|
// Route describes one HTTP route served by the proxy.
type Route struct {
	// Path is the mux path the route is registered under.
	Path string
	// RouteName identifies the route to the proxy endpoint, the handler
	// factory and metrics (e.g. the route label of raceWins).
	RouteName string
	// Methods lists the HTTP methods the route is registered for.
	Methods []string
	// ResponseComparator compares backend responses. It is only attached when
	// ProxyConfig.CompareResponses is enabled.
	ResponseComparator comparator.ResponsesComparator
}
|
|
|
|
// Proxy is the query-tee HTTP server. It is built by NewProxy, served by
// Start and stopped by Stop.
type Proxy struct {
	cfg ProxyConfig
	// backends are parsed from cfg.BackendEndpoints by NewProxy.
	backends []*ProxyBackend
	logger   log.Logger
	metrics  *ProxyMetrics
	// readRoutes are served by a query handler; writeRoutes are proxied to
	// every backend.
	readRoutes  []Route
	writeRoutes []Route

	// The HTTP server used to run the proxy service.
	srv         *http.Server
	srvListener net.Listener

	// Wait group used to wait until the server has done.
	done sync.WaitGroup

	// Goldfish manager for query sampling and comparison
	goldfishManager goldfish.Manager
}
|
|
|
|
func NewProxy(
|
|
cfg ProxyConfig,
|
|
logger log.Logger,
|
|
readRoutes, writeRoutes []Route,
|
|
registerer prometheus.Registerer,
|
|
) (*Proxy, error) {
|
|
if err := cfg.Routing.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if cfg.CompareResponses && cfg.Routing.V1Preferred == "" {
|
|
return nil, fmt.Errorf("when enabling comparison of results -routing.v1-backend flag must be set")
|
|
}
|
|
|
|
if cfg.PassThroughNonRegisteredRoutes && cfg.Routing.V1Preferred == "" {
|
|
return nil, fmt.Errorf("when enabling passthrough for non-registered routes -routing.v1-backend flag must be set")
|
|
}
|
|
|
|
if cfg.InstrumentCompares && !cfg.CompareResponses {
|
|
return nil, fmt.Errorf("when enabling instrumentation of comparisons of results -proxy.compare-responses flag must be set")
|
|
}
|
|
|
|
// Validate Goldfish configuration
|
|
if err := cfg.Goldfish.Validate(); err != nil {
|
|
return nil, errors.Wrap(err, "invalid goldfish configuration")
|
|
}
|
|
|
|
p := &Proxy{
|
|
cfg: cfg,
|
|
logger: logger,
|
|
metrics: NewProxyMetrics(registerer),
|
|
readRoutes: readRoutes,
|
|
writeRoutes: writeRoutes,
|
|
}
|
|
|
|
// Parse the backend endpoints (comma separated).
|
|
parts := strings.Split(cfg.BackendEndpoints, ",")
|
|
|
|
for idx, part := range parts {
|
|
// Skip empty ones.
|
|
part = strings.TrimSpace(part)
|
|
if part == "" {
|
|
continue
|
|
}
|
|
|
|
u, err := url.Parse(part)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "invalid backend endpoint %s", part)
|
|
}
|
|
|
|
name := u.Hostname()
|
|
v1Preferred := name == cfg.Routing.V1Preferred
|
|
v2Preferred := name == cfg.Routing.V2Preferred
|
|
|
|
// In tests we have the same hostname for all backends, so we also
|
|
// support a numeric preferred backend which is the index in the list
|
|
// of backends.
|
|
if preferredIdx, err := strconv.Atoi(cfg.Routing.V1Preferred); err == nil {
|
|
v1Preferred = preferredIdx == idx
|
|
}
|
|
if preferredIdx, err := strconv.Atoi(cfg.Routing.V2Preferred); err == nil {
|
|
v2Preferred = preferredIdx == idx
|
|
}
|
|
|
|
level.Debug(logger).Log("msg", "backend added", "name", name, "v1Preferred", v1Preferred, "v2Preferred", v2Preferred)
|
|
backend, err := NewProxyBackend(name, u, cfg.BackendReadTimeout, v1Preferred, v2Preferred)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "failed to create backend %s", name)
|
|
}
|
|
p.backends = append(p.backends, backend)
|
|
}
|
|
|
|
// At least 1 backend is required
|
|
if len(p.backends) < 1 {
|
|
return nil, errMinBackends
|
|
}
|
|
|
|
// If the preferred backend is configured, then it must exists among the actual backends.
|
|
if cfg.Routing.V1Preferred != "" {
|
|
exists := false
|
|
for _, b := range p.backends {
|
|
if b.v1Preferred {
|
|
exists = true
|
|
break
|
|
}
|
|
}
|
|
if !exists {
|
|
return nil, fmt.Errorf("the v1 backend (hostname) has not been found among the list of configured backends")
|
|
}
|
|
}
|
|
|
|
if cfg.Routing.V2Preferred != "" {
|
|
exists := false
|
|
for _, b := range p.backends {
|
|
if b.v2Preferred {
|
|
exists = true
|
|
break
|
|
}
|
|
}
|
|
if !exists {
|
|
return nil, fmt.Errorf("the v2 backend (hostname) has not been found among the list of configured backends")
|
|
}
|
|
}
|
|
|
|
if cfg.CompareResponses && len(p.backends) < 2 {
|
|
return nil, fmt.Errorf("when enabling comparison of results number of backends should be at least 2")
|
|
}
|
|
|
|
// At least 2 backends are suggested
|
|
if len(p.backends) < 2 {
|
|
level.Warn(p.logger).Log("msg", "The proxy is running with only 1 backend. At least 2 backends are required to fulfill the purpose of the proxy and compare results.")
|
|
}
|
|
|
|
if cfg.DisableBackendReadProxy != "" {
|
|
readDisabledBackendHosts := strings.Split(p.cfg.DisableBackendReadProxy, ",")
|
|
if slices.Contains(readDisabledBackendHosts, cfg.Routing.V1Preferred) {
|
|
return nil, fmt.Errorf("the v1 backend cannot be disabled for reading")
|
|
}
|
|
}
|
|
|
|
// Pre-initialize raceWins metric for all backend/route/issuer combinations
|
|
if cfg.Routing.Mode == RoutingModeRace {
|
|
for _, backend := range p.backends {
|
|
for _, route := range p.readRoutes {
|
|
for _, issuer := range []string{unknownIssuer, canaryIssuer} {
|
|
p.metrics.raceWins.WithLabelValues(backend.name, backend.Alias(), route.RouteName, issuer)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Initialize Goldfish if enabled
|
|
if cfg.Goldfish.Enabled {
|
|
// Create storage backend
|
|
storage, err := goldfish.NewStorage(cfg.Goldfish.StorageConfig, logger)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create goldfish storage")
|
|
}
|
|
|
|
var resultStore goldfish.ResultStore
|
|
if cfg.Goldfish.ResultsStorage.Enabled {
|
|
resultStore, err = goldfish.NewResultStore(context.Background(), cfg.Goldfish.ResultsStorage, logger)
|
|
if err != nil {
|
|
storage.Close()
|
|
return nil, errors.Wrap(err, "failed to create goldfish result store")
|
|
}
|
|
}
|
|
|
|
// Create Goldfish manager
|
|
goldfishManager, err := goldfish.NewManager(cfg.Goldfish, storage, resultStore, logger, registerer)
|
|
if err != nil {
|
|
if resultStore != nil {
|
|
_ = resultStore.Close(context.Background())
|
|
}
|
|
storage.Close()
|
|
return nil, errors.Wrap(err, "failed to create goldfish manager")
|
|
}
|
|
p.goldfishManager = goldfishManager
|
|
|
|
level.Info(logger).Log("msg", "Goldfish enabled",
|
|
"storage_type", cfg.Goldfish.StorageConfig.Type,
|
|
"default_rate", cfg.Goldfish.SamplingConfig.DefaultRate,
|
|
"results_mode", string(cfg.Goldfish.ResultsStorage.Mode),
|
|
"results_backend", cfg.Goldfish.ResultsStorage.Backend)
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func (p *Proxy) Start() error {
|
|
// Setup listener first, so we can fail early if the port is in use.
|
|
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", p.cfg.ServerServicePort))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
router := mux.NewRouter()
|
|
|
|
// Health check endpoint.
|
|
router.Path("/").Methods("GET").Handler(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
|
|
// register read routes
|
|
for _, route := range p.readRoutes {
|
|
var comp comparator.ResponsesComparator
|
|
if p.cfg.CompareResponses {
|
|
comp = route.ResponseComparator
|
|
}
|
|
filteredBackends := filterReadDisabledBackends(p.backends, p.cfg.DisableBackendReadProxy)
|
|
endpoint := NewProxyEndpoint(
|
|
filteredBackends,
|
|
route.RouteName,
|
|
p.metrics,
|
|
p.logger,
|
|
comp,
|
|
p.cfg.InstrumentCompares,
|
|
)
|
|
|
|
// Add Goldfish if configured
|
|
if p.goldfishManager != nil {
|
|
endpoint.WithGoldfish(p.goldfishManager)
|
|
level.Info(p.logger).Log(
|
|
"msg", "Goldfish attached to route",
|
|
"path", route.Path,
|
|
"methods", strings.Join(route.Methods, ","),
|
|
"split_lag", p.cfg.Routing.SplitLag,
|
|
)
|
|
}
|
|
|
|
// Create a route-specific handler factory with the filtered backends
|
|
routeHandlerFactory := NewHandlerFactory(HandlerFactoryConfig{
|
|
Backends: filteredBackends,
|
|
Codec: queryrange.DefaultCodec,
|
|
GoldfishManager: p.goldfishManager,
|
|
Logger: p.logger,
|
|
Metrics: p.metrics,
|
|
InstrumentCompares: p.cfg.InstrumentCompares,
|
|
RoutingMode: p.cfg.Routing.Mode,
|
|
RaceTolerance: p.cfg.Routing.RaceTolerance,
|
|
SkipFanOutWhenNotSampling: p.cfg.SkipFanOutWhenNotSampling,
|
|
SplitStart: p.cfg.Routing.SplitStart,
|
|
SplitLag: p.cfg.Routing.SplitLag,
|
|
SplitRetentionDays: p.cfg.Routing.SplitRetentionDays,
|
|
AddRoutingDecisionsToWarnings: p.cfg.Routing.AddRoutingDecisionsToWarnings,
|
|
})
|
|
queryHandler, err := routeHandlerFactory.CreateHandler(route.RouteName, comp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
endpoint.WithQueryHandler(queryHandler)
|
|
level.Info(p.logger).Log(
|
|
"msg", "Query middleware handler attached to route",
|
|
"path", route.Path,
|
|
"methods", strings.Join(route.Methods, ","),
|
|
)
|
|
|
|
router.Path(route.Path).Methods(route.Methods...).Handler(endpoint)
|
|
}
|
|
|
|
// create a separate endpoint without a query handler for write requests
|
|
for _, route := range p.writeRoutes {
|
|
var comp comparator.ResponsesComparator
|
|
if p.cfg.CompareResponses {
|
|
comp = route.ResponseComparator
|
|
}
|
|
endpoint := NewProxyEndpoint(
|
|
p.backends,
|
|
route.RouteName,
|
|
p.metrics,
|
|
p.logger,
|
|
comp,
|
|
p.cfg.InstrumentCompares,
|
|
)
|
|
|
|
router.Path(route.Path).Methods(route.Methods...).Handler(endpoint)
|
|
}
|
|
|
|
if p.cfg.PassThroughNonRegisteredRoutes {
|
|
for _, backend := range p.backends {
|
|
if backend.v1Preferred {
|
|
router.PathPrefix("/").Handler(httputil.NewSingleHostReverseProxy(backend.endpoint))
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
p.srvListener = listener
|
|
// Wrap router with tracing middleware
|
|
var handler http.Handler = router
|
|
// Configure tracing middleware to extract trace headers
|
|
// This ensures trace context is properly propagated from incoming requests
|
|
tracer := middleware.NewTracer(nil, true, nil) // true enables trace header extraction
|
|
handler = tracer.Wrap(router)
|
|
level.Info(p.logger).Log("msg", "HTTP tracing middleware enabled with header extraction")
|
|
|
|
p.srv = &http.Server{
|
|
ReadTimeout: 1 * time.Minute,
|
|
WriteTimeout: 2 * time.Minute,
|
|
Handler: handler,
|
|
}
|
|
|
|
// Run in a dedicated goroutine.
|
|
p.done.Add(1)
|
|
go func() {
|
|
defer p.done.Done()
|
|
|
|
if err := p.srv.Serve(p.srvListener); err != nil {
|
|
level.Error(p.logger).Log("msg", "Proxy server failed", "err", err)
|
|
}
|
|
}()
|
|
|
|
level.Info(p.logger).Log("msg", "The proxy is up and running.")
|
|
return nil
|
|
}
|
|
|
|
func (p *Proxy) Stop() error {
|
|
if p.srv == nil {
|
|
return nil
|
|
}
|
|
|
|
// Close Goldfish manager if it exists
|
|
if p.goldfishManager != nil {
|
|
if err := p.goldfishManager.Close(); err != nil {
|
|
level.Warn(p.logger).Log("msg", "Failed to close Goldfish manager", "err", err)
|
|
}
|
|
}
|
|
|
|
return p.srv.Shutdown(context.Background())
|
|
}
|
|
|
|
func (p *Proxy) Await() {
|
|
// Wait until terminated.
|
|
p.done.Wait()
|
|
}
|
|
|
|
func (p *Proxy) Endpoint() string {
|
|
if p.srvListener == nil {
|
|
return ""
|
|
}
|
|
|
|
return p.srvListener.Addr().String()
|
|
}
|
|
|
|
func filterReadDisabledBackends(backends []*ProxyBackend, disableReadProxyCfg string) []*ProxyBackend {
|
|
readEnabledBackends := make([]*ProxyBackend, 0, len(backends))
|
|
readDisabledBackendNames := strings.Split(disableReadProxyCfg, ",")
|
|
for _, b := range backends {
|
|
if !b.v1Preferred {
|
|
readDisabled := false
|
|
for _, h := range readDisabledBackendNames {
|
|
if strings.TrimSpace(h) == b.name {
|
|
readDisabled = true
|
|
break
|
|
}
|
|
}
|
|
if readDisabled {
|
|
continue
|
|
}
|
|
}
|
|
readEnabledBackends = append(readEnabledBackends, b)
|
|
}
|
|
|
|
return readEnabledBackends
|
|
}
|
|
|