mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
921 lines
32 KiB
921 lines
32 KiB
package querytee
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/grafana/dskit/user"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/grafana/loki/v3/pkg/logql/syntax"
|
|
"github.com/grafana/loki/v3/pkg/querier/plan"
|
|
"github.com/grafana/loki/v3/pkg/querier/queryrange"
|
|
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
|
|
"github.com/grafana/loki/v3/pkg/querytee/goldfish"
|
|
"github.com/grafana/loki/v3/pkg/util/constants"
|
|
)
|
|
|
|
// mockGoldfishManager is a test double standing in for goldfish.Manager.
//
// The sampling fields are canned answers handed back by ShouldSample. The send
// fields record the most recent SendToGoldfish call; because that call may happen
// on another goroutine, they are only touched while holding mu.
type mockGoldfishManager struct {
	// Canned ShouldSample results, set when the mock is constructed.
	shouldSampleResult  bool
	correlationIDResult string

	// mu guards sendCalled and sendCorrelationID.
	mu                sync.Mutex
	sendCalled        bool
	sendCorrelationID string
}
|
|
|
|
func (m *mockGoldfishManager) ShouldSample(_ string) (bool, string) {
|
|
return m.shouldSampleResult, m.correlationIDResult
|
|
}
|
|
|
|
func (m *mockGoldfishManager) SendToGoldfish(_ *http.Request, _, _ *goldfish.BackendResponse, correlationID string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.sendCalled = true
|
|
m.sendCorrelationID = correlationID
|
|
}
|
|
|
|
func (m *mockGoldfishManager) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// getSendState returns the thread-safe state of SendToGoldfish calls.
|
|
func (m *mockGoldfishManager) getSendState() (bool, string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return m.sendCalled, m.sendCorrelationID
|
|
}
|
|
|
|
func TestSplittingHandler_ServeSplits_UnsupportedRequestUsesDefaultHandler(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
request queryrangebase.Request
|
|
}{
|
|
{
|
|
name: "LokiInstantRequest",
|
|
request: &queryrange.LokiInstantRequest{
|
|
Query: `{app="test"}`,
|
|
TimeTs: time.Now(),
|
|
Limit: 100,
|
|
Path: constants.PathLokiQuery,
|
|
Shards: nil,
|
|
},
|
|
},
|
|
{
|
|
name: "LokiSeriesRequest",
|
|
request: &queryrange.LokiSeriesRequest{
|
|
Match: []string{`{app="test"}`},
|
|
StartTs: time.Now().Add(-1 * time.Hour),
|
|
EndTs: time.Now(),
|
|
Path: constants.PathLokiSeries,
|
|
Shards: nil,
|
|
},
|
|
},
|
|
{
|
|
name: "LabelRequest",
|
|
request: queryrange.NewLabelRequest(
|
|
time.Now().Add(-1*time.Hour),
|
|
time.Now(),
|
|
"",
|
|
"app",
|
|
constants.PathLokiLabel+"/app/values",
|
|
),
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
var capturedTenantID string
|
|
defaultHandlerCalled := false
|
|
fanOutHandlerCalled := false
|
|
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
defaultHandlerCalled = true
|
|
capturedTenantID = r.Header.Get(user.OrgIDHeaderName)
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
var response string
|
|
switch r.URL.Path {
|
|
case constants.PathLokiQuery:
|
|
// LokiInstantRequest response
|
|
response = `{"status":"success","data":{"resultType":"streams","result":[]}}`
|
|
case constants.PathLokiSeries:
|
|
// LokiSeriesRequest response
|
|
response = `{"status":"success","data":[]}`
|
|
case constants.PathLokiLabel + "/app/values", constants.PathLokiLabels:
|
|
// LabelRequest response
|
|
response = `{"status":"success","data":[]}`
|
|
default:
|
|
response = `{"status":"success"}`
|
|
}
|
|
|
|
_, err := w.Write([]byte(response))
|
|
require.NoError(t, err)
|
|
}))
|
|
defer backend.Close()
|
|
|
|
backendURL, err := url.Parse(backend.URL)
|
|
require.NoError(t, err)
|
|
|
|
preferredBackend, err := NewProxyBackend("preferred", backendURL, 5*time.Second, true, false)
|
|
require.NoError(t, err)
|
|
mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
|
|
fanOutHandlerCalled = true
|
|
return nil, nil
|
|
})
|
|
|
|
goldfishManager := &mockGoldfishManager{}
|
|
|
|
handler, err := NewSplittingHandler(SplittingHandlerConfig{
|
|
Codec: queryrange.DefaultCodec,
|
|
FanOutHandler: mockFanOutHandler,
|
|
GoldfishManager: goldfishManager,
|
|
V1Backend: preferredBackend,
|
|
SkipFanoutWhenNotSampling: false,
|
|
RoutingMode: RoutingModeV1Preferred,
|
|
SplitStart: time.Time{},
|
|
SplitLag: 1 * time.Hour,
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
|
|
ctx := user.InjectOrgID(context.Background(), "test-tenant")
|
|
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, tt.request)
|
|
require.NoError(t, err)
|
|
|
|
// Create a response recorder to capture the response
|
|
recorder := httptest.NewRecorder()
|
|
|
|
// Call ServeHTTP with the HTTP request
|
|
handler.ServeHTTP(recorder, httpReq)
|
|
|
|
// Verify the response was successful
|
|
require.Equal(t, http.StatusOK, recorder.Code)
|
|
|
|
// Verify that the default handler was called (not logs/metrics handlers)
|
|
require.True(t, defaultHandlerCalled, "expected default handler to be called for unsupported request type")
|
|
require.False(t, fanOutHandlerCalled, "fan-out handler was not called for unsupported request type")
|
|
require.Equal(t, "test-tenant", capturedTenantID, "expected tenant ID to be passed to default handler")
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSplittingHandler_NilPreferredBackend_CallsFanoutHandler(t *testing.T) {
|
|
var capturedTenantID string
|
|
fanOutHandlerCalled := false
|
|
|
|
mockFanOutHandler := queryrangebase.HandlerFunc(
|
|
func(ctx context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
|
|
fanOutHandlerCalled = true
|
|
tenantID, err := user.ExtractOrgID(ctx)
|
|
if err == nil {
|
|
capturedTenantID = tenantID
|
|
}
|
|
|
|
return &queryrange.LokiResponse{
|
|
Status: "success",
|
|
Data: queryrange.LokiData{ResultType: "streams"},
|
|
}, nil
|
|
})
|
|
|
|
handler, err := NewSplittingHandler(SplittingHandlerConfig{
|
|
Codec: queryrange.DefaultCodec,
|
|
FanOutHandler: mockFanOutHandler,
|
|
GoldfishManager: nil,
|
|
V1Backend: nil, // nil preferred backend
|
|
SkipFanoutWhenNotSampling: false,
|
|
RoutingMode: RoutingModeV1Preferred,
|
|
SplitStart: time.Time{},
|
|
SplitLag: 0,
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
|
|
lokiReq := &queryrange.LokiRequest{
|
|
Query: `{app="test"}`,
|
|
StartTs: time.Now().Add(-1 * time.Hour),
|
|
EndTs: time.Now(),
|
|
Step: 60000, // 1 minute in milliseconds
|
|
Limit: 100,
|
|
Path: constants.PathLokiQueryRange,
|
|
}
|
|
|
|
ctx := user.InjectOrgID(context.Background(), "test-tenant")
|
|
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
|
|
require.NoError(t, err)
|
|
|
|
recorder := httptest.NewRecorder()
|
|
handler.ServeHTTP(recorder, httpReq)
|
|
|
|
require.Equal(t, http.StatusOK, recorder.Code)
|
|
require.True(t, fanOutHandlerCalled, "expected fanout handler to be called when preferred backend is nil")
|
|
require.Equal(t, "test-tenant", capturedTenantID, "expected tenant ID to be passed to fanout handler")
|
|
}
|
|
|
|
// TestSplittingHandler_RoutingModeV1Preferred_SkipsToDefaultWhenNotSampling tests that
|
|
// v1-preferred mode skips fanout and goes directly to the default handler when not sampling.
|
|
func TestSplittingHandler_RoutingModeV1Preferred_SkipsToDefaultWhenNotSampling(t *testing.T) {
|
|
defaultHandlerCalled := false
|
|
fanOutHandlerCalled := false
|
|
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
defaultHandlerCalled = true
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
|
|
}))
|
|
defer backend.Close()
|
|
|
|
backendURL, err := url.Parse(backend.URL)
|
|
require.NoError(t, err)
|
|
|
|
preferredBackend, err := NewProxyBackend("preferred", backendURL, 5*time.Second, true, false)
|
|
require.NoError(t, err)
|
|
mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
|
|
fanOutHandlerCalled = true
|
|
return &queryrange.LokiResponse{
|
|
Status: "success",
|
|
Data: queryrange.LokiData{ResultType: "streams"},
|
|
}, nil
|
|
})
|
|
|
|
goldfishManager := &mockGoldfishManager{
|
|
shouldSampleResult: false, // NOT sampling
|
|
}
|
|
|
|
handler, err := NewSplittingHandler(SplittingHandlerConfig{
|
|
Codec: queryrange.DefaultCodec,
|
|
FanOutHandler: mockFanOutHandler,
|
|
GoldfishManager: goldfishManager,
|
|
V1Backend: preferredBackend,
|
|
SkipFanoutWhenNotSampling: true, // Enable skip when not sampling
|
|
RoutingMode: RoutingModeV1Preferred,
|
|
SplitStart: time.Time{},
|
|
SplitLag: 1 * time.Hour,
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
|
|
// Use a LokiRequest with a v2-engine-supported query
|
|
now := time.Now()
|
|
query := `sum(rate({app="test"}[5m]))`
|
|
expr, err := syntax.ParseExpr(query)
|
|
require.NoError(t, err)
|
|
|
|
lokiReq := &queryrange.LokiRequest{
|
|
Query: query,
|
|
StartTs: now.Add(-2 * time.Hour),
|
|
EndTs: now,
|
|
Step: 60000,
|
|
Limit: 100,
|
|
Path: constants.PathLokiQueryRange,
|
|
Plan: &plan.QueryPlan{
|
|
AST: expr,
|
|
},
|
|
}
|
|
|
|
ctx := user.InjectOrgID(context.Background(), "test-tenant")
|
|
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
|
|
require.NoError(t, err)
|
|
|
|
recorder := httptest.NewRecorder()
|
|
handler.ServeHTTP(recorder, httpReq)
|
|
|
|
require.Equal(t, http.StatusOK, recorder.Code)
|
|
require.True(t, defaultHandlerCalled, "v1-preferred mode should skip to default handler when not sampling")
|
|
require.False(t, fanOutHandlerCalled, "v1-preferred mode should NOT call fanout handler when not sampling")
|
|
}
|
|
|
|
// TestSplittingHandler_AlwaysSplitsEvenWhenNotSampling tests that
|
|
// v2-preferred and race modes always split queries when splitLag > 0, regardless of sampling.
|
|
func TestSplittingHandler_AlwaysSplitsEvenWhenNotSampling(t *testing.T) {
|
|
testCases := []struct {
|
|
name string
|
|
routingMode RoutingMode
|
|
}{
|
|
{
|
|
name: "v2-preferred mode",
|
|
routingMode: RoutingModeV2Preferred,
|
|
},
|
|
{
|
|
name: "race mode",
|
|
routingMode: RoutingModeRace,
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
defaultHandlerCalled := false
|
|
fanOutHandlerCalled := false
|
|
|
|
mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
|
|
fanOutHandlerCalled = true
|
|
return &queryrange.LokiResponse{
|
|
Status: "success",
|
|
Data: queryrange.LokiData{ResultType: "streams"},
|
|
}, nil
|
|
})
|
|
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
defaultHandlerCalled = true
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
|
|
}))
|
|
defer backend.Close()
|
|
|
|
backendURL, err := url.Parse(backend.URL)
|
|
require.NoError(t, err)
|
|
|
|
preferredBackend, err := NewProxyBackend("preferred", backendURL, 5*time.Second, false, false)
|
|
require.NoError(t, err)
|
|
|
|
goldfishManager := &mockGoldfishManager{
|
|
shouldSampleResult: false, // NOT sampling
|
|
}
|
|
|
|
handler, err := NewSplittingHandler(SplittingHandlerConfig{
|
|
Codec: queryrange.DefaultCodec,
|
|
FanOutHandler: mockFanOutHandler,
|
|
GoldfishManager: goldfishManager,
|
|
V1Backend: preferredBackend,
|
|
SkipFanoutWhenNotSampling: true, // Enable skip when not sampling, which should not apply in these 2 modes
|
|
RoutingMode: tc.routingMode,
|
|
SplitStart: time.Time{},
|
|
SplitLag: time.Hour,
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
|
|
now := time.Now()
|
|
query := `sum(rate({app="test"}[5m]))`
|
|
expr, err := syntax.ParseExpr(query)
|
|
require.NoError(t, err)
|
|
|
|
lokiReq := &queryrange.LokiRequest{
|
|
Query: query,
|
|
StartTs: now.Add(-2 * time.Hour), // Start before the lag window
|
|
EndTs: now, // End at now (within lag window)
|
|
Step: 60000, // 1 minute step in milliseconds (required for metric queries)
|
|
Limit: 100,
|
|
Path: constants.PathLokiQueryRange,
|
|
Plan: &plan.QueryPlan{
|
|
AST: expr,
|
|
},
|
|
}
|
|
|
|
ctx := user.InjectOrgID(context.Background(), "test-tenant")
|
|
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
|
|
require.NoError(t, err)
|
|
|
|
recorder := httptest.NewRecorder()
|
|
handler.ServeHTTP(recorder, httpReq)
|
|
|
|
require.Equal(t, http.StatusOK, recorder.Code)
|
|
require.True(t, defaultHandlerCalled, "%s should call handler even when not sampling", tc.name)
|
|
require.True(t, fanOutHandlerCalled, "fanout handler should be called for post-lag data even when not sampling")
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestSplittingHandler_NoSplitLag_UsesFanoutHandler tests that when splitLag is 0
|
|
// the handler uses the fanout handler directly.
|
|
func TestSplittingHandler_NoSplitLag_UsesFanoutHandler(t *testing.T) {
|
|
defaultHandlerCalled := false
|
|
fanOutHandlerCalled := false
|
|
|
|
mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
|
|
fanOutHandlerCalled = true
|
|
return &queryrange.LokiResponse{
|
|
Status: "success",
|
|
Data: queryrange.LokiData{ResultType: "streams"},
|
|
}, nil
|
|
})
|
|
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
defaultHandlerCalled = true
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
|
|
}))
|
|
defer backend.Close()
|
|
|
|
backendURL, err := url.Parse(backend.URL)
|
|
require.NoError(t, err)
|
|
|
|
preferredBackend, err := NewProxyBackend("preferred", backendURL, 5*time.Second, true, false)
|
|
require.NoError(t, err)
|
|
|
|
goldfishManager := &mockGoldfishManager{
|
|
shouldSampleResult: true, // sampling enabled
|
|
correlationIDResult: "test-correlation-id",
|
|
}
|
|
|
|
handler, err := NewSplittingHandler(SplittingHandlerConfig{
|
|
Codec: queryrange.DefaultCodec,
|
|
FanOutHandler: mockFanOutHandler,
|
|
GoldfishManager: goldfishManager,
|
|
V1Backend: preferredBackend,
|
|
SkipFanoutWhenNotSampling: false,
|
|
RoutingMode: RoutingModeRace,
|
|
SplitStart: time.Time{},
|
|
SplitLag: 0, // No split lag - should use fanout directly
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
|
|
lokiReq := &queryrange.LokiInstantRequest{
|
|
Query: `{app="test"}`,
|
|
TimeTs: time.Now(),
|
|
Limit: 100,
|
|
Path: constants.PathLokiQuery,
|
|
}
|
|
|
|
ctx := user.InjectOrgID(context.Background(), "test-tenant")
|
|
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
|
|
require.NoError(t, err)
|
|
|
|
recorder := httptest.NewRecorder()
|
|
handler.ServeHTTP(recorder, httpReq)
|
|
|
|
require.Equal(t, http.StatusOK, recorder.Code)
|
|
require.True(t, fanOutHandlerCalled, "when splitLag is 0, should use fanout handler directly")
|
|
require.False(t, defaultHandlerCalled, "when splitLag is 0, should NOT use default handler")
|
|
}
|
|
|
|
// TestSplittingHandler_V1Preferred_SplitsWhenSampling tests that v1-preferred mode
|
|
// does split queries when sampling is enabled.
|
|
func TestSplittingHandler_V1Preferred_SplitsWhenSampling(t *testing.T) {
|
|
defaultHandlerCalled := false
|
|
fanOutHandlerCalled := false
|
|
|
|
mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
|
|
fanOutHandlerCalled = true
|
|
return &queryrange.LokiResponse{
|
|
Status: "success",
|
|
Data: queryrange.LokiData{ResultType: "streams"},
|
|
}, nil
|
|
})
|
|
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
defaultHandlerCalled = true
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
|
|
}))
|
|
defer backend.Close()
|
|
|
|
backendURL, err := url.Parse(backend.URL)
|
|
require.NoError(t, err)
|
|
|
|
preferredBackend, err := NewProxyBackend("preferred", backendURL, 5*time.Second, true, false)
|
|
require.NoError(t, err)
|
|
|
|
goldfishManager := &mockGoldfishManager{
|
|
shouldSampleResult: true, // IS sampling
|
|
correlationIDResult: "test-correlation-id",
|
|
}
|
|
|
|
handler, err := NewSplittingHandler(SplittingHandlerConfig{
|
|
Codec: queryrange.DefaultCodec,
|
|
FanOutHandler: mockFanOutHandler,
|
|
GoldfishManager: goldfishManager,
|
|
V1Backend: preferredBackend,
|
|
SkipFanoutWhenNotSampling: true,
|
|
RoutingMode: RoutingModeV1Preferred,
|
|
SplitStart: time.Time{},
|
|
SplitLag: 1 * time.Hour,
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
|
|
// Use a LokiRequest with a v2-engine-supported query so it goes through the splitting logic
|
|
// and calls both the default handler (for pre-lag data) and fanout handler (for post-lag data)
|
|
now := time.Now()
|
|
query := `sum(rate({app="test"}[5m]))`
|
|
expr, err := syntax.ParseExpr(query)
|
|
require.NoError(t, err)
|
|
|
|
lokiReq := &queryrange.LokiRequest{
|
|
Query: query,
|
|
StartTs: now.Add(-2 * time.Hour),
|
|
EndTs: now,
|
|
Step: 60000,
|
|
Limit: 100,
|
|
Path: constants.PathLokiQueryRange,
|
|
Plan: &plan.QueryPlan{
|
|
AST: expr,
|
|
},
|
|
}
|
|
|
|
ctx := user.InjectOrgID(context.Background(), "test-tenant")
|
|
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
|
|
require.NoError(t, err)
|
|
|
|
recorder := httptest.NewRecorder()
|
|
handler.ServeHTTP(recorder, httpReq)
|
|
|
|
require.Equal(t, http.StatusOK, recorder.Code)
|
|
require.True(t, defaultHandlerCalled, "v1-preferred mode should split when sampling is enabled")
|
|
require.True(t, fanOutHandlerCalled, "fanout handler should be called for post-lag data when sampling")
|
|
}
|
|
|
|
// TestSplittingHandler_SkipFanoutDisabled_AlwaysSplits tests that when
|
|
// SkipFanoutWhenNotSampling is false, all modes split regardless of sampling.
|
|
func TestSplittingHandler_SkipFanoutDisabled_AlwaysSplits(t *testing.T) {
|
|
for _, routingMode := range []RoutingMode{RoutingModeV1Preferred, RoutingModeV2Preferred, RoutingModeRace} {
|
|
t.Run(string(routingMode), func(t *testing.T) {
|
|
defaultHandlerCalled := false
|
|
fanOutHandlerCalled := false
|
|
|
|
mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
|
|
fanOutHandlerCalled = true
|
|
return &queryrange.LokiResponse{
|
|
Status: "success",
|
|
Data: queryrange.LokiData{ResultType: "streams"},
|
|
}, nil
|
|
})
|
|
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
defaultHandlerCalled = true
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
|
|
}))
|
|
defer backend.Close()
|
|
|
|
backendURL, err := url.Parse(backend.URL)
|
|
require.NoError(t, err)
|
|
|
|
preferredBackend, err := NewProxyBackend("preferred", backendURL, 5*time.Second, true, false)
|
|
require.NoError(t, err)
|
|
|
|
goldfishManager := &mockGoldfishManager{
|
|
shouldSampleResult: false, // NOT sampling
|
|
}
|
|
|
|
handler, err := NewSplittingHandler(SplittingHandlerConfig{
|
|
Codec: queryrange.DefaultCodec,
|
|
FanOutHandler: mockFanOutHandler,
|
|
GoldfishManager: goldfishManager,
|
|
V1Backend: preferredBackend,
|
|
SkipFanoutWhenNotSampling: false, // Disabled - should always split
|
|
RoutingMode: routingMode,
|
|
SplitStart: time.Time{},
|
|
SplitLag: 1 * time.Hour,
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
|
|
// Use a LokiRequest with a v2-engine-supported query
|
|
now := time.Now()
|
|
query := `sum(rate({app="test"}[5m]))`
|
|
expr, err := syntax.ParseExpr(query)
|
|
require.NoError(t, err)
|
|
|
|
lokiReq := &queryrange.LokiRequest{
|
|
Query: query,
|
|
StartTs: now.Add(-2 * time.Hour),
|
|
EndTs: now,
|
|
Step: 60000,
|
|
Limit: 100,
|
|
Path: constants.PathLokiQueryRange,
|
|
Plan: &plan.QueryPlan{
|
|
AST: expr,
|
|
},
|
|
}
|
|
|
|
ctx := user.InjectOrgID(context.Background(), "test-tenant")
|
|
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
|
|
require.NoError(t, err)
|
|
|
|
recorder := httptest.NewRecorder()
|
|
handler.ServeHTTP(recorder, httpReq)
|
|
|
|
require.Equal(t, http.StatusOK, recorder.Code)
|
|
require.True(t, defaultHandlerCalled)
|
|
require.True(t, fanOutHandlerCalled)
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestSplittingHandler_ReadsPath_GoldfishCorrelationIDHeader tests that the goldfish correlation ID
|
|
// header is set (or absent) in responses for read requests depending on the sampling decision.
|
|
func TestSplittingHandler_ReadsPath_GoldfishCorrelationIDHeader(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
shouldSample bool
|
|
correlationID string
|
|
expectHeader bool
|
|
expectedHeader string
|
|
}{
|
|
{
|
|
name: "sampled read sets goldfish header",
|
|
shouldSample: true,
|
|
correlationID: "test-uuid",
|
|
expectHeader: true,
|
|
expectedHeader: "test-uuid",
|
|
},
|
|
{
|
|
name: "non-sampled read omits goldfish header",
|
|
shouldSample: false,
|
|
correlationID: "",
|
|
expectHeader: false,
|
|
},
|
|
}
|
|
|
|
for _, tc := range tests {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
// The header is set by SplittingHandler.writeResponse, which is only reached
|
|
// when V1Backend is non-nil (otherwise NewSplittingHandler returns early with
|
|
// a plain fanout wrapper). Use a real V1Backend server to reach that code path.
|
|
v1Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
|
|
}))
|
|
defer v1Server.Close()
|
|
|
|
v1BackendURL, err := url.Parse(v1Server.URL)
|
|
require.NoError(t, err)
|
|
v1Backend, err := NewProxyBackend("v1", v1BackendURL, 5*time.Second, true, false)
|
|
require.NoError(t, err)
|
|
|
|
mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
|
|
return &queryrange.LokiResponse{
|
|
Status: "success",
|
|
Data: queryrange.LokiData{ResultType: "streams"},
|
|
}, nil
|
|
})
|
|
|
|
goldfishManager := &mockGoldfishManager{
|
|
shouldSampleResult: tc.shouldSample,
|
|
correlationIDResult: tc.correlationID,
|
|
}
|
|
|
|
handler, err := NewSplittingHandler(SplittingHandlerConfig{
|
|
Codec: queryrange.DefaultCodec,
|
|
FanOutHandler: mockFanOutHandler,
|
|
GoldfishManager: goldfishManager,
|
|
V1Backend: v1Backend,
|
|
SkipFanoutWhenNotSampling: false,
|
|
RoutingMode: RoutingModeV1Preferred,
|
|
SplitStart: time.Time{},
|
|
SplitLag: 0, // no split: goes directly to fanOutHandler
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
|
|
lokiReq := &queryrange.LokiInstantRequest{
|
|
Query: `{app="test"}`,
|
|
TimeTs: time.Now(),
|
|
Limit: 100,
|
|
Path: constants.PathLokiQuery,
|
|
}
|
|
|
|
ctx := user.InjectOrgID(context.Background(), "test-tenant")
|
|
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
|
|
require.NoError(t, err)
|
|
|
|
recorder := httptest.NewRecorder()
|
|
handler.ServeHTTP(recorder, httpReq)
|
|
|
|
require.Equal(t, http.StatusOK, recorder.Code)
|
|
|
|
if tc.expectHeader {
|
|
require.Equal(t, tc.expectedHeader, recorder.Header().Get(goldfish.GoldfishCorrelationIDHeader),
|
|
"expected goldfish header to be set")
|
|
} else {
|
|
require.Empty(t, recorder.Header().Get(goldfish.GoldfishCorrelationIDHeader),
|
|
"expected goldfish header to be absent")
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestSplittingHandler_ReadsPath_GoldfishHeaderOnErrorResponse tests that the goldfish correlation ID
|
|
// header survives error responses on the reads path. When the fanout handler returns an error,
|
|
// writeResponse should still include the header so clients can correlate the request.
|
|
func TestSplittingHandler_ReadsPath_GoldfishHeaderOnErrorResponse(t *testing.T) {
|
|
// The fanout handler returns an error to simulate a backend failure
|
|
mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
|
|
return nil, errors.New("simulated backend failure")
|
|
})
|
|
|
|
v1Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
|
|
}))
|
|
defer v1Server.Close()
|
|
|
|
v1BackendURL, err := url.Parse(v1Server.URL)
|
|
require.NoError(t, err)
|
|
v1Backend, err := NewProxyBackend("v1", v1BackendURL, 5*time.Second, true, false)
|
|
require.NoError(t, err)
|
|
|
|
goldfishManager := &mockGoldfishManager{
|
|
shouldSampleResult: true,
|
|
correlationIDResult: "error-test-uuid",
|
|
}
|
|
|
|
handler, err := NewSplittingHandler(SplittingHandlerConfig{
|
|
Codec: queryrange.DefaultCodec,
|
|
FanOutHandler: mockFanOutHandler,
|
|
GoldfishManager: goldfishManager,
|
|
V1Backend: v1Backend,
|
|
SkipFanoutWhenNotSampling: false,
|
|
RoutingMode: RoutingModeV1Preferred,
|
|
SplitStart: time.Time{},
|
|
SplitLag: 0, // no split: goes directly to fanOutHandler
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
|
|
lokiReq := &queryrange.LokiInstantRequest{
|
|
Query: `{app="test"}`,
|
|
TimeTs: time.Now(),
|
|
Limit: 100,
|
|
Path: constants.PathLokiQuery,
|
|
}
|
|
|
|
ctx := user.InjectOrgID(context.Background(), "test-tenant")
|
|
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
|
|
require.NoError(t, err)
|
|
|
|
recorder := httptest.NewRecorder()
|
|
handler.ServeHTTP(recorder, httpReq)
|
|
|
|
// The response should be an error (500)
|
|
require.Equal(t, http.StatusInternalServerError, recorder.Code, "handler error should result in 500 status")
|
|
|
|
// The goldfish header should still be present despite the error
|
|
require.Equal(t, "error-test-uuid", recorder.Header().Get(goldfish.GoldfishCorrelationIDHeader),
|
|
"goldfish header must survive error responses so clients can correlate sampled requests")
|
|
}
|
|
|
|
// TestSplittingHandler_CorrelationIDConsistency tests that the correlation ID in the response header
|
|
// matches the ID passed to SendToGoldfish, ensuring end-to-end consistency.
|
|
func TestSplittingHandler_CorrelationIDConsistency(t *testing.T) {
|
|
// Two backends: v1 (preferred) and v2 (non-preferred) — both succeed
|
|
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[{"stream":{"app":"test"},"values":[["1000000000","line"]]}]}}`))
|
|
}))
|
|
defer backend.Close()
|
|
|
|
backendURL, err := url.Parse(backend.URL)
|
|
require.NoError(t, err)
|
|
|
|
v1Backend, err := NewProxyBackend("v1", backendURL, 5*time.Second, true, false)
|
|
require.NoError(t, err)
|
|
v2Backend, err := NewProxyBackend("v2", backendURL, 5*time.Second, false, true)
|
|
require.NoError(t, err)
|
|
backends := []*ProxyBackend{v1Backend, v2Backend}
|
|
|
|
goldfishManager := &mockGoldfishManager{
|
|
shouldSampleResult: true,
|
|
correlationIDResult: "consistency-test-uuid",
|
|
}
|
|
|
|
metrics := NewProxyMetrics(nil)
|
|
logger := log.NewNopLogger()
|
|
|
|
handlerFactory := NewHandlerFactory(HandlerFactoryConfig{
|
|
Backends: backends,
|
|
Codec: queryrange.DefaultCodec,
|
|
GoldfishManager: goldfishManager,
|
|
Logger: logger,
|
|
Metrics: metrics,
|
|
RoutingMode: RoutingModeV1Preferred,
|
|
SplitLag: 0, // no split lag: fanout directly
|
|
})
|
|
|
|
endpoint := NewProxyEndpoint(backends, "test", metrics, logger, nil, false)
|
|
queryHandler, err := handlerFactory.CreateHandler("test", nil)
|
|
require.NoError(t, err)
|
|
endpoint.WithQueryHandler(queryHandler)
|
|
endpoint.WithGoldfish(goldfishManager)
|
|
|
|
lokiReq := &queryrange.LokiInstantRequest{
|
|
Query: `{app="test"}`,
|
|
TimeTs: time.Now(),
|
|
Limit: 100,
|
|
Path: constants.PathLokiQuery,
|
|
}
|
|
|
|
ctx := user.InjectOrgID(context.Background(), "test-tenant")
|
|
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
|
|
require.NoError(t, err)
|
|
|
|
recorder := httptest.NewRecorder()
|
|
endpoint.ServeHTTP(recorder, httpReq)
|
|
|
|
require.Equal(t, http.StatusOK, recorder.Code)
|
|
|
|
// Wait for async goldfish processing to complete by polling the send state
|
|
require.Eventually(t, func() bool {
|
|
sendCalled, _ := goldfishManager.getSendState()
|
|
return sendCalled
|
|
}, 5*time.Second, 10*time.Millisecond,
|
|
"SendToGoldfish should have been called for a 2-backend sampled request")
|
|
|
|
headerID := recorder.Header().Get(goldfish.GoldfishCorrelationIDHeader)
|
|
require.Equal(t, "consistency-test-uuid", headerID,
|
|
"header should contain the correlation ID")
|
|
|
|
// Verify that SendToGoldfish was called with the same ID that appears in the header
|
|
sendCalled, sendCorrelationID := goldfishManager.getSendState()
|
|
require.True(t, sendCalled,
|
|
"SendToGoldfish should have been called for a 2-backend sampled request")
|
|
require.Equal(t, headerID, sendCorrelationID,
|
|
"correlation ID in response header must match the ID passed to SendToGoldfish")
|
|
}
|
|
|
|
// TestSplittingHandler_MultiTenantQuery_RoutesToV1Only tests that multi-tenant queries
|
|
// (X-Scope-OrgID: tenant1|tenant2) are routed exclusively to v1, regardless of routing mode.
|
|
// This is because v2 does not support multi-tenant queries.
|
|
func TestSplittingHandler_MultiTenantQuery_RoutesToV1Only(t *testing.T) {
|
|
for _, routingMode := range []RoutingMode{RoutingModeV1Preferred, RoutingModeV2Preferred, RoutingModeRace} {
|
|
t.Run(string(routingMode), func(t *testing.T) {
|
|
v1BackendCalled := false
|
|
fanOutHandlerCalled := false
|
|
var capturedTenantID string
|
|
|
|
// Mock v1 backend that tracks calls
|
|
v1Backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
v1BackendCalled = true
|
|
capturedTenantID = r.Header.Get("X-Scope-OrgID")
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"streams","result":[]}}`))
|
|
}))
|
|
defer v1Backend.Close()
|
|
|
|
v1BackendURL, err := url.Parse(v1Backend.URL)
|
|
require.NoError(t, err)
|
|
|
|
v1ProxyBackend, err := NewProxyBackend("v1", v1BackendURL, 5*time.Second, true, false)
|
|
require.NoError(t, err)
|
|
|
|
// Mock fanout handler that should NOT be called for multi-tenant queries
|
|
mockFanOutHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
|
|
fanOutHandlerCalled = true
|
|
return &queryrange.LokiResponse{
|
|
Status: "success",
|
|
Data: queryrange.LokiData{ResultType: "streams"},
|
|
}, nil
|
|
})
|
|
|
|
goldfishManager := &mockGoldfishManager{
|
|
shouldSampleResult: true, // Enable sampling to ensure we're not skipping due to that
|
|
correlationIDResult: "test-correlation-id",
|
|
}
|
|
|
|
handler, err := NewSplittingHandler(SplittingHandlerConfig{
|
|
Codec: queryrange.DefaultCodec,
|
|
FanOutHandler: mockFanOutHandler,
|
|
GoldfishManager: goldfishManager,
|
|
V1Backend: v1ProxyBackend,
|
|
SkipFanoutWhenNotSampling: false,
|
|
RoutingMode: routingMode,
|
|
SplitStart: time.Time{},
|
|
SplitLag: 1 * time.Hour, // Enable splitting
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
|
|
// Use a LokiRequest that would normally go through splitting/fanout
|
|
now := time.Now()
|
|
query := `sum(rate({app="test"}[5m]))`
|
|
expr, err := syntax.ParseExpr(query)
|
|
require.NoError(t, err)
|
|
|
|
lokiReq := &queryrange.LokiRequest{
|
|
Query: query,
|
|
StartTs: now.Add(-2 * time.Hour),
|
|
EndTs: now,
|
|
Step: 60000,
|
|
Limit: 100,
|
|
Path: constants.PathLokiQueryRange,
|
|
Plan: &plan.QueryPlan{
|
|
AST: expr,
|
|
},
|
|
}
|
|
|
|
// Inject multi-tenant org ID (pipe-separated)
|
|
ctx := user.InjectOrgID(context.Background(), "tenant1|tenant2")
|
|
httpReq, err := queryrange.DefaultCodec.EncodeRequest(ctx, lokiReq)
|
|
require.NoError(t, err)
|
|
|
|
recorder := httptest.NewRecorder()
|
|
handler.ServeHTTP(recorder, httpReq)
|
|
|
|
require.Equal(t, http.StatusOK, recorder.Code)
|
|
require.True(t, v1BackendCalled, "multi-tenant query should be routed to v1 backend in %s mode", routingMode)
|
|
require.False(t, fanOutHandlerCalled, "multi-tenant query should NOT use fanout handler in %s mode (would route to v2)", routingMode)
|
|
require.Equal(t, "tenant1|tenant2", capturedTenantID, "tenant ID should be preserved when routing to v1")
|
|
})
|
|
}
|
|
}
|
|
|