Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/tools/querytee/proxy_endpoint_test.go

1044 lines
36 KiB

package querytee
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/loki/v3/pkg/goldfish"
"github.com/grafana/loki/v3/tools/querytee/comparator"
querytee_goldfish "github.com/grafana/loki/v3/tools/querytee/goldfish"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.uber.org/atomic"
"github.com/grafana/loki/v3/pkg/loghttp"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
)
// createTestEndpoint creates a ProxyEndpoint with properly initialized handler pipeline
func createTestEndpoint(t *testing.T, backends []*ProxyBackend, routeName string, comparator comparator.ResponsesComparator, instrumentCompares bool) *ProxyEndpoint {
metrics := NewProxyMetrics(nil)
logger := log.NewNopLogger()
endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, comparator, instrumentCompares)
handlerFactory := NewHandlerFactory(HandlerFactoryConfig{
Backends: backends,
Codec: queryrange.DefaultCodec,
GoldfishManager: nil,
Logger: logger,
Metrics: metrics,
})
queryHandler, err := handlerFactory.CreateHandler(routeName, comparator)
require.NoError(t, err)
endpoint.WithQueryHandler(queryHandler)
return endpoint
}
// createTestEndpointWithMetrics creates a ProxyEndpoint with custom metrics
func createTestEndpointWithMetrics(t *testing.T, backends []*ProxyBackend, routeName string, comp comparator.ResponsesComparator, instrumentCompares bool, metrics *ProxyMetrics) *ProxyEndpoint {
logger := log.NewNopLogger()
endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, comp, instrumentCompares)
handlerFactory := NewHandlerFactory(HandlerFactoryConfig{
Backends: backends,
Codec: queryrange.DefaultCodec,
GoldfishManager: nil,
Logger: logger,
Metrics: metrics,
InstrumentCompares: instrumentCompares,
})
queryHandler, err := handlerFactory.CreateHandler(routeName, comp)
require.NoError(t, err)
endpoint.WithQueryHandler(queryHandler)
return endpoint
}
// createTestEndpointWithGoldfish creates a ProxyEndpoint with goldfish manager
func createTestEndpointWithGoldfish(t *testing.T, backends []*ProxyBackend, routeName string, goldfishManager querytee_goldfish.Manager) *ProxyEndpoint {
metrics := NewProxyMetrics(nil)
logger := log.NewNopLogger()
handlerFactory := NewHandlerFactory(HandlerFactoryConfig{
Backends: backends,
Codec: queryrange.DefaultCodec,
GoldfishManager: goldfishManager,
Logger: logger,
Metrics: metrics,
})
endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, nil, false)
queryHandler, err := handlerFactory.CreateHandler(routeName, nil)
require.NoError(t, err)
endpoint.WithQueryHandler(queryHandler)
endpoint.WithGoldfish(goldfishManager)
return endpoint
}
func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) {
backendURL1, err := url.Parse("http://backend-1/")
require.NoError(t, err)
backendURL2, err := url.Parse("http://backend-2/")
require.NoError(t, err)
backendURL3, err := url.Parse("http://backend-3/")
require.NoError(t, err)
backendPref := NewProxyBackend("backend-1", backendURL1, time.Second, true)
backendOther1 := NewProxyBackend("backend-2", backendURL2, time.Second, false)
backendOther2 := NewProxyBackend("backend-3", backendURL3, time.Second, false)
tests := map[string]struct {
backends []*ProxyBackend
responses []*BackendResponse
expected *ProxyBackend
}{
"the preferred backend is the 1st response received": {
backends: []*ProxyBackend{backendPref, backendOther1},
responses: []*BackendResponse{
{backend: backendPref, status: 200},
},
expected: backendPref,
},
"the preferred backend is the last response received": {
backends: []*ProxyBackend{backendPref, backendOther1},
responses: []*BackendResponse{
{backend: backendOther1, status: 200},
{backend: backendPref, status: 200},
},
expected: backendPref,
},
"the preferred backend is the last response received but it's not successful": {
backends: []*ProxyBackend{backendPref, backendOther1},
responses: []*BackendResponse{
{backend: backendOther1, status: 200},
{backend: backendPref, status: 500},
},
expected: backendOther1,
},
"the preferred backend is the 2nd response received but only the last one is successful": {
backends: []*ProxyBackend{backendPref, backendOther1, backendOther2},
responses: []*BackendResponse{
{backend: backendOther1, status: 500},
{backend: backendPref, status: 500},
{backend: backendOther2, status: 200},
},
expected: backendOther2,
},
"there's no preferred backend configured and the 1st response is successful": {
backends: []*ProxyBackend{backendOther1, backendOther2},
responses: []*BackendResponse{
{backend: backendOther1, status: 200},
},
expected: backendOther1,
},
"there's no preferred backend configured and the last response is successful": {
backends: []*ProxyBackend{backendOther1, backendOther2},
responses: []*BackendResponse{
{backend: backendOther1, status: 500},
{backend: backendOther2, status: 200},
},
expected: backendOther2,
},
"no received response is successful": {
backends: []*ProxyBackend{backendPref, backendOther1},
responses: []*BackendResponse{
{backend: backendOther1, status: 500},
{backend: backendPref, status: 500},
},
expected: backendOther1,
},
}
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
endpoint := NewProxyEndpoint(testData.backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, false)
// Send the responses from a dedicated goroutine.
resCh := make(chan *BackendResponse)
go func() {
for _, res := range testData.responses {
resCh <- res
}
close(resCh)
}()
// Wait for the selected backend response.
actual := endpoint.waitBackendResponseForDownstream(resCh)
assert.Equal(t, testData.expected, actual.backend)
})
}
}
func Test_ProxyEndpoint_QueryRequests(t *testing.T) {
var (
requestCount atomic.Uint64
wg sync.WaitGroup
testHandler http.HandlerFunc
)
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer wg.Done()
defer requestCount.Add(1)
testHandler(w, r)
})
backend1 := httptest.NewServer(handler)
defer backend1.Close()
backendURL1, err := url.Parse(backend1.URL)
require.NoError(t, err)
backend2 := httptest.NewServer(handler)
defer backend2.Close()
backendURL2, err := url.Parse(backend2.URL)
require.NoError(t, err)
backends := []*ProxyBackend{
NewProxyBackend("backend-1", backendURL1, time.Second, true),
NewProxyBackend("backend-2", backendURL2, time.Second, false).WithFilter(regexp.MustCompile("/loki/api/v1/query_range")),
}
endpoint := createTestEndpoint(t, backends, "test", nil, false)
for _, tc := range []struct {
name string
request func(*testing.T) *http.Request
handler func(*testing.T) http.HandlerFunc
counts int
}{
{
name: "GET-request",
request: func(t *testing.T) *http.Request {
r, err := http.NewRequest("GET", "http://test/loki/api/v1/query_range?query={job=\"test\"}&start=1&end=2", nil)
r.Header["test-X"] = []string{"test-X-value"}
r.Header.Set("X-Scope-OrgID", "test-tenant")
require.NoError(t, err)
return r
},
handler: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "test-X-value", r.Header.Get("test-X"))
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[]}}`))
}
},
counts: 2,
},
{
name: "GET-filter-accept-encoding",
request: func(t *testing.T) *http.Request {
r, err := http.NewRequest("GET", "http://test/loki/api/v1/query_range?query={job=\"test\"}&start=1&end=2", nil)
r.Header.Set("Accept-Encoding", "gzip")
r.Header.Set("X-Scope-OrgID", "test-tenant")
require.NoError(t, err)
return r
},
handler: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, 0, len(r.Header.Values("Accept-Encoding")))
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[]}}`))
}
},
counts: 2,
},
{
name: "GET-filtered",
request: func(t *testing.T) *http.Request {
r, err := http.NewRequest("GET", "http://test/loki/api/v1/query?query={job=\"test\"}", nil)
r.Header.Set("X-Scope-OrgID", "test-tenant")
require.NoError(t, err)
return r
},
handler: func(_ *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[]}}`))
}
},
counts: 1,
},
} {
t.Run(tc.name, func(t *testing.T) {
// reset request count
requestCount.Store(0)
wg.Add(tc.counts)
if tc.handler == nil {
testHandler = func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[]}}`))
}
} else {
testHandler = tc.handler(t)
}
w := httptest.NewRecorder()
endpoint.ServeHTTP(w, tc.request(t))
require.Contains(t, w.Body.String(), `"status":"success"`)
require.Equal(t, 200, w.Code)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for backend requests to complete")
}
require.Equal(t, uint64(tc.counts), requestCount.Load())
})
}
}
func Test_ProxyEndpoint_WriteRequests(t *testing.T) {
var (
requestCount atomic.Uint64
wg sync.WaitGroup
testHandler http.HandlerFunc
)
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer wg.Done()
defer requestCount.Add(1)
testHandler(w, r)
})
backend1 := httptest.NewServer(handler)
defer backend1.Close()
backendURL1, err := url.Parse(backend1.URL)
require.NoError(t, err)
backend2 := httptest.NewServer(handler)
defer backend2.Close()
backendURL2, err := url.Parse(backend2.URL)
require.NoError(t, err)
backends := []*ProxyBackend{
NewProxyBackend("backend-1", backendURL1, time.Second, true),
NewProxyBackend("backend-2", backendURL2, time.Second, false).WithFilter(regexp.MustCompile("/loki/api/v1/push")),
}
// endpoint := createTestEndpoint(backends, "test", nil, false)
metrics := NewProxyMetrics(nil)
logger := log.NewNopLogger()
endpoint := NewProxyEndpoint(backends, "test", metrics, logger, nil, false)
for _, tc := range []struct {
name string
request func(*testing.T) *http.Request
handler func(*testing.T) http.HandlerFunc
counts int
}{
{
name: "POST-request",
request: func(t *testing.T) *http.Request {
r, err := http.NewRequest("POST", "http://test/loki/api/v1/push", strings.NewReader(`{"streams":[{"stream":{"job":"test"},"values":[["1","test"]]}]}`))
r.Header.Set("test-X", "test-X-value")
r.Header["Content-Type"] = []string{"application/json"}
r.Header.Set("X-Scope-OrgID", "test-tenant")
require.NoError(t, err)
return r
},
handler: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "test-X-value", r.Header.Get("test-X"))
w.WriteHeader(204)
}
},
counts: 2,
},
{
name: "POST-filter-accept-encoding",
request: func(t *testing.T) *http.Request {
r, err := http.NewRequest("POST", "http://test/loki/api/v1/push", strings.NewReader(`{"streams":[{"stream":{"job":"test"},"values":[["1","test"]]}]}`))
r.Header["Content-Type"] = []string{"application/json"}
r.Header.Set("Accept-Encoding", "gzip")
r.Header.Set("X-Scope-OrgID", "test-tenant")
require.NoError(t, err)
return r
},
handler: func(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, 0, len(r.Header.Values("Accept-Encoding")))
w.WriteHeader(204)
}
},
counts: 2,
},
{
name: "POST-filtered",
request: func(t *testing.T) *http.Request {
r, err := http.NewRequest("POST", "http://test/loki/api/prom/push", strings.NewReader(`{"streams":[{"stream":{"job":"test"},"values":[["1","test"]]}]}`))
r.Header["Content-Type"] = []string{"application/json"}
r.Header.Set("X-Scope-OrgID", "test-tenant")
require.NoError(t, err)
return r
},
handler: func(_ *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(204)
}
},
counts: 1,
},
} {
t.Run(tc.name, func(t *testing.T) {
// reset request count
requestCount.Store(0)
wg.Add(tc.counts)
if tc.handler == nil {
testHandler = func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(204)
}
} else {
testHandler = tc.handler(t)
}
w := httptest.NewRecorder()
endpoint.ServeHTTP(w, tc.request(t))
require.Equal(t, 204, w.Code)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for backend requests to complete")
}
require.Equal(t, uint64(tc.counts), requestCount.Load())
})
}
}
func Test_ProxyEndpoint_SummaryMetrics(t *testing.T) {
var (
requestCount atomic.Uint64
wg sync.WaitGroup
testHandler http.HandlerFunc
)
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer wg.Done()
defer requestCount.Add(1)
testHandler(w, r)
})
backend1 := httptest.NewServer(handler)
defer backend1.Close()
backendURL1, err := url.Parse(backend1.URL)
require.NoError(t, err)
backend2 := httptest.NewServer(handler)
defer backend2.Close()
backendURL2, err := url.Parse(backend2.URL)
require.NoError(t, err)
backends := []*ProxyBackend{
NewProxyBackend("backend-1", backendURL1, time.Second, true),
NewProxyBackend("backend-2", backendURL2, time.Second, false),
}
comparator := &mockComparator{}
proxyMetrics := NewProxyMetrics(prometheus.NewRegistry())
endpoint := createTestEndpointWithMetrics(t, backends, "test", comparator, true, proxyMetrics)
for _, tc := range []struct {
name string
request func(*testing.T) *http.Request
counts int
expectedMetrics string
}{
{
name: "missing-metrics-series",
request: func(t *testing.T) *http.Request {
r, err := http.NewRequest("GET", "http://test/loki/api/v1/query_range?query={job=\"test\"}&start=1&end=2", nil)
r.Header.Set("X-Scope-OrgID", "test-tenant")
require.NoError(t, err)
return r
},
counts: 2,
expectedMetrics: `
# HELP cortex_querytee_missing_metrics_series Number of missing metrics (series) in a vector response.
# TYPE cortex_querytee_missing_metrics_series histogram
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.005"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.01"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.025"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.05"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.1"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.25"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.5"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="0.75"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="1"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="1.5"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="2"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="3"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="4"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="5"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="10"} 0
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="25"} 1
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="50"} 1
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="100"} 1
cortex_querytee_missing_metrics_series_bucket{backend="backend-2",issuer="unknown",route="test",status_code="success",le="+Inf"} 1
cortex_querytee_missing_metrics_series_sum{backend="backend-2",issuer="unknown",route="test",status_code="success"} 12
cortex_querytee_missing_metrics_series_count{backend="backend-2",issuer="unknown",route="test",status_code="success"} 1
`,
},
} {
t.Run(tc.name, func(t *testing.T) {
t.Skip("TODO this test is flaky, the eventually is inconsistent. Could we instead mock the metrics?")
// reset request count
requestCount.Store(0)
wg.Add(tc.counts)
testHandler = func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[]}}`))
}
w := httptest.NewRecorder()
endpoint.ServeHTTP(w, tc.request(t))
require.Contains(t, w.Body.String(), `"status":"success"`)
require.Equal(t, 200, w.Code)
wg.Wait()
require.Equal(t, uint64(tc.counts), requestCount.Load())
require.Eventually(t, func() bool {
return prom_testutil.CollectAndCount(proxyMetrics.responsesComparedTotal) == 1
}, 3*time.Second, 100*time.Millisecond, "expect exactly 1 response to be compared.")
err := prom_testutil.CollectAndCompare(proxyMetrics.missingMetrics, strings.NewReader(tc.expectedMetrics))
require.NoError(t, err)
})
}
}
func Test_BackendResponse_succeeded(t *testing.T) {
tests := map[string]struct {
resStatus int
resError error
expected bool
}{
"Error while executing request": {
resStatus: 0,
resError: errors.New("network error"),
expected: false,
},
"2xx response status code": {
resStatus: 200,
resError: nil,
expected: true,
},
"3xx response status code": {
resStatus: 300,
resError: nil,
expected: false,
},
"4xx response status code": {
resStatus: 400,
resError: nil,
expected: true,
},
"5xx response status code": {
resStatus: 500,
resError: nil,
expected: false,
},
}
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
res := &BackendResponse{
status: testData.resStatus,
err: testData.resError,
}
assert.Equal(t, testData.expected, res.succeeded())
})
}
}
func Test_BackendResponse_statusCode(t *testing.T) {
tests := map[string]struct {
resStatus int
resError error
expected int
}{
"Error while executing request": {
resStatus: 0,
resError: errors.New("network error"),
expected: 500,
},
"200 response status code": {
resStatus: 200,
resError: nil,
expected: 200,
},
"503 response status code": {
resStatus: 503,
resError: nil,
expected: 503,
},
}
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
res := &BackendResponse{
status: testData.resStatus,
err: testData.resError,
}
assert.Equal(t, testData.expected, res.statusCode())
})
}
}
type mockComparator struct{}
func (c *mockComparator) Compare(_, _ []byte, _ time.Time) (*comparator.ComparisonSummary, error) {
return &comparator.ComparisonSummary{MissingMetrics: 12}, nil
}
func Test_endToEnd_traceIDFlow(t *testing.T) {
// This test verifies that trace IDs flow from HTTP request context
// through to the stored QuerySample in goldfish
// Create a mock goldfish storage
storage := &mockGoldfishStorage{}
// Create a simple mock backend server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(200)
_, _ = w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[]}}`))
}))
defer server.Close()
u, err := url.Parse(server.URL)
require.NoError(t, err)
// Create backends
backends := []*ProxyBackend{
NewProxyBackend("backend-1", u, time.Second, true), // preferred
NewProxyBackend("backend-2", u, time.Second, false), // non-preferred
}
// Create endpoint with goldfish manager
goldfishConfig := querytee_goldfish.Config{
Enabled: true,
SamplingConfig: querytee_goldfish.SamplingConfig{
DefaultRate: 1.0, // Always sample for testing
},
}
samplesComparator := comparator.NewSamplesComparator(comparator.SampleComparisonOptions{Tolerance: 0.000001})
goldfishManager, err := querytee_goldfish.NewManager(goldfishConfig, samplesComparator, storage, nil, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
endpoint := createTestEndpointWithGoldfish(t, backends, "test", goldfishManager)
// Create request that triggers goldfish sampling
req := httptest.NewRequest("GET", "/loki/api/v1/query_range?query=count_over_time({job=\"test\"}[5m])&start=1700000000&end=1700001000", nil)
req.Header.Set("X-Scope-OrgID", "test-tenant")
// Add trace context to the request (this would normally be done by tracing middleware)
tracer := otel.Tracer("test")
ctx, span := tracer.Start(req.Context(), "test-operation")
defer span.End()
req = req.WithContext(ctx)
w := httptest.NewRecorder()
endpoint.ServeHTTP(w, req)
// Give goldfish async processing time to complete
time.Sleep(200 * time.Millisecond)
// Verify that the system processes the request successfully
assert.Equal(t, 200, w.Code)
// Debug: Check if goldfish was triggered at all
t.Logf("Number of samples stored: %d", len(storage.samples))
t.Logf("Number of results stored: %d", len(storage.results))
// For now, just verify that the system works end-to-end without panicking
// The actual trace ID verification will depend on proper goldfish triggering
if len(storage.samples) > 0 {
sample := storage.samples[0]
assert.Equal(t, "test-tenant", sample.TenantID)
assert.Equal(t, "count_over_time({job=\"test\"}[5m])", sample.Query)
// Verify that the TraceID fields exist and don't cause panics
assert.IsType(t, "", sample.CellATraceID)
assert.IsType(t, "", sample.CellBTraceID)
}
}
// mockGoldfishStorage implements goldfish.Storage for testing
type mockGoldfishStorage struct {
samples []goldfish.QuerySample
results []goldfish.ComparisonResult
}
func (m *mockGoldfishStorage) StoreQuerySample(_ context.Context, sample *goldfish.QuerySample, _ *goldfish.ComparisonResult) error {
m.samples = append(m.samples, *sample)
return nil
}
func (m *mockGoldfishStorage) StoreComparisonResult(_ context.Context, result *goldfish.ComparisonResult) error {
m.results = append(m.results, *result)
return nil
}
func (m *mockGoldfishStorage) GetSampledQueries(_ context.Context, page, pageSize int, _ goldfish.QueryFilter) (*goldfish.APIResponse, error) {
// This is only used for UI, not needed in proxy tests
return &goldfish.APIResponse{
Queries: []goldfish.QuerySample{},
HasMore: false,
Page: page,
PageSize: pageSize,
}, nil
}
func (m *mockGoldfishStorage) GetStatistics(_ context.Context, _ goldfish.StatsFilter) (*goldfish.Statistics, error) {
return nil, nil
}
func (m *mockGoldfishStorage) Close() error {
return nil
}
func (m *mockGoldfishStorage) GetQueryByCorrelationID(_ context.Context, _ string) (*goldfish.QuerySample, error) {
return nil, nil
}
func (m *mockGoldfishStorage) Reset() {
m.samples = []goldfish.QuerySample{}
m.results = []goldfish.ComparisonResult{}
}
// TestProxyEndpoint_QuerySplitting tests the query splitting functionality for goldfish comparison
func TestProxyEndpoint_QuerySplitting(t *testing.T) {
now := time.Now().Truncate(time.Minute)
minAge := 3 * time.Hour
threshold := now.Add(-minAge)
oldResponseBody := `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"test_metric","job":"test"},"values":[[1000,"1.0"],[2000,"2.0"]]}]}}`
recentResponseBody := `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"test_metric","job":"test"},"values":[[3000,"3.0"],[4000,"4.0"]]}]}}`
var mu sync.Mutex
receivedQueries := []string{}
step := "60s"
stepMs := int64(60000)
// Calculate the split boundary: step-aligned threshold + step
splitBoundary := stepAlignUp(threshold, stepMs).Add(time.Duration(stepMs) * time.Millisecond)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
receivedQueries = append(receivedQueries, r.URL.RawQuery)
mu.Unlock()
rangeQuery, err := loghttp.ParseRangeQuery(r)
require.NoError(t, err)
endTime := rangeQuery.End
// If end time is before or at split boundary, return old response
// Otherwise return recent response
w.WriteHeader(200)
if endTime.Before(splitBoundary) || endTime.Equal(splitBoundary) {
_, _ = w.Write([]byte(oldResponseBody))
} else {
_, _ = w.Write([]byte(recentResponseBody))
}
}))
defer server.Close()
u, err := url.Parse(server.URL)
require.NoError(t, err)
backends := []*ProxyBackend{
NewProxyBackend("backend-1", u, time.Second, true), // preferred
NewProxyBackend("backend-2", u, time.Second, false), // non-preferred
}
storage := &mockGoldfishStorage{}
goldfishConfig := querytee_goldfish.Config{
Enabled: true,
SamplingConfig: querytee_goldfish.SamplingConfig{
DefaultRate: 1.0, // Always sample
},
ComparisonMinAge: minAge,
}
goldfishManager, err := querytee_goldfish.NewManager(
goldfishConfig,
&testComparator{},
storage,
nil,
log.NewNopLogger(),
prometheus.NewRegistry(),
)
require.NoError(t, err)
endpoint := createTestEndpointWithGoldfish(t, backends, "test", goldfishManager)
t.Run("query entirely recent (skips goldfish)", func(t *testing.T) {
receivedQueries = []string{}
storage.Reset()
// Query from threshold+1h to threshold+2h (all recent)
start := threshold.Add(time.Hour)
end := threshold.Add(2 * time.Hour)
req := httptest.NewRequest("GET", "/loki/api/v1/query_range?query={job=\"test\"}&start="+formatTime(start)+"&end="+formatTime(end)+"&step="+step, nil)
req.Header.Set("X-Scope-OrgID", "test-tenant")
w := httptest.NewRecorder()
endpoint.ServeHTTP(w, req)
assert.Equal(t, 200, w.Code)
assert.Equal(t, 1, len(receivedQueries), "expect only 1 query, to v1 engine only, got %d", len(receivedQueries))
assert.Equal(t, 0, len(storage.samples), "recent query should not be sent to goldfish cell or compared")
})
t.Run("query entirely old (normal goldfish flow)", func(t *testing.T) {
receivedQueries = []string{}
storage.Reset()
// Query from threshold-2h to threshold-1h (all old)
start := threshold.Add(-2 * time.Hour)
end := threshold.Add(-time.Hour)
req := httptest.NewRequest("GET", "/loki/api/v1/query_range?query={job=\"test\"}&start="+formatTime(start)+"&end="+formatTime(end)+"&step="+step, nil)
req.Header.Set("X-Scope-OrgID", "test-tenant")
w := httptest.NewRecorder()
endpoint.ServeHTTP(w, req)
// Give goldfish time to process
time.Sleep(2 * time.Second)
assert.Equal(t, 200, w.Code)
assert.Equal(t, 2, len(receivedQueries), "expected 1 query each to v1 and v2 for comparison, got %d", len(receivedQueries))
assert.Equal(t, 1, len(storage.samples), "Goldfish should process entirely old queries normally")
})
t.Run("query spans threshold (split and merge)", func(t *testing.T) {
receivedQueries = []string{}
storage.Reset()
// Query from threshold-1h to threshold+1h (spans threshold)
start := threshold.Add(-time.Hour)
end := threshold.Add(time.Hour)
req := httptest.NewRequest("GET", "/loki/api/v1/query_range?query={job=\"test\"}&start="+formatTime(start)+"&end="+formatTime(end)+"&step="+step, nil)
req.Header.Set("X-Scope-OrgID", "test-tenant")
w := httptest.NewRecorder()
endpoint.ServeHTTP(w, req)
// Give goldfish time to process
time.Sleep(500 * time.Millisecond)
assert.Equal(t, 200, w.Code)
assert.Equal(t, 3, len(receivedQueries), "expected 3 queries, 1 for the recent portion, and 1 to both v1 and v2 for comparison, got %d", len(receivedQueries))
// Verify that old queries go to both backends
// Step is 60s = 60000ms from the test query
stepMs := int64(60000)
// Align threshold UP (as it's treated as an end boundary in alignStartEnd)
// This is v2End in the engine_router
alignedThreshold := stepAlignUp(threshold, stepMs)
// Based on observed behavior, old queries end at alignedThreshold + gap
oldQueryBoundary := alignedThreshold.Add(time.Duration(stepMs) * time.Millisecond)
oldQueries := 0
recentQueries := 0
for _, q := range receivedQueries {
if strings.Contains(q, "end=") {
endStr := extractQueryParam(q, "end")
endTime, _ := parseTimestamp(endStr)
// Queries ending at or before oldQueryBoundary are "old"
if endTime.Before(oldQueryBoundary) || endTime.Equal(oldQueryBoundary) {
oldQueries++
} else {
recentQueries++
}
}
}
assert.Equal(t, 2, oldQueries, "Old portion should be sent to both backends")
assert.Equal(t, 1, recentQueries, "Recent portion should only be sent to preferred backend")
assert.Equal(t, 1, len(storage.samples), "goldfish compares the old portion of the query between the two backends")
// Parse the JSON response and verify concatenation
var response loghttp.QueryResponse
err = json.Unmarshal(w.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "success", response.Status)
assert.Equal(t, string(loghttp.ResultTypeMatrix), string(response.Data.ResultType))
// Verify we got a matrix response
matrix, ok := response.Data.Result.(loghttp.Matrix)
require.True(t, ok, "Response should be a matrix")
require.Len(t, matrix, 1, "Should have one metric")
metric := matrix[0]
assert.Equal(t, model.LabelValue("test_metric"), metric.Metric["__name__"])
assert.Equal(t, model.LabelValue("test"), metric.Metric["job"])
// The response should contain data from both splits merged together
// The stats should show splits=2
assert.Equal(t, int64(2), response.Data.Statistics.Summary.Splits, "Should show 2 splits in stats")
})
t.Run("v2 compatible metric query with aggregation (split and merge)", func(t *testing.T) {
receivedQueries = []string{}
storage.Reset()
// Query from threshold-1h to threshold+1h (spans threshold)
// Using a v2 compatible metric query with aggregation
start := threshold.Add(-time.Hour)
end := threshold.Add(time.Hour)
metricQuery := url.QueryEscape(`sum by (job) (count_over_time({foo="bar"}[5m]))`)
req := httptest.NewRequest("GET", "/loki/api/v1/query_range?query="+metricQuery+"&start="+formatTime(start)+"&end="+formatTime(end)+"&step="+step, nil)
req.Header.Set("X-Scope-OrgID", "test-tenant")
w := httptest.NewRecorder()
endpoint.ServeHTTP(w, req)
// Give goldfish time to process
time.Sleep(500 * time.Millisecond)
assert.Equal(t, 200, w.Code)
// For v2 compatible metric queries, we expect:
// - 3 queries total: 1 for recent portion (v1 only), 2 for old portion (v1 and v2 for comparison)
assert.Equal(t, 3, len(receivedQueries), "expected 3 queries for v2 metric query, 1 for recent portion and 2 for old portion comparison, got %d", len(receivedQueries))
// Verify that old queries go to both backends
stepMs := int64(60000)
alignedThreshold := stepAlignUp(threshold, stepMs)
oldQueryBoundary := alignedThreshold.Add(time.Duration(stepMs) * time.Millisecond)
oldQueries := 0
recentQueries := 0
for _, q := range receivedQueries {
if strings.Contains(q, "end=") {
endStr := extractQueryParam(q, "end")
endTime, _ := parseTimestamp(endStr)
if endTime.Before(oldQueryBoundary) || endTime.Equal(oldQueryBoundary) {
oldQueries++
} else {
recentQueries++
}
}
}
assert.Equal(t, 2, oldQueries, "Old portion of v2 metric query should be sent to both backends for comparison")
assert.Equal(t, 1, recentQueries, "Recent portion of v2 metric query should only be sent to preferred backend")
assert.Equal(t, 1, len(storage.samples), "goldfish should compare the old portion of v2 metric query between the two backends")
// Parse the JSON response and verify concatenation
var response loghttp.QueryResponse
err = json.Unmarshal(w.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "success", response.Status)
assert.Equal(t, string(loghttp.ResultTypeMatrix), string(response.Data.ResultType))
// Verify we got a matrix response with aggregated results
matrix, ok := response.Data.Result.(loghttp.Matrix)
require.True(t, ok, "Response should be a matrix for metric query")
require.Len(t, matrix, 1, "Should have one aggregated metric")
metric := matrix[0]
assert.Equal(t, model.LabelValue("test"), metric.Metric["job"], "Aggregation should preserve 'job' label")
// The response should contain data from both splits merged together
assert.Equal(t, int64(2), response.Data.Statistics.Summary.Splits, "Should show 2 splits in stats for v2 metric query")
})
}
func parseTimestamp(value string) (time.Time, error) {
nanos, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return time.Time{}, err
}
if len(value) <= 10 {
return time.Unix(nanos, 0), nil
}
return time.Unix(0, nanos), nil
}
// stepAlignUp aligns a timestamp up to the nearest step boundary
func stepAlignUp(t time.Time, stepMs int64) time.Time {
timestampMs := t.UnixMilli()
if mod := timestampMs % stepMs; mod != 0 {
timestampMs += stepMs - mod
}
return time.Unix(0, timestampMs*1e6)
}
// Helper function to extract query parameters from a query string
func extractQueryParam(queryString, param string) string {
parts := strings.SplitSeq(queryString, "&")
for part := range parts {
if after, ok := strings.CutPrefix(part, param+"="); ok {
return after
}
}
return ""
}
func TestProxyEndpoint_ServeHTTP_ForwardsResponseHeaders(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Add("Content-Type", "application/json; charset=utf-8")
fmt.Fprint(w, `{"status":"success","data":{"resultType":"matrix","result":[]}}`)
}))
defer srv.Close()
srvURL, err := url.Parse(srv.URL)
require.NoError(t, err)
backends := []*ProxyBackend{{
name: "backend-1",
endpoint: srvURL,
client: srv.Client(),
timeout: time.Minute,
preferred: true,
}}
recorder := httptest.NewRecorder()
fakeReq := httptest.NewRequestWithContext(context.Background(), "GET", "/loki/api/v1/query_range?query={job=\"test\"}&start=1&end=2", nil)
fakeReq.Header.Set("X-Scope-OrgID", "test-tenant")
endpoint := createTestEndpoint(t, backends, "test", nil, false)
endpoint.ServeHTTP(recorder, fakeReq)
require.Equal(t, http.StatusOK, recorder.Result().StatusCode, "Status code from backend should be forwarded")
require.Equal(t, "application/json; charset=UTF-8", recorder.Result().Header.Get("Content-Type"), "Response header from backend should be forwarded")
}
func formatTime(t time.Time) string {
return strconv.FormatInt(t.UnixNano(), 10)
}