feat: add race tolerance to query-tee (#20228)

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
pull/19249/merge
Trevor Whitney 4 months ago committed by GitHub
parent 8584d92ed7
commit 014520a3d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 142
      tools/querytee/fanout_handler.go
  2. 19
      tools/querytee/fanout_handler_test.go
  3. 4
      tools/querytee/handler_factory.go
  4. 3
      tools/querytee/proxy.go
  5. 1
      tools/querytee/proxy_endpoint.go

@ -32,6 +32,7 @@ type FanOutHandler struct {
comparator comparator.ResponsesComparator
instrumentCompares bool
enableRace bool
raceTolerance time.Duration
}
// FanOutHandlerConfig holds configuration for creating a FanOutHandler.
@ -45,6 +46,7 @@ type FanOutHandlerConfig struct {
Comparator comparator.ResponsesComparator
InstrumentCompares bool
EnableRace bool
RaceTolerance time.Duration
}
// NewFanOutHandler creates a new FanOutHandler.
@ -59,6 +61,7 @@ func NewFanOutHandler(cfg FanOutHandlerConfig) *FanOutHandler {
comparator: cfg.Comparator,
instrumentCompares: cfg.InstrumentCompares,
enableRace: cfg.EnableRace,
raceTolerance: cfg.RaceTolerance,
}
}
@ -108,39 +111,8 @@ func (h *FanOutHandler) Do(ctx context.Context, req queryrangebase.Request) (que
if err != nil {
return nil, fmt.Errorf("failed to extract tenant IDs: %w", err)
}
shouldSample := false
if h.goldfishManager != nil {
for _, tenant := range tenants {
if h.goldfishManager.ShouldSample(tenant) {
shouldSample = true
level.Debug(h.logger).Log(
"msg", "Goldfish sampling decision",
"tenant", tenant,
"sampled", shouldSample,
"path", httpReq.URL.Path)
break
}
}
}
results := make(chan *backendResult, len(h.backends))
for i, backend := range h.backends {
go func(_ int, b *ProxyBackend) {
result := h.executeBackendRequest(ctx, httpReq, body, b, req)
// ensure a valid status code is set in case of error
if result.err != nil && result.backendResp.status == 0 {
result.backendResp.status = statusCodeFromError(result.err)
}
results <- result
// Record metrics
h.recordMetrics(result, httpReq.Method, issuer)
}(i, backend)
}
shouldSample := h.shouldSample(tenants, httpReq)
results := h.makeBackendRequests(ctx, httpReq, body, req, issuer)
collected := make([]*backendResult, 0, len(h.backends))
for i := 0; i < len(h.backends); i++ {
@ -148,29 +120,38 @@ func (h *FanOutHandler) Do(ctx context.Context, req queryrangebase.Request) (que
collected = append(collected, result)
// Race mode: return first successful response from ANY backend
// TODO: move race logic to a separate function, and catch the condition in Do and fan out to two different functions
// rather than have it all nested in one.
if h.enableRace {
// Check if this is the first successful result (race winner)
if result.err == nil && result.backendResp.succeeded() {
// Record race win metric
h.metrics.raceWins.WithLabelValues(
result.backend.name,
h.routeName,
).Inc()
// Spawn goroutine to collect remaining responses
winner := result
remaining := len(h.backends) - i - 1
go func() {
h.collectRemainingAndCompare(remaining, httpReq, results, collected, shouldSample)
}()
return result.response, nil
// If the preferred (v1) backend wins, then apply the handicap to give v2 a
// chance to "win" by finishing within the race tolerance of v1.
if result.backend.preferred && h.raceTolerance > 0 {
select {
case r2 := <-results:
collected = append(collected, r2)
if r2.err == nil && r2.backendResp.succeeded() {
winner = r2
remaining = len(h.backends) - i - 2
}
case <-time.After(h.raceTolerance):
// tolerance expired, fall back to original winner
}
}
return h.finishRace(
winner,
remaining,
httpReq,
results,
collected,
shouldSample)
}
} else {
// Non-race mode: existing logic (wait for preferred)
// Non-race mode: legacy logic (wait for preferred)
if result.backend.preferred {
// when the preferred backend fails (5xx or request error) we return any successful backend response
// when the preferred backend fails return any successful response
if !result.backendResp.succeeded() {
continue
}
@ -212,6 +193,20 @@ func (h *FanOutHandler) Do(ctx context.Context, req queryrangebase.Request) (que
return nil, fmt.Errorf("all backends failed")
}
// finishRace records the race winner and spawns a goroutine to collect remaining results.
// It increments the race-win metric for the winning backend and returns the winner's
// response immediately; the remaining backend results are drained asynchronously so the
// caller is not blocked on comparison/sampling work.
func (h *FanOutHandler) finishRace(winner *backendResult, remaining int, httpReq *http.Request, results <-chan *backendResult, collected []*backendResult, shouldSample bool) (queryrangebase.Response, error) {
	// Count the win for the backend that finished first (or within the race tolerance).
	h.metrics.raceWins.WithLabelValues(
		winner.backend.name,
		h.routeName,
	).Inc()

	// Drain the remaining results off the hot path. A direct `go` statement is
	// equivalent to wrapping the call in an anonymous function but is more idiomatic.
	go h.collectRemainingAndCompare(remaining, httpReq, results, collected, shouldSample)

	return winner.response, nil
}
// collectRemainingAndCompare collects remaining backend results, performs comparisons,
// and processes goldfish sampling. Should be called asynchronously to not block preferred response from returning.
func (h *FanOutHandler) collectRemainingAndCompare(remaining int, httpReq *http.Request, results <-chan *backendResult, collected []*backendResult, shouldSample bool) {
@ -430,6 +425,55 @@ func (h *FanOutHandler) WithComparator(comparator comparator.ResponsesComparator
return h
}
// shouldSample determines if a query should be sampled for goldfish comparison.
// It returns true as soon as any tenant in the request is selected for sampling,
// logging the positive decision at debug level. Returns false when no goldfish
// manager is configured or no tenant is sampled.
func (h *FanOutHandler) shouldSample(tenants []string, httpReq *http.Request) bool {
	if h.goldfishManager == nil {
		return false
	}

	for _, tenant := range tenants {
		if !h.goldfishManager.ShouldSample(tenant) {
			continue
		}
		// Record the positive sampling decision for debugging.
		level.Debug(h.logger).Log(
			"msg", "Goldfish sampling decision",
			"tenant", tenant,
			"sampled", true,
			"path", httpReq.URL.Path)
		return true
	}
	return false
}
// makeBackendRequests initiates one request per configured backend and returns
// a channel on which each backend delivers exactly one result.
//
// The channel is buffered to len(h.backends) so every goroutine can complete
// and record its metrics even if the caller stops reading early (e.g. once a
// race winner has been returned).
func (h *FanOutHandler) makeBackendRequests(
	ctx context.Context,
	httpReq *http.Request,
	body []byte,
	req queryrangebase.Request,
	issuer string,
) chan *backendResult {
	results := make(chan *backendResult, len(h.backends))
	// The loop index was previously passed to the goroutine as an unused `_ int`
	// parameter; it has been dropped since nothing consumes it.
	for _, backend := range h.backends {
		go func(b *ProxyBackend) {
			result := h.executeBackendRequest(ctx, httpReq, body, b, req)
			// Ensure a valid status code is set in case of error so downstream
			// handling and metrics never observe a zero status.
			if result.err != nil && result.backendResp.status == 0 {
				result.backendResp.status = statusCodeFromError(result.err)
			}
			results <- result
			// Record metrics after delivering the result so the caller is not
			// blocked on instrumentation.
			h.recordMetrics(result, httpReq.Method, issuer)
		}(backend)
	}
	return results
}
func extractOriginalHeaders(ctx context.Context) http.Header {
if headers, ok := ctx.Value(originalHTTPHeadersKey).(http.Header); ok {
return headers

@ -198,9 +198,9 @@ func TestFanOutHandler_Do_WithFilter(t *testing.T) {
require.Equal(t, 1, requestCount, "expected only 1 backend to receive request due to filter")
}
func TestFanOutHandler_Do_RaceModeReturnsFirstSuccessful(t *testing.T) {
func TestFanOutHandler_Do_RaceModeReturnsNonPreferredIfWithinTolerance(t *testing.T) {
backend1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
time.Sleep(50 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[{"stream":{"backend":"1"},"values":[["1000000000","log line 1"]]}]}}`))
@ -209,7 +209,7 @@ func TestFanOutHandler_Do_RaceModeReturnsFirstSuccessful(t *testing.T) {
defer backend1.Close()
backend2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
time.Sleep(10 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[{"stream":{"backend":"2"},"values":[["1000000000","log line 2"]]}]}}`))
@ -226,12 +226,13 @@ func TestFanOutHandler_Do_RaceModeReturnsFirstSuccessful(t *testing.T) {
}
handler := NewFanOutHandler(FanOutHandlerConfig{
Backends: backends,
Codec: queryrange.DefaultCodec,
Logger: log.NewNopLogger(),
Metrics: NewProxyMetrics(prometheus.NewRegistry()),
RouteName: "test_route",
EnableRace: true,
Backends: backends,
Codec: queryrange.DefaultCodec,
Logger: log.NewNopLogger(),
Metrics: NewProxyMetrics(prometheus.NewRegistry()),
RouteName: "test_route",
EnableRace: true,
RaceTolerance: 100 * time.Millisecond,
})
req := &queryrange.LokiRequest{

@ -2,6 +2,7 @@ package querytee
import (
"net/http"
"time"
"github.com/go-kit/log"
@ -22,6 +23,7 @@ type HandlerFactory struct {
enableRace bool
logger log.Logger
metrics *ProxyMetrics
raceTolerance time.Duration
}
// HandlerFactoryConfig holds configuration for creating a HandlerFactory.
@ -33,6 +35,7 @@ type HandlerFactoryConfig struct {
EnableRace bool
Logger log.Logger
Metrics *ProxyMetrics
RaceTolerance time.Duration
}
// NewHandlerFactory creates a new HandlerFactory.
@ -45,6 +48,7 @@ func NewHandlerFactory(cfg HandlerFactoryConfig) *HandlerFactory {
enableRace: cfg.EnableRace,
logger: cfg.Logger,
metrics: cfg.Metrics,
raceTolerance: cfg.RaceTolerance,
}
}

@ -44,6 +44,7 @@ type ProxyConfig struct {
RequestURLFilter *regexp.Regexp
InstrumentCompares bool
EnableRace bool
RaceTolerance time.Duration
Goldfish goldfish.Config
}
@ -66,6 +67,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
})
f.BoolVar(&cfg.InstrumentCompares, "proxy.compare-instrument", false, "Reports metrics on comparisons of responses between preferred and non-preferred endpoints for supported routes.")
f.BoolVar(&cfg.EnableRace, "proxy.enable-race", false, "When enabled, return the first successful response from any backend instead of waiting for the preferred backend.")
f.DurationVar(&cfg.RaceTolerance, "proxy.race-tolerance", 100*time.Millisecond, "The tolerance for handicapping races in favor of non-preferred backends. If the preferred backend finishes first but a non-preferred backend completes within this tolerance, the non-preferred backend is declared the winner.")
// Register Goldfish configuration flags
cfg.Goldfish.RegisterFlags(f)
@ -299,6 +301,7 @@ func (p *Proxy) Start() error {
Metrics: p.metrics,
InstrumentCompares: p.cfg.InstrumentCompares,
EnableRace: p.cfg.EnableRace,
RaceTolerance: p.cfg.RaceTolerance,
})
queryHandler, err := routeHandlerFactory.CreateHandler(route.RouteName, comp)
if err != nil {

@ -388,6 +388,7 @@ func (r *BackendResponse) statusCode() int {
return r.status
}
// TODO(twhitney): detectIssuer should also detect Grafana as an issuer
func detectIssuer(r *http.Request) string {
if strings.HasPrefix(r.Header.Get("User-Agent"), "loki-canary") {
return canaryIssuer

Loading…
Cancel
Save