From 2e95a64cb45e1f2cffcaaca0dea6ee8f7b7741dd Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 28 Jul 2022 17:59:42 +0530 Subject: [PATCH] query-tee: support for multi backend response comparison and disabling query proxy for select backends (#6736) * add support for comparing responses with multiple secondary backends * allow disabling read proxy for select secondary backends * Update tools/querytee/proxy.go Co-authored-by: Salva Corts Co-authored-by: Salva Corts --- cmd/querytee/main.go | 8 +++- tools/querytee/proxy.go | 69 +++++++++++++++++++++++++------- tools/querytee/proxy_endpoint.go | 52 +++++++++++++----------- tools/querytee/proxy_metrics.go | 4 +- tools/querytee/proxy_test.go | 55 +++++++++++++++++++++++-- 5 files changed, 143 insertions(+), 45 deletions(-) diff --git a/cmd/querytee/main.go b/cmd/querytee/main.go index 9295bdfa2f..2b9df37b3e 100644 --- a/cmd/querytee/main.go +++ b/cmd/querytee/main.go @@ -44,7 +44,7 @@ func main() { } // Run the proxy. - proxy, err := querytee.NewProxy(cfg.ProxyConfig, util_log.Logger, lokiReadRoutes(cfg), registry) + proxy, err := querytee.NewProxy(cfg.ProxyConfig, util_log.Logger, lokiReadRoutes(cfg), lokiWriteRoutes(), registry) if err != nil { level.Error(util_log.Logger).Log("msg", "Unable to initialize the proxy", "err", err.Error()) os.Exit(1) @@ -77,6 +77,12 @@ func lokiReadRoutes(cfg Config) []querytee.Route { {Path: "/api/prom/label", RouteName: "api_prom_label", Methods: []string{"GET"}, ResponseComparator: nil}, {Path: "/api/prom/label/{name}/values", RouteName: "api_prom_label_name_values", Methods: []string{"GET"}, ResponseComparator: nil}, {Path: "/api/prom/series", RouteName: "api_prom_series", Methods: []string{"GET"}, ResponseComparator: nil}, + } +} + +func lokiWriteRoutes() []querytee.Route { + return []querytee.Route{ {Path: "/loki/api/v1/push", RouteName: "api_v1_push", Methods: []string{"POST"}, ResponseComparator: nil}, + {Path: "/api/prom/push", RouteName: "api_prom_push", Methods: []string{"POST"}, ResponseComparator: nil}, } } diff --git a/tools/querytee/proxy.go b/tools/querytee/proxy.go index 0e92be22a7..4032c3b05b 100644 --- a/tools/querytee/proxy.go +++ b/tools/querytee/proxy.go @@ -28,6 +28,7 @@ type ProxyConfig struct { PreferredBackend string BackendReadTimeout time.Duration CompareResponses bool + DisableBackendReadProxy string ValueComparisonTolerance float64 UseRelativeError bool PassThroughNonRegisteredRoutes bool @@ -40,6 +41,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.") f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 90*time.Second, "The timeout when reading the response from a backend.") f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.") + f.StringVar(&cfg.DisableBackendReadProxy, "proxy.disable-backend-read", "", "Comma separated list of non-primary backend hostnames to disable their read proxy. Typically used for temporarily not passing any read requests to specified backends.") f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).") f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.") f.DurationVar(&cfg.SkipRecentSamples, "proxy.compare-skip-recent-samples", 60*time.Second, "The window from now to skip comparing samples. 0 to disable.") @@ -54,11 +56,12 @@ type Route struct { } type Proxy struct { - cfg ProxyConfig - backends []*ProxyBackend - logger log.Logger - metrics *ProxyMetrics - routes []Route + cfg ProxyConfig + backends []*ProxyBackend + logger log.Logger + metrics *ProxyMetrics + readRoutes []Route + writeRoutes []Route // The HTTP server used to run the proxy service. srv *http.Server @@ -68,7 +71,7 @@ type Proxy struct { done sync.WaitGroup } -func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer prometheus.Registerer) (*Proxy, error) { +func NewProxy(cfg ProxyConfig, logger log.Logger, readRoutes, writeRoutes []Route, registerer prometheus.Registerer) (*Proxy, error) { if cfg.CompareResponses && cfg.PreferredBackend == "" { return nil, fmt.Errorf("when enabling comparison of results -backend.preferred flag must be set to hostname of preferred backend") } @@ -78,10 +81,11 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro } p := &Proxy{ - cfg: cfg, - logger: logger, - metrics: NewProxyMetrics(registerer), - routes: routes, + cfg: cfg, + logger: logger, + metrics: NewProxyMetrics(registerer), + readRoutes: readRoutes, + writeRoutes: writeRoutes, } // Parse the backend endpoints (comma separated). @@ -133,8 +137,8 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro } } - if cfg.CompareResponses && len(p.backends) != 2 { - return nil, fmt.Errorf("when enabling comparison of results number of backends should be 2 exactly") + if cfg.CompareResponses && len(p.backends) < 2 { + return nil, fmt.Errorf("when enabling comparison of results number of backends should be at least 2") } // At least 2 backends are suggested @@ -142,6 +146,15 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro level.Warn(p.logger).Log("msg", "The proxy is running with only 1 backend. At least 2 backends are required to fulfil the purpose of the proxy and compare results.") } + if cfg.DisableBackendReadProxy != "" { + readDisabledBackendHosts := strings.Split(p.cfg.DisableBackendReadProxy, ",") + for _, host := range readDisabledBackendHosts { + if host == cfg.PreferredBackend { + return nil, fmt.Errorf("the preferred backend cannot be disabled for reading") + } + } + } + return p, nil } @@ -159,13 +172,17 @@ func (p *Proxy) Start() error { w.WriteHeader(http.StatusOK) })) - // register routes - for _, route := range p.routes { + // register read routes + for _, route := range p.readRoutes { var comparator ResponsesComparator if p.cfg.CompareResponses { comparator = route.ResponseComparator } - router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator)) + router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(filterReadDisabledBackends(p.backends, p.cfg.DisableBackendReadProxy), route.RouteName, p.metrics, p.logger, comparator)) + } + + for _, route := range p.writeRoutes { + router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, nil)) } if p.cfg.PassThroughNonRegisteredRoutes { @@ -218,3 +235,25 @@ func (p *Proxy) Endpoint() string { return p.srvListener.Addr().String() } + +func filterReadDisabledBackends(backends []*ProxyBackend, disableReadProxyCfg string) []*ProxyBackend { + readEnabledBackends := make([]*ProxyBackend, 0, len(backends)) + readDisabledBackendNames := strings.Split(disableReadProxyCfg, ",") + for _, b := range backends { + if !b.preferred { + readDisabled := false + for _, h := range readDisabledBackendNames { + if strings.TrimSpace(h) == b.name { + readDisabled = true + break + } + } + if readDisabled { + continue + } + } + readEnabledBackends = append(readEnabledBackends, b) + } + + return readEnabledBackends +} diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index 9d3bf405d5..8dcdca81a2 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -74,12 +74,12 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *backendResponse) { var ( - wg = sync.WaitGroup{} - err error - body []byte - responses = make([]*backendResponse, 0, len(p.backends)) - responsesMtx = sync.Mutex{} - query = r.URL.RawQuery + wg = sync.WaitGroup{} + err error + body []byte + expectedResponseIdx int + responses = make([]*backendResponse, len(p.backends)) + query = r.URL.RawQuery ) if r.Body != nil { @@ -102,7 +102,8 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back level.Debug(p.logger).Log("msg", "Received request", "path", r.URL.Path, "query", query) wg.Add(len(p.backends)) - for _, b := range p.backends { + for i, b := range p.backends { + i := i b := b go func() { @@ -136,9 +137,10 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back // Keep track of the response if required. if p.comparator != nil { - responsesMtx.Lock() - responses = append(responses, res) - responsesMtx.Unlock() + if b.preferred { + expectedResponseIdx = i + } + responses[i] = res } resCh <- res @@ -151,21 +153,25 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back // Compare responses. if p.comparator != nil { - expectedResponse := responses[0] - actualResponse := responses[1] - if responses[1].backend.preferred { - expectedResponse, actualResponse = actualResponse, expectedResponse - } + expectedResponse := responses[expectedResponseIdx] + for i := range responses { + if i == expectedResponseIdx { + continue + } + actualResponse := responses[i] + + result := comparisonSuccess + err := p.compareResponses(expectedResponse, actualResponse) + if err != nil { + level.Error(util_log.Logger).Log("msg", "response comparison failed", + "backend-name", p.backends[i].name, + "route-name", p.routeName, + "query", r.URL.RawQuery, "err", err) + result = comparisonFailed + } - result := comparisonSuccess - err := p.compareResponses(expectedResponse, actualResponse) - if err != nil { - level.Error(util_log.Logger).Log("msg", "response comparison failed", "route-name", p.routeName, - "query", r.URL.RawQuery, "err", err) - result = comparisonFailed + p.metrics.responsesComparedTotal.WithLabelValues(p.backends[i].name, p.routeName, result).Inc() } - - p.metrics.responsesComparedTotal.WithLabelValues(p.routeName, result).Inc() } } diff --git a/tools/querytee/proxy_metrics.go b/tools/querytee/proxy_metrics.go index ea126d12a6..23a95e8fab 100644 --- a/tools/querytee/proxy_metrics.go +++ b/tools/querytee/proxy_metrics.go @@ -32,8 +32,8 @@ func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics { responsesComparedTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex_querytee", Name: "responses_compared_total", - Help: "Total number of responses compared per route name by result.", - }, []string{"route", "result"}), + Help: "Total number of responses compared per route and backend name by result.", + }, []string{"backend", "route", "result"}), } return m diff --git a/tools/querytee/proxy_test.go b/tools/querytee/proxy_test.go index d7575c7059..e98f24274a 100644 --- a/tools/querytee/proxy_test.go +++ b/tools/querytee/proxy_test.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "net/url" "strconv" "strings" "testing" @@ -16,10 +17,12 @@ import ( "github.com/stretchr/testify/require" ) -var testRoutes = []Route{ +var testReadRoutes = []Route{ {Path: "/api/v1/query", RouteName: "api_v1_query", Methods: []string{"GET"}, ResponseComparator: &testComparator{}}, } +var testWriteRoutes = []Route{} + type testComparator struct{} func (testComparator) Compare(expected, actual []byte) error { return nil } @@ -27,7 +30,7 @@ func (testComparator) Compare(expected, actual []byte) error { return nil } func Test_NewProxy(t *testing.T) { cfg := ProxyConfig{} - p, err := NewProxy(cfg, log.NewNopLogger(), testRoutes, nil) + p, err := NewProxy(cfg, log.NewNopLogger(), testReadRoutes, testWriteRoutes, nil) assert.Equal(t, errMinBackends, err) assert.Nil(t, p) } @@ -164,7 +167,7 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { cfg.CompareResponses = true } - p, err := NewProxy(cfg, log.NewNopLogger(), testRoutes, nil) + p, err := NewProxy(cfg, log.NewNopLogger(), testReadRoutes, testWriteRoutes, nil) require.NoError(t, err) require.NotNil(t, p) defer p.Stop() //nolint:errcheck @@ -313,7 +316,7 @@ func TestProxy_Passthrough(t *testing.T) { PassThroughNonRegisteredRoutes: true, } - p, err := NewProxy(cfg, log.NewNopLogger(), testRoutes, nil) + p, err := NewProxy(cfg, log.NewNopLogger(), testReadRoutes, testWriteRoutes, nil) require.NoError(t, err) require.NotNil(t, p) defer p.Stop() //nolint:errcheck @@ -352,3 +355,47 @@ func mockQueryResponse(path string, status int, res string) http.HandlerFunc { } } } + +func TestFilterReadDisabledBackend(t *testing.T) { + urlMustParse := func(urlStr string) *url.URL { + u, err := url.Parse(urlStr) + require.NoError(t, err) + return u + } + + backends := []*ProxyBackend{ + NewProxyBackend("test1", urlMustParse("http:/test1"), time.Second, true), + NewProxyBackend("test2", urlMustParse("http:/test2"), time.Second, false), + NewProxyBackend("test3", urlMustParse("http:/test3"), time.Second, false), + NewProxyBackend("test4", urlMustParse("http:/test4"), time.Second, false), + } + for name, tc := range map[string]struct { + disableReadProxyCfg string + expectedBackends []*ProxyBackend + }{ + "nothing disabled": { + expectedBackends: backends, + }, + "test2 disabled": { + disableReadProxyCfg: "test2", + expectedBackends: []*ProxyBackend{backends[0], backends[2], backends[3]}, + }, + "test2 and test4 disabled": { + disableReadProxyCfg: "test2, test4", + expectedBackends: []*ProxyBackend{backends[0], backends[2]}, + }, + "all secondary disabled": { + disableReadProxyCfg: "test2, test4,test3", + expectedBackends: []*ProxyBackend{backends[0]}, + }, + "disabling primary should not be filtered out": { + disableReadProxyCfg: "test1", + expectedBackends: backends, + }, + } { + t.Run(name, func(t *testing.T) { + filteredBackends := filterReadDisabledBackends(backends, tc.disableReadProxyCfg) + require.Equal(t, tc.expectedBackends, filteredBackends) + }) + } +}