Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/tools/querytee/fanout_handler.go

459 lines
14 KiB

package querytee
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/tools/querytee/comparator"
"github.com/grafana/loki/v3/tools/querytee/goldfish"
)
// FanOutHandler implements queryrangebase.Handler and fans out requests to multiple backends.
// It returns the preferred backend's response as soon as ready, while capturing remaining
// responses for goldfish comparison in the background.
type FanOutHandler struct {
backends []*ProxyBackend
codec queryrangebase.Codec
goldfishManager goldfish.Manager
logger log.Logger
metrics *ProxyMetrics
routeName string
comparator comparator.ResponsesComparator
instrumentCompares bool
enableRace bool
}
// FanOutHandlerConfig holds configuration for creating a FanOutHandler.
type FanOutHandlerConfig struct {
Backends []*ProxyBackend
Codec queryrangebase.Codec
GoldfishManager goldfish.Manager
Logger log.Logger
Metrics *ProxyMetrics
RouteName string
Comparator comparator.ResponsesComparator
InstrumentCompares bool
EnableRace bool
}
// NewFanOutHandler creates a new FanOutHandler.
func NewFanOutHandler(cfg FanOutHandlerConfig) *FanOutHandler {
return &FanOutHandler{
backends: cfg.Backends,
codec: cfg.Codec,
goldfishManager: cfg.GoldfishManager,
logger: cfg.Logger,
metrics: cfg.Metrics,
routeName: cfg.RouteName,
comparator: cfg.Comparator,
instrumentCompares: cfg.InstrumentCompares,
enableRace: cfg.EnableRace,
}
}
// backendResult holds the result from a single backend request.
type backendResult struct {
backend *ProxyBackend
backendResp *BackendResponse
response queryrangebase.Response
err error
}
// Do implements queryrangebase.Handler. It fans out the request to all backends, returns the preferred backend's
// response, and captures remaining responses for goldfish comparison in the background.
func (h *FanOutHandler) Do(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
httpReq, err := h.codec.EncodeRequest(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to encode request: %w", err)
}
// Preserve original headers lost during codec decode/encode cycle
if origHeaders := extractOriginalHeaders(ctx); origHeaders != nil {
copyHeaders(origHeaders, httpReq.Header)
}
issuer := detectIssuer(httpReq)
user := goldfish.ExtractUserFromQueryTags(httpReq)
level.Debug(h.logger).Log(
"msg", "Received request",
"path", httpReq.URL.Path,
"query", httpReq.URL.RawQuery,
"issuer", issuer,
"user", user,
)
// Read body for reuse across backends
var body []byte
if httpReq.Body != nil {
body, err = io.ReadAll(httpReq.Body)
if err != nil {
return nil, fmt.Errorf("failed to read request body: %w", err)
}
httpReq.Body.Close()
}
// Determine if we should sample this query
tenants, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, fmt.Errorf("failed to extract tenant IDs: %w", err)
}
shouldSample := false
if h.goldfishManager != nil {
for _, tenant := range tenants {
if h.goldfishManager.ShouldSample(tenant) {
shouldSample = true
level.Debug(h.logger).Log(
"msg", "Goldfish sampling decision",
"tenant", tenant,
"sampled", shouldSample,
"path", httpReq.URL.Path)
break
}
}
}
results := make(chan *backendResult, len(h.backends))
for i, backend := range h.backends {
go func(_ int, b *ProxyBackend) {
result := h.executeBackendRequest(ctx, httpReq, body, b, req)
// ensure a valid status code is set in case of error
if result.err != nil && result.backendResp.status == 0 {
result.backendResp.status = statusCodeFromError(result.err)
}
results <- result
// Record metrics
h.recordMetrics(result, httpReq.Method, issuer)
}(i, backend)
}
collected := make([]*backendResult, 0, len(h.backends))
for i := 0; i < len(h.backends); i++ {
result := <-results
collected = append(collected, result)
// Race mode: return first successful response from ANY backend
// TODO: move race logic to a separate function, and catch the condition in Do and fan out to two different functions
// rather than have it all nested in one.
if h.enableRace {
// Check if this is the first successful result (race winner)
if result.err == nil && result.backendResp.succeeded() {
// Record race win metric
h.metrics.raceWins.WithLabelValues(
result.backend.name,
h.routeName,
).Inc()
// Spawn goroutine to collect remaining responses
remaining := len(h.backends) - i - 1
go func() {
h.collectRemainingAndCompare(remaining, httpReq, results, collected, shouldSample)
}()
return result.response, nil
}
} else {
// Non-race mode: existing logic (wait for preferred)
if result.backend.preferred {
// when the preferred backend fails (5xx or request error) we return any successful backend response
if !result.backendResp.succeeded() {
continue
}
// spawn goroutine to capture remaining and do goldfish comparison
remaining := len(h.backends) - i - 1
go func() {
h.collectRemainingAndCompare(remaining, httpReq, results, collected, shouldSample)
}()
// when the preferred backends succeeds, but with error, that indicates an invalid request
// return the error to the client
if result.err != nil {
return &NonDecodableResponse{
StatusCode: result.backendResp.status,
Body: result.backendResp.body,
}, result.err
}
return result.response, nil
}
}
}
// if we get here, no preferred backend was found or the preferred backend
// failed. in this case, return any successful response or an error if all failed
for _, result := range collected {
if result.err == nil && result.backendResp.succeeded() {
return result.response, nil
}
}
if len(collected) > 0 && collected[0].err != nil {
result := collected[0]
return &NonDecodableResponse{
StatusCode: result.backendResp.status,
Body: result.backendResp.body,
}, result.err
}
return nil, fmt.Errorf("all backends failed")
}
// collectRemainingAndCompare collects remaining backend results, performs comparisons,
// and processes goldfish sampling. Should be called asynchronously to not block preferred response from returning.
func (h *FanOutHandler) collectRemainingAndCompare(remaining int, httpReq *http.Request, results <-chan *backendResult, collected []*backendResult, shouldSample bool) {
issuer := detectIssuer(httpReq)
for range remaining {
r := <-results
collected = append(collected, r)
}
var preferredResult *backendResult
for _, r := range collected {
if r.backend.preferred {
preferredResult = r
break
}
}
for _, r := range collected {
if h.comparator != nil && !r.backend.preferred {
result := comparisonSuccess
summary, err := h.compareResponses(preferredResult.backendResp, r.backendResp, time.Now().UTC())
if err != nil {
level.Error(h.logger).Log("msg", "response comparison failed",
"backend-name", r.backend.name,
"route-name", h.routeName,
"query", httpReq.URL.RawQuery,
"err", err)
result = comparisonFailed
} else if summary != nil && summary.Skipped {
result = comparisonSkipped
}
if h.instrumentCompares && summary != nil {
h.metrics.missingMetrics.WithLabelValues(
r.backend.name, h.routeName, result, issuer,
).Observe(float64(summary.MissingMetrics))
}
h.metrics.responsesComparedTotal.WithLabelValues(
r.backend.name, h.routeName, result, issuer,
).Inc()
}
}
if shouldSample {
h.processGoldfishComparison(httpReq, preferredResult, collected)
}
}
func (h *FanOutHandler) compareResponses(expectedResponse, actualResponse *BackendResponse, queryEvalTime time.Time) (*comparator.ComparisonSummary, error) {
if expectedResponse.err != nil {
return &comparator.ComparisonSummary{Skipped: true}, nil
}
if actualResponse.err != nil {
return nil, fmt.Errorf("skipped comparison of response because the request to the secondary backend failed: %w", actualResponse.err)
}
// compare response body only if we get a 200
if expectedResponse.status != 200 {
return &comparator.ComparisonSummary{Skipped: true}, nil
}
if actualResponse.status != 200 {
return nil, fmt.Errorf("skipped comparison of response because we got status code %d from secondary backend's response", actualResponse.status)
}
if expectedResponse.status != actualResponse.status {
return nil, fmt.Errorf("expected status code %d but got %d", expectedResponse.status, actualResponse.status)
}
return h.comparator.Compare(expectedResponse.body, actualResponse.body, queryEvalTime)
}
// executeBackendRequest executes a request to a single backend.
func (h *FanOutHandler) executeBackendRequest(
ctx context.Context,
httpReq *http.Request,
body []byte,
backend *ProxyBackend,
req queryrangebase.Request,
) *backendResult {
start := time.Now()
// Clone the request for this backend
clonedReq := httpReq.Clone(ctx)
if len(body) > 0 {
clonedReq.Body = io.NopCloser(bytes.NewReader(body))
}
// Check filter
if backend.filter != nil && !backend.filter.Match([]byte(httpReq.URL.String())) {
level.Debug(h.logger).Log(
"msg", "Skipping backend due to filter",
"backend", backend.name,
"path", httpReq.URL.Path,
)
return &backendResult{
backend: backend,
backendResp: &BackendResponse{
duration: time.Since(start),
},
err: fmt.Errorf("filtered out"),
}
}
// Forward request to backend
backendResp := backend.ForwardRequest(clonedReq, clonedReq.Body)
result := &backendResult{
backend: backend,
backendResp: backendResp,
}
if backendResp.err != nil {
result.err = backendResp.err
return result
}
// Decode the response
httpResp := &http.Response{
StatusCode: backendResp.status,
Header: backendResp.headers,
Body: io.NopCloser(bytes.NewReader(backendResp.body)),
}
response, err := h.codec.DecodeResponse(ctx, httpResp, req)
if err != nil {
result.err = fmt.Errorf("failed to decode response from %s: %w", backend.name, err)
return result
}
result.response = response
return result
}
// recordMetrics records request duration metrics.
func (h *FanOutHandler) recordMetrics(result *backendResult, method, issuer string) {
if h.metrics == nil {
return
}
h.metrics.responsesTotal.WithLabelValues(
result.backend.name,
method,
h.routeName,
issuer,
).Inc()
h.metrics.requestDuration.WithLabelValues(
result.backend.name,
method,
h.routeName,
strconv.FormatInt(int64(result.backendResp.status), 10),
issuer,
).Observe(result.backendResp.duration.Seconds())
}
// processGoldfishComparison processes responses for goldfish comparison.
func (h *FanOutHandler) processGoldfishComparison(httpReq *http.Request, preferredResult *backendResult, results []*backendResult) {
if h.goldfishManager == nil || len(results) < 2 || preferredResult == nil {
return
}
tenantID, _, _ := tenant.ExtractTenantIDFromHTTPRequest(httpReq)
// Find preferred and non-preferred responses
for _, r := range results {
if r.err != nil || r.backend.preferred {
continue
}
level.Info(h.logger).Log("msg", "processing responses with Goldfish",
"tenant", tenantID,
"query", httpReq.URL.Query().Get("query"),
"cellA_backend", preferredResult.backend.name,
"cellA_status", preferredResult.backendResp.status,
"cellB_backend", r.backend.name,
"cellB_status", r.backendResp.status,
)
// Re-encode responses to capture body bytes for goldfish
h.sendToGoldfish(httpReq, preferredResult, r)
}
}
// sendToGoldfish sends the responses to goldfish for comparison.
func (h *FanOutHandler) sendToGoldfish(httpReq *http.Request, cellA, cellB *backendResult) {
cellAResp := &goldfish.BackendResponse{
BackendName: cellA.backend.name,
Status: cellA.backendResp.status,
Body: cellA.backendResp.body,
Duration: cellA.backendResp.duration,
TraceID: cellA.backendResp.traceID,
SpanID: cellA.backendResp.spanID,
}
cellBResp := &goldfish.BackendResponse{
BackendName: cellB.backend.name,
Status: cellB.backendResp.status,
Body: cellB.backendResp.body,
Duration: cellB.backendResp.duration,
TraceID: cellB.backendResp.traceID,
SpanID: cellB.backendResp.spanID,
}
h.goldfishManager.SendToGoldfish(httpReq, cellAResp, cellBResp)
}
// WithMetrics sets metrics for the handler.
func (h *FanOutHandler) WithMetrics(metrics *ProxyMetrics) *FanOutHandler {
h.metrics = metrics
return h
}
// WithComparator sets the response comparator.
func (h *FanOutHandler) WithComparator(comparator comparator.ResponsesComparator) *FanOutHandler {
h.comparator = comparator
return h
}
func extractOriginalHeaders(ctx context.Context) http.Header {
if headers, ok := ctx.Value(originalHTTPHeadersKey).(http.Header); ok {
return headers
}
return nil
}
func copyHeaders(from, to http.Header) {
for key, values := range from {
for _, value := range values {
to.Add(key, value)
}
}
}
// statusCodeFromError determines the appropriate HTTP status code when a backend request fails.
func statusCodeFromError(err error) int {
if errors.Is(err, context.Canceled) {
return 499 // Client Closed Request
}
if errors.Is(err, context.DeadlineExceeded) {
return http.StatusGatewayTimeout // 504
}
return http.StatusInternalServerError
}