Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/querytee/splitting_handler_test.go

921 lines
32 KiB

package querytee
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"net/url"
"sync"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/querytee/goldfish"
"github.com/grafana/loki/v3/pkg/util/constants"
)
// mockGoldfishManager is a test double for goldfish.Manager.
//
// ShouldSample always answers with the pre-configured shouldSampleResult and
// correlationIDResult. SendToGoldfish only records that it was invoked and the
// correlation ID it received. Because SendToGoldfish may run on goroutines other
// than the test goroutine, that recorded state is guarded by mu and should be
// read through getSendState.
type mockGoldfishManager struct {
	// Fixed answers returned by ShouldSample; set once when the mock is built.
	shouldSampleResult  bool
	correlationIDResult string

	// mu guards the two fields below, which SendToGoldfish writes.
	mu                sync.Mutex
	sendCalled        bool
	sendCorrelationID string
}

// ShouldSample ignores the tenant and returns the configured sampling decision
// together with the configured correlation ID.
func (m *mockGoldfishManager) ShouldSample(_ string) (bool, string) {
	sample, id := m.shouldSampleResult, m.correlationIDResult
	return sample, id
}

// SendToGoldfish records that it was called and remembers correlationID; the
// request and both backend responses are ignored.
func (m *mockGoldfishManager) SendToGoldfish(_ *http.Request, _, _ *goldfish.BackendResponse, correlationID string) {
	m.mu.Lock()
	m.sendCalled, m.sendCorrelationID = true, correlationID
	m.mu.Unlock()
}

// Close is a no-op that always reports success.
func (m *mockGoldfishManager) Close() error { return nil }

// getSendState reports, under the lock, whether SendToGoldfish has been called
// and the correlation ID passed to its most recent call.
func (m *mockGoldfishManager) getSendState() (called bool, correlationID string) {
	m.mu.Lock()
	called, correlationID = m.sendCalled, m.sendCorrelationID
	m.mu.Unlock()
	return called, correlationID
}
// TestSplittingHandler_ServeSplits_UnsupportedRequestUsesDefaultHandler verifies
// that unsupported request types (instant, series and label-values requests) are
// served by the default handler, which forwards them to the V1 backend with the
// tenant header intact, and that the fan-out handler is never invoked for them.
func TestSplittingHandler_ServeSplits_UnsupportedRequestUsesDefaultHandler(t *testing.T) {
	tests := []struct {
		name    string
		request queryrangebase.Request
	}{
		{
			name: "LokiInstantRequest",
			request: &queryrange.LokiInstantRequest{
				Query:  `{app="test"}`,
				TimeTs: time.Now(),
				Limit:  100,
				Path:   constants.PathLokiQuery,
				Shards: nil,
			},
		},
		{
			name: "LokiSeriesRequest",
			request: &queryrange.LokiSeriesRequest{
				Match:   []string{`{app="test"}`},
				StartTs: time.Now().Add(-1 * time.Hour),
				EndTs:   time.Now(),
				Path:    constants.PathLokiSeries,
				Shards:  nil,
			},
		},
		{
			name: "LabelRequest",
			request: queryrange.NewLabelRequest(
				time.Now().Add(-1*time.Hour),
				time.Now(),
				"",
				"app",
				constants.PathLokiLabel+"/app/values",
			),
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var capturedTenantID string
			defaultHandlerCalled := false
			fanOutHandlerCalled := false
			backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				defaultHandlerCalled = true
				capturedTenantID = r.Header.Get(user.OrgIDHeaderName)
				w.Header().Set("Content-Type", "application/json")
				w.WriteHeader(http.StatusOK)
				var response string
				switch r.URL.Path {
				case constants.PathLokiQuery:
					// LokiInstantRequest response
					response = `{"status":"success","data":{"resultType":"streams","result":[]}}`
				case constants.PathLokiSeries:
					// LokiSeriesRequest response
					response = `{"status":"success","data":[]}`
				case constants.PathLokiLabel + "/app/values", constants.PathLokiLabels:
					// LabelRequest response
					response = `{"status":"success","data":[]}`
				default:
					response = `{"status":"success"}`
				}
				// This handler runs on the test server's goroutine, not the test
				// goroutine. require (which calls t.FailNow) must not be used
				// here, but t.Errorf is safe to call from any goroutine.
				if _, err := w.Write([]byte(response)); err != nil {
					t.Errorf("writing fake backend response: %v", err)
				}
			}))
			defer backend.Close()
			backendURL, err := url.Parse(backend.URL)
			require.NoError(t, err)
			preferredBackend, err := NewProxyBackend("preferred", backendURL, 5*time.Second, true, false)
			require.NoError(t, err)
			mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
				fanOutHandlerCalled = true
				return nil, nil
			})
			goldfishManager := &mockGoldfishManager{}
			handler, err := NewSplittingHandler(SplittingHandlerConfig{
				Codec:                     queryrange.DefaultCodec,
				FanOutHandler:             mockFanOutHandler,
				GoldfishManager:           goldfishManager,
				V1Backend:                 preferredBackend,
				SkipFanoutWhenNotSampling: false,
				RoutingMode:               RoutingModeV1Preferred,
				SplitStart:                time.Time{},
				SplitLag:                  1 * time.Hour,
			}, log.NewNopLogger())
			require.NoError(t, err)
			ctx := user.InjectOrgID(context.Background(), "test-tenant")
			httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, tt.request)
			require.NoError(t, err)
			// Capture the response written by the handler.
			recorder := httptest.NewRecorder()
			handler.ServeHTTP(recorder, httpReq)
			require.Equal(t, http.StatusOK, recorder.Code)
			// The default handler (not the logs/metrics split path) must serve the request.
			require.True(t, defaultHandlerCalled, "expected default handler to be called for unsupported request type")
			require.False(t, fanOutHandlerCalled, "expected fan-out handler NOT to be called for unsupported request type")
			require.Equal(t, "test-tenant", capturedTenantID, "expected tenant ID to be passed to default handler")
		})
	}
}
// TestSplittingHandler_NilPreferredBackend_CallsFanoutHandler verifies that when
// no V1 backend is configured, a range query is served by the fan-out handler and
// the tenant ID injected into the request reaches that handler's context.
func TestSplittingHandler_NilPreferredBackend_CallsFanoutHandler(t *testing.T) {
	var (
		sawFanOut   bool
		sawTenantID string
	)
	fanOut := queryrangebase.HandlerFunc(
		func(ctx context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
			sawFanOut = true
			if id, extractErr := user.ExtractOrgID(ctx); extractErr == nil {
				sawTenantID = id
			}
			return &queryrange.LokiResponse{
				Status: "success",
				Data:   queryrange.LokiData{ResultType: "streams"},
			}, nil
		})

	cfg := SplittingHandlerConfig{
		Codec:                     queryrange.DefaultCodec,
		FanOutHandler:             fanOut,
		GoldfishManager:           nil,
		V1Backend:                 nil, // deliberately no preferred backend
		SkipFanoutWhenNotSampling: false,
		RoutingMode:               RoutingModeV1Preferred,
		SplitStart:                time.Time{},
		SplitLag:                  0,
	}
	handler, err := NewSplittingHandler(cfg, log.NewNopLogger())
	require.NoError(t, err)

	rangeReq := &queryrange.LokiRequest{
		Query:   `{app="test"}`,
		StartTs: time.Now().Add(-1 * time.Hour),
		EndTs:   time.Now(),
		Step:    60000, // 1m, expressed in milliseconds
		Limit:   100,
		Path:    constants.PathLokiQueryRange,
	}
	tenantCtx := user.InjectOrgID(context.Background(), "test-tenant")
	httpReq, err := queryrange.DefaultCodec.EncodeRequest(tenantCtx, rangeReq)
	require.NoError(t, err)

	rec := httptest.NewRecorder()
	handler.ServeHTTP(rec, httpReq)

	require.Equal(t, http.StatusOK, rec.Code)
	require.True(t, sawFanOut, "expected fanout handler to be called when preferred backend is nil")
	require.Equal(t, "test-tenant", sawTenantID, "expected tenant ID to be passed to fanout handler")
}
// TestSplittingHandler_RoutingModeV1Preferred_SkipsToDefaultWhenNotSampling
// verifies that v1-preferred mode, with SkipFanoutWhenNotSampling enabled and a
// goldfish manager that declines to sample, serves a splittable range query
// entirely through the default handler and never invokes the fan-out handler.
func TestSplittingHandler_RoutingModeV1Preferred_SkipsToDefaultWhenNotSampling(t *testing.T) {
	var backendHit, fanOutHit bool

	v1Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		backendHit = true
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
	}))
	defer v1Server.Close()

	v1URL, err := url.Parse(v1Server.URL)
	require.NoError(t, err)
	v1, err := NewProxyBackend("preferred", v1URL, 5*time.Second, true, false)
	require.NoError(t, err)

	fanOut := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
		fanOutHit = true
		return &queryrange.LokiResponse{
			Status: "success",
			Data:   queryrange.LokiData{ResultType: "streams"},
		}, nil
	})

	handler, err := NewSplittingHandler(SplittingHandlerConfig{
		Codec:                     queryrange.DefaultCodec,
		FanOutHandler:             fanOut,
		GoldfishManager:           &mockGoldfishManager{shouldSampleResult: false}, // never samples
		V1Backend:                 v1,
		SkipFanoutWhenNotSampling: true, // skip fan-out whenever not sampling
		RoutingMode:               RoutingModeV1Preferred,
		SplitStart:                time.Time{},
		SplitLag:                  time.Hour,
	}, log.NewNopLogger())
	require.NoError(t, err)

	// A v2-engine-supported metric query spanning 2h, i.e. across the 1h lag.
	now := time.Now()
	const query = `sum(rate({app="test"}[5m]))`
	parsed, err := syntax.ParseExpr(query)
	require.NoError(t, err)
	rangeReq := &queryrange.LokiRequest{
		Query:   query,
		StartTs: now.Add(-2 * time.Hour),
		EndTs:   now,
		Step:    60000,
		Limit:   100,
		Path:    constants.PathLokiQueryRange,
		Plan:    &plan.QueryPlan{AST: parsed},
	}

	tenantCtx := user.InjectOrgID(context.Background(), "test-tenant")
	httpReq, err := queryrange.DefaultCodec.EncodeRequest(tenantCtx, rangeReq)
	require.NoError(t, err)

	rec := httptest.NewRecorder()
	handler.ServeHTTP(rec, httpReq)

	require.Equal(t, http.StatusOK, rec.Code)
	require.True(t, backendHit, "v1-preferred mode should skip to default handler when not sampling")
	require.False(t, fanOutHit, "v1-preferred mode should NOT call fanout handler when not sampling")
}
// TestSplittingHandler_AlwaysSplitsEvenWhenNotSampling verifies that in the
// v2-preferred and race routing modes, a range query that crosses the split lag
// is still split between the default handler and the fan-out handler. This holds
// even though the goldfish manager declines to sample and
// SkipFanoutWhenNotSampling is set, because that flag is not meant to apply in
// these two modes.
func TestSplittingHandler_AlwaysSplitsEvenWhenNotSampling(t *testing.T) {
	for _, tc := range []struct {
		name        string
		routingMode RoutingMode
	}{
		{name: "v2-preferred mode", routingMode: RoutingModeV2Preferred},
		{name: "race mode", routingMode: RoutingModeRace},
	} {
		t.Run(tc.name, func(t *testing.T) {
			var backendHit, fanOutHit bool

			fanOut := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
				fanOutHit = true
				return &queryrange.LokiResponse{
					Status: "success",
					Data:   queryrange.LokiData{ResultType: "streams"},
				}, nil
			})

			v1Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
				backendHit = true
				w.Header().Set("Content-Type", "application/json")
				w.WriteHeader(http.StatusOK)
				_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
			}))
			defer v1Server.Close()

			v1URL, err := url.Parse(v1Server.URL)
			require.NoError(t, err)
			// NOTE(review): the other tests construct their V1 backend with the
			// fourth argument set to true; confirm false is intentional here.
			v1, err := NewProxyBackend("preferred", v1URL, 5*time.Second, false, false)
			require.NoError(t, err)

			handler, err := NewSplittingHandler(SplittingHandlerConfig{
				Codec:                     queryrange.DefaultCodec,
				FanOutHandler:             fanOut,
				GoldfishManager:           &mockGoldfishManager{shouldSampleResult: false}, // never samples
				V1Backend:                 v1,
				SkipFanoutWhenNotSampling: true, // set, but must not apply in these two modes
				RoutingMode:               tc.routingMode,
				SplitStart:                time.Time{},
				SplitLag:                  time.Hour,
			}, log.NewNopLogger())
			require.NoError(t, err)

			now := time.Now()
			const query = `sum(rate({app="test"}[5m]))`
			parsed, err := syntax.ParseExpr(query)
			require.NoError(t, err)
			rangeReq := &queryrange.LokiRequest{
				Query:   query,
				StartTs: now.Add(-2 * time.Hour), // before the lag window
				EndTs:   now,                     // inside the lag window
				Step:    60000,                   // 1m in ms; metric queries need a step
				Limit:   100,
				Path:    constants.PathLokiQueryRange,
				Plan:    &plan.QueryPlan{AST: parsed},
			}

			tenantCtx := user.InjectOrgID(context.Background(), "test-tenant")
			httpReq, err := queryrange.DefaultCodec.EncodeRequest(tenantCtx, rangeReq)
			require.NoError(t, err)

			rec := httptest.NewRecorder()
			handler.ServeHTTP(rec, httpReq)

			require.Equal(t, http.StatusOK, rec.Code)
			require.True(t, backendHit, "%s should call handler even when not sampling", tc.name)
			require.True(t, fanOutHit, "fanout handler should be called for post-lag data even when not sampling")
		})
	}
}
// TestSplittingHandler_NoSplitLag_UsesFanoutHandler verifies that with SplitLag
// set to zero, the splitting handler passes the request straight to the fan-out
// handler and never calls the default (V1) backend.
func TestSplittingHandler_NoSplitLag_UsesFanoutHandler(t *testing.T) {
	var backendHit, fanOutHit bool

	fanOut := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
		fanOutHit = true
		return &queryrange.LokiResponse{
			Status: "success",
			Data:   queryrange.LokiData{ResultType: "streams"},
		}, nil
	})

	v1Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		backendHit = true
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
	}))
	defer v1Server.Close()

	v1URL, err := url.Parse(v1Server.URL)
	require.NoError(t, err)
	v1, err := NewProxyBackend("preferred", v1URL, 5*time.Second, true, false)
	require.NoError(t, err)

	sampler := &mockGoldfishManager{
		shouldSampleResult:  true, // sampling on
		correlationIDResult: "test-correlation-id",
	}
	handler, err := NewSplittingHandler(SplittingHandlerConfig{
		Codec:                     queryrange.DefaultCodec,
		FanOutHandler:             fanOut,
		GoldfishManager:           sampler,
		V1Backend:                 v1,
		SkipFanoutWhenNotSampling: false,
		RoutingMode:               RoutingModeRace,
		SplitStart:                time.Time{},
		SplitLag:                  0, // zero lag: expect fan-out only, no split
	}, log.NewNopLogger())
	require.NoError(t, err)

	instantReq := &queryrange.LokiInstantRequest{
		Query:  `{app="test"}`,
		TimeTs: time.Now(),
		Limit:  100,
		Path:   constants.PathLokiQuery,
	}
	tenantCtx := user.InjectOrgID(context.Background(), "test-tenant")
	httpReq, err := queryrange.DefaultCodec.EncodeRequest(tenantCtx, instantReq)
	require.NoError(t, err)

	rec := httptest.NewRecorder()
	handler.ServeHTTP(rec, httpReq)

	require.Equal(t, http.StatusOK, rec.Code)
	require.True(t, fanOutHit, "when splitLag is 0, should use fanout handler directly")
	require.False(t, backendHit, "when splitLag is 0, should NOT use default handler")
}
// TestSplittingHandler_V1Preferred_SplitsWhenSampling verifies that v1-preferred
// mode, even with SkipFanoutWhenNotSampling enabled, splits a range query across
// the default handler and the fan-out handler when the goldfish manager samples it.
func TestSplittingHandler_V1Preferred_SplitsWhenSampling(t *testing.T) {
	var backendHit, fanOutHit bool

	fanOut := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
		fanOutHit = true
		return &queryrange.LokiResponse{
			Status: "success",
			Data:   queryrange.LokiData{ResultType: "streams"},
		}, nil
	})

	v1Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		backendHit = true
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
	}))
	defer v1Server.Close()

	v1URL, err := url.Parse(v1Server.URL)
	require.NoError(t, err)
	v1, err := NewProxyBackend("preferred", v1URL, 5*time.Second, true, false)
	require.NoError(t, err)

	handler, err := NewSplittingHandler(SplittingHandlerConfig{
		Codec:         queryrange.DefaultCodec,
		FanOutHandler: fanOut,
		GoldfishManager: &mockGoldfishManager{
			shouldSampleResult:  true, // this request is sampled
			correlationIDResult: "test-correlation-id",
		},
		V1Backend:                 v1,
		SkipFanoutWhenNotSampling: true,
		RoutingMode:               RoutingModeV1Preferred,
		SplitStart:                time.Time{},
		SplitLag:                  time.Hour,
	}, log.NewNopLogger())
	require.NoError(t, err)

	// A v2-engine-supported metric query that goes through the splitting logic:
	// pre-lag data is expected at the default handler, post-lag data at fan-out.
	now := time.Now()
	const query = `sum(rate({app="test"}[5m]))`
	parsed, err := syntax.ParseExpr(query)
	require.NoError(t, err)
	rangeReq := &queryrange.LokiRequest{
		Query:   query,
		StartTs: now.Add(-2 * time.Hour),
		EndTs:   now,
		Step:    60000,
		Limit:   100,
		Path:    constants.PathLokiQueryRange,
		Plan:    &plan.QueryPlan{AST: parsed},
	}

	tenantCtx := user.InjectOrgID(context.Background(), "test-tenant")
	httpReq, err := queryrange.DefaultCodec.EncodeRequest(tenantCtx, rangeReq)
	require.NoError(t, err)

	rec := httptest.NewRecorder()
	handler.ServeHTTP(rec, httpReq)

	require.Equal(t, http.StatusOK, rec.Code)
	require.True(t, backendHit, "v1-preferred mode should split when sampling is enabled")
	require.True(t, fanOutHit, "fanout handler should be called for post-lag data when sampling")
}
// TestSplittingHandler_SkipFanoutDisabled_AlwaysSplits verifies that with
// SkipFanoutWhenNotSampling disabled, every routing mode splits a range query
// between the default and fan-out handlers, even when the request is not sampled.
func TestSplittingHandler_SkipFanoutDisabled_AlwaysSplits(t *testing.T) {
	modes := []RoutingMode{RoutingModeV1Preferred, RoutingModeV2Preferred, RoutingModeRace}
	for _, mode := range modes {
		t.Run(string(mode), func(t *testing.T) {
			var backendHit, fanOutHit bool

			fanOut := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
				fanOutHit = true
				return &queryrange.LokiResponse{
					Status: "success",
					Data:   queryrange.LokiData{ResultType: "streams"},
				}, nil
			})

			v1Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
				backendHit = true
				w.Header().Set("Content-Type", "application/json")
				w.WriteHeader(http.StatusOK)
				_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
			}))
			defer v1Server.Close()

			v1URL, err := url.Parse(v1Server.URL)
			require.NoError(t, err)
			v1, err := NewProxyBackend("preferred", v1URL, 5*time.Second, true, false)
			require.NoError(t, err)

			handler, err := NewSplittingHandler(SplittingHandlerConfig{
				Codec:                     queryrange.DefaultCodec,
				FanOutHandler:             fanOut,
				GoldfishManager:           &mockGoldfishManager{shouldSampleResult: false}, // never samples
				V1Backend:                 v1,
				SkipFanoutWhenNotSampling: false, // disabled: splitting must always happen
				RoutingMode:               mode,
				SplitStart:                time.Time{},
				SplitLag:                  time.Hour,
			}, log.NewNopLogger())
			require.NoError(t, err)

			// A v2-engine-supported metric query spanning the 1h lag.
			now := time.Now()
			const query = `sum(rate({app="test"}[5m]))`
			parsed, err := syntax.ParseExpr(query)
			require.NoError(t, err)
			rangeReq := &queryrange.LokiRequest{
				Query:   query,
				StartTs: now.Add(-2 * time.Hour),
				EndTs:   now,
				Step:    60000,
				Limit:   100,
				Path:    constants.PathLokiQueryRange,
				Plan:    &plan.QueryPlan{AST: parsed},
			}

			tenantCtx := user.InjectOrgID(context.Background(), "test-tenant")
			httpReq, err := queryrange.DefaultCodec.EncodeRequest(tenantCtx, rangeReq)
			require.NoError(t, err)

			rec := httptest.NewRecorder()
			handler.ServeHTTP(rec, httpReq)

			require.Equal(t, http.StatusOK, rec.Code)
			require.True(t, backendHit)
			require.True(t, fanOutHit)
		})
	}
}
// TestSplittingHandler_ReadsPath_GoldfishCorrelationIDHeader verifies that a
// read request's response carries the goldfish correlation ID header exactly
// when the goldfish manager decides to sample the request.
func TestSplittingHandler_ReadsPath_GoldfishCorrelationIDHeader(t *testing.T) {
	cases := []struct {
		name           string
		shouldSample   bool
		correlationID  string
		expectHeader   bool
		expectedHeader string
	}{
		{
			name:           "sampled read sets goldfish header",
			shouldSample:   true,
			correlationID:  "test-uuid",
			expectHeader:   true,
			expectedHeader: "test-uuid",
		},
		{
			name:          "non-sampled read omits goldfish header",
			shouldSample:  false,
			correlationID: "",
			expectHeader:  false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// The header is written by SplittingHandler.writeResponse, which is
			// only reached when V1Backend is non-nil. With a nil V1Backend,
			// NewSplittingHandler returns a plain fan-out wrapper instead, so a
			// real V1 server is needed to exercise that code path.
			v1Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
				w.Header().Set("Content-Type", "application/json")
				w.WriteHeader(http.StatusOK)
				_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
			}))
			defer v1Server.Close()

			v1URL, err := url.Parse(v1Server.URL)
			require.NoError(t, err)
			v1, err := NewProxyBackend("v1", v1URL, 5*time.Second, true, false)
			require.NoError(t, err)

			fanOut := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
				return &queryrange.LokiResponse{
					Status: "success",
					Data:   queryrange.LokiData{ResultType: "streams"},
				}, nil
			})

			handler, err := NewSplittingHandler(SplittingHandlerConfig{
				Codec:         queryrange.DefaultCodec,
				FanOutHandler: fanOut,
				GoldfishManager: &mockGoldfishManager{
					shouldSampleResult:  tc.shouldSample,
					correlationIDResult: tc.correlationID,
				},
				V1Backend:                 v1,
				SkipFanoutWhenNotSampling: false,
				RoutingMode:               RoutingModeV1Preferred,
				SplitStart:                time.Time{},
				SplitLag:                  0, // no split: straight to the fan-out handler
			}, log.NewNopLogger())
			require.NoError(t, err)

			instantReq := &queryrange.LokiInstantRequest{
				Query:  `{app="test"}`,
				TimeTs: time.Now(),
				Limit:  100,
				Path:   constants.PathLokiQuery,
			}
			tenantCtx := user.InjectOrgID(context.Background(), "test-tenant")
			httpReq, err := queryrange.DefaultCodec.EncodeRequest(tenantCtx, instantReq)
			require.NoError(t, err)

			rec := httptest.NewRecorder()
			handler.ServeHTTP(rec, httpReq)
			require.Equal(t, http.StatusOK, rec.Code)

			got := rec.Header().Get(goldfish.GoldfishCorrelationIDHeader)
			if !tc.expectHeader {
				require.Empty(t, got, "expected goldfish header to be absent")
				return
			}
			require.Equal(t, tc.expectedHeader, got, "expected goldfish header to be set")
		})
	}
}
// TestSplittingHandler_ReadsPath_GoldfishHeaderOnErrorResponse tests that the goldfish correlation ID
// header survives error responses on the reads path. When the fanout handler returns an error,
// writeResponse should still include the header so clients can correlate the request.
func TestSplittingHandler_ReadsPath_GoldfishHeaderOnErrorResponse(t *testing.T) {
// The fanout handler returns an error to simulate a backend failure
mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
return nil, errors.New("simulated backend failure")
})
v1Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
}))
defer v1Server.Close()
v1BackendURL, err := url.Parse(v1Server.URL)
require.NoError(t, err)
v1Backend, err := NewProxyBackend("v1", v1BackendURL, 5*time.Second, true, false)
require.NoError(t, err)
goldfishManager := &mockGoldfishManager{
shouldSampleResult: true,
correlationIDResult: "error-test-uuid",
}
handler, err := NewSplittingHandler(SplittingHandlerConfig{
Codec: queryrange.DefaultCodec,
FanOutHandler: mockFanOutHandler,
GoldfishManager: goldfishManager,
V1Backend: v1Backend,
SkipFanoutWhenNotSampling: false,
RoutingMode: RoutingModeV1Preferred,
SplitStart: time.Time{},
SplitLag: 0, // no split: goes directly to fanOutHandler
}, log.NewNopLogger())
require.NoError(t, err)
lokiReq := &queryrange.LokiInstantRequest{
Query: `{app="test"}`,
TimeTs: time.Now(),
Limit: 100,
Path: constants.PathLokiQuery,
}
ctx := user.InjectOrgID(context.Background(), "test-tenant")
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
require.NoError(t, err)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, httpReq)
// The response should be an error (500)
require.Equal(t, http.StatusInternalServerError, recorder.Code, "handler error should result in 500 status")
// The goldfish header should still be present despite the error
require.Equal(t, "error-test-uuid", recorder.Header().Get(goldfish.GoldfishCorrelationIDHeader),
"goldfish header must survive error responses so clients can correlate sampled requests")
}
// TestSplittingHandler_CorrelationIDConsistency verifies end to end, through a
// ProxyEndpoint, that the correlation ID returned in the response header for a
// sampled two-backend request is the same ID handed to SendToGoldfish.
func TestSplittingHandler_CorrelationIDConsistency(t *testing.T) {
	// One fake server stands in for both v1 (preferred) and v2 (non-preferred);
	// it always succeeds with a single stream.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[{"stream":{"app":"test"},"values":[["1000000000","line"]]}]}}`))
	}))
	defer server.Close()

	serverURL, err := url.Parse(server.URL)
	require.NoError(t, err)
	v1, err := NewProxyBackend("v1", serverURL, 5*time.Second, true, false)
	require.NoError(t, err)
	v2, err := NewProxyBackend("v2", serverURL, 5*time.Second, false, true)
	require.NoError(t, err)
	backends := []*ProxyBackend{v1, v2}

	sampler := &mockGoldfishManager{
		shouldSampleResult:  true,
		correlationIDResult: "consistency-test-uuid",
	}
	metrics := NewProxyMetrics(nil)
	logger := log.NewNopLogger()

	factory := NewHandlerFactory(HandlerFactoryConfig{
		Backends:        backends,
		Codec:           queryrange.DefaultCodec,
		GoldfishManager: sampler,
		Logger:          logger,
		Metrics:         metrics,
		RoutingMode:     RoutingModeV1Preferred,
		SplitLag:        0, // no split lag: fan out directly
	})
	endpoint := NewProxyEndpoint(backends, "test", metrics, logger, nil, false)
	queryHandler, err := factory.CreateHandler("test", nil)
	require.NoError(t, err)
	endpoint.WithQueryHandler(queryHandler)
	endpoint.WithGoldfish(sampler)

	instantReq := &queryrange.LokiInstantRequest{
		Query:  `{app="test"}`,
		TimeTs: time.Now(),
		Limit:  100,
		Path:   constants.PathLokiQuery,
	}
	tenantCtx := user.InjectOrgID(context.Background(), "test-tenant")
	httpReq, err := queryrange.DefaultCodec.EncodeRequest(tenantCtx, instantReq)
	require.NoError(t, err)

	rec := httptest.NewRecorder()
	endpoint.ServeHTTP(rec, httpReq)
	require.Equal(t, http.StatusOK, rec.Code)

	// Goldfish processing is asynchronous, so poll until SendToGoldfish has run.
	const sentMsg = "SendToGoldfish should have been called for a 2-backend sampled request"
	require.Eventually(t, func() bool {
		called, _ := sampler.getSendState()
		return called
	}, 5*time.Second, 10*time.Millisecond, sentMsg)

	headerID := rec.Header().Get(goldfish.GoldfishCorrelationIDHeader)
	require.Equal(t, "consistency-test-uuid", headerID,
		"header should contain the correlation ID")

	// The ID passed to SendToGoldfish must be the one exposed in the header.
	called, sentID := sampler.getSendState()
	require.True(t, called, sentMsg)
	require.Equal(t, headerID, sentID,
		"correlation ID in response header must match the ID passed to SendToGoldfish")
}
// TestSplittingHandler_MultiTenantQuery_RoutesToV1Only verifies that multi-tenant
// queries (X-Scope-OrgID: tenant1|tenant2) are routed exclusively to v1 in every
// routing mode, and that the combined tenant ID reaches v1 unchanged. This is
// required because v2 does not support multi-tenant queries.
func TestSplittingHandler_MultiTenantQuery_RoutesToV1Only(t *testing.T) {
	for _, routingMode := range []RoutingMode{RoutingModeV1Preferred, RoutingModeV2Preferred, RoutingModeRace} {
		t.Run(string(routingMode), func(t *testing.T) {
			v1BackendCalled := false
			fanOutHandlerCalled := false
			var capturedTenantID string
			// Fake v1 backend that records that it was called and the tenant
			// header it received. The header is read via the dskit constant
			// rather than a string literal, consistent with the rest of the file.
			v1Backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				v1BackendCalled = true
				capturedTenantID = r.Header.Get(user.OrgIDHeaderName)
				w.Header().Set("Content-Type", "application/json")
				w.WriteHeader(http.StatusOK)
				_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
			}))
			defer v1Backend.Close()
			v1BackendURL, err := url.Parse(v1Backend.URL)
			require.NoError(t, err)
			v1ProxyBackend, err := NewProxyBackend("v1", v1BackendURL, 5*time.Second, true, false)
			require.NoError(t, err)
			// Fan-out handler that must NOT be reached for multi-tenant queries.
			mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
				fanOutHandlerCalled = true
				return &queryrange.LokiResponse{
					Status: "success",
					Data:   queryrange.LokiData{ResultType: "streams"},
				}, nil
			})
			goldfishManager := &mockGoldfishManager{
				shouldSampleResult:  true, // sample, so routing cannot be skipping fan-out for lack of sampling
				correlationIDResult: "test-correlation-id",
			}
			handler, err := NewSplittingHandler(SplittingHandlerConfig{
				Codec:                     queryrange.DefaultCodec,
				FanOutHandler:             mockFanOutHandler,
				GoldfishManager:           goldfishManager,
				V1Backend:                 v1ProxyBackend,
				SkipFanoutWhenNotSampling: false,
				RoutingMode:               routingMode,
				SplitStart:                time.Time{},
				SplitLag:                  1 * time.Hour, // enable splitting
			}, log.NewNopLogger())
			require.NoError(t, err)
			// A range query that would otherwise go through splitting/fan-out.
			now := time.Now()
			query := `sum(rate({app="test"}[5m]))`
			expr, err := syntax.ParseExpr(query)
			require.NoError(t, err)
			lokiReq := &queryrange.LokiRequest{
				Query:   query,
				StartTs: now.Add(-2 * time.Hour),
				EndTs:   now,
				Step:    60000,
				Limit:   100,
				Path:    constants.PathLokiQueryRange,
				Plan: &plan.QueryPlan{
					AST: expr,
				},
			}
			// Inject a multi-tenant org ID (pipe-separated).
			ctx := user.InjectOrgID(context.Background(), "tenant1|tenant2")
			httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
			require.NoError(t, err)
			recorder := httptest.NewRecorder()
			handler.ServeHTTP(recorder, httpReq)
			require.Equal(t, http.StatusOK, recorder.Code)
			require.True(t, v1BackendCalled, "multi-tenant query should be routed to v1 backend in %s mode", routingMode)
			require.False(t, fanOutHandlerCalled, "multi-tenant query should NOT use fanout handler in %s mode (would route to v2)", routingMode)
			require.Equal(t, "tenant1|tenant2", capturedTenantID, "tenant ID should be preserved when routing to v1")
		})
	}
}