query-tee: support for multi-backend response comparison and disabling query proxy for select backends (#6736)

* add support for comparing responses with multiple secondary backends

* allow disabling read proxy for select secondary backends

* Update tools/querytee/proxy.go

Co-authored-by: Salva Corts <salva.corts@grafana.com>

Co-authored-by: Salva Corts <salva.corts@grafana.com>
pull/6796/head
Sandeep Sukhani 3 years ago committed by GitHub
parent e67dd6c91d
commit 2e95a64cb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      cmd/querytee/main.go
  2. 69
      tools/querytee/proxy.go
  3. 52
      tools/querytee/proxy_endpoint.go
  4. 4
      tools/querytee/proxy_metrics.go
  5. 55
      tools/querytee/proxy_test.go

@ -44,7 +44,7 @@ func main() {
}
// Run the proxy.
proxy, err := querytee.NewProxy(cfg.ProxyConfig, util_log.Logger, lokiReadRoutes(cfg), registry)
proxy, err := querytee.NewProxy(cfg.ProxyConfig, util_log.Logger, lokiReadRoutes(cfg), lokiWriteRoutes(), registry)
if err != nil {
level.Error(util_log.Logger).Log("msg", "Unable to initialize the proxy", "err", err.Error())
os.Exit(1)
@ -77,6 +77,12 @@ func lokiReadRoutes(cfg Config) []querytee.Route {
{Path: "/api/prom/label", RouteName: "api_prom_label", Methods: []string{"GET"}, ResponseComparator: nil},
{Path: "/api/prom/label/{name}/values", RouteName: "api_prom_label_name_values", Methods: []string{"GET"}, ResponseComparator: nil},
{Path: "/api/prom/series", RouteName: "api_prom_series", Methods: []string{"GET"}, ResponseComparator: nil},
}
}
// lokiWriteRoutes returns the routes for the Loki push endpoints. Both routes
// accept only POST requests and have no response comparator set.
func lokiWriteRoutes() []querytee.Route {
	routes := make([]querytee.Route, 0, 2)
	routes = append(routes,
		// Current push API.
		querytee.Route{Path: "/loki/api/v1/push", RouteName: "api_v1_push", Methods: []string{"POST"}},
		// Push API under the /api/prom prefix.
		querytee.Route{Path: "/api/prom/push", RouteName: "api_prom_push", Methods: []string{"POST"}},
	)
	return routes
}

@ -28,6 +28,7 @@ type ProxyConfig struct {
PreferredBackend string
BackendReadTimeout time.Duration
CompareResponses bool
DisableBackendReadProxy string
ValueComparisonTolerance float64
UseRelativeError bool
PassThroughNonRegisteredRoutes bool
@ -40,6 +41,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.")
f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 90*time.Second, "The timeout when reading the response from a backend.")
f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.")
f.StringVar(&cfg.DisableBackendReadProxy, "proxy.disable-backend-read", "", "Comma separated list of non-primary backend hostnames to disable their read proxy. Typically used for temporarily not passing any read requests to specified backends.")
f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).")
f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.")
f.DurationVar(&cfg.SkipRecentSamples, "proxy.compare-skip-recent-samples", 60*time.Second, "The window from now to skip comparing samples. 0 to disable.")
@ -54,11 +56,12 @@ type Route struct {
}
type Proxy struct {
cfg ProxyConfig
backends []*ProxyBackend
logger log.Logger
metrics *ProxyMetrics
routes []Route
cfg ProxyConfig
backends []*ProxyBackend
logger log.Logger
metrics *ProxyMetrics
readRoutes []Route
writeRoutes []Route
// The HTTP server used to run the proxy service.
srv *http.Server
@ -68,7 +71,7 @@ type Proxy struct {
done sync.WaitGroup
}
func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer prometheus.Registerer) (*Proxy, error) {
func NewProxy(cfg ProxyConfig, logger log.Logger, readRoutes, writeRoutes []Route, registerer prometheus.Registerer) (*Proxy, error) {
if cfg.CompareResponses && cfg.PreferredBackend == "" {
return nil, fmt.Errorf("when enabling comparison of results -backend.preferred flag must be set to hostname of preferred backend")
}
@ -78,10 +81,11 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
}
p := &Proxy{
cfg: cfg,
logger: logger,
metrics: NewProxyMetrics(registerer),
routes: routes,
cfg: cfg,
logger: logger,
metrics: NewProxyMetrics(registerer),
readRoutes: readRoutes,
writeRoutes: writeRoutes,
}
// Parse the backend endpoints (comma separated).
@ -133,8 +137,8 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
}
}
if cfg.CompareResponses && len(p.backends) != 2 {
return nil, fmt.Errorf("when enabling comparison of results number of backends should be 2 exactly")
if cfg.CompareResponses && len(p.backends) < 2 {
return nil, fmt.Errorf("when enabling comparison of results number of backends should be at least 2")
}
// At least 2 backends are suggested
@ -142,6 +146,15 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro
level.Warn(p.logger).Log("msg", "The proxy is running with only 1 backend. At least 2 backends are required to fulfil the purpose of the proxy and compare results.")
}
if cfg.DisableBackendReadProxy != "" {
readDisabledBackendHosts := strings.Split(p.cfg.DisableBackendReadProxy, ",")
for _, host := range readDisabledBackendHosts {
if host == cfg.PreferredBackend {
return nil, fmt.Errorf("the preferred backend cannot be disabled for reading")
}
}
}
return p, nil
}
@ -159,13 +172,17 @@ func (p *Proxy) Start() error {
w.WriteHeader(http.StatusOK)
}))
// register routes
for _, route := range p.routes {
// register read routes
for _, route := range p.readRoutes {
var comparator ResponsesComparator
if p.cfg.CompareResponses {
comparator = route.ResponseComparator
}
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator))
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(filterReadDisabledBackends(p.backends, p.cfg.DisableBackendReadProxy), route.RouteName, p.metrics, p.logger, comparator))
}
for _, route := range p.writeRoutes {
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, nil))
}
if p.cfg.PassThroughNonRegisteredRoutes {
@ -218,3 +235,25 @@ func (p *Proxy) Endpoint() string {
return p.srvListener.Addr().String()
}
// filterReadDisabledBackends returns the backends that remain enabled for
// reads: every backend except the non-preferred ones whose name is listed in
// disableReadProxyCfg.
//
// disableReadProxyCfg is a comma separated list of backend names. Whitespace
// around each entry is ignored, and empty entries (an empty config value or
// stray commas) are skipped so that they can never match a backend whose name
// is empty. A preferred backend is always kept, even when it is listed.
//
// The returned slice is newly allocated, preserves the order of backends, and
// the input slice is not modified.
func filterReadDisabledBackends(backends []*ProxyBackend, disableReadProxyCfg string) []*ProxyBackend {
	// Normalise the configured names once instead of re-trimming them for
	// every backend.
	readDisabledNames := make(map[string]struct{})
	for _, h := range strings.Split(disableReadProxyCfg, ",") {
		// strings.Split("", ",") yields [""], so blank entries must be dropped
		// explicitly.
		if h = strings.TrimSpace(h); h != "" {
			readDisabledNames[h] = struct{}{}
		}
	}

	readEnabledBackends := make([]*ProxyBackend, 0, len(backends))
	for _, b := range backends {
		if _, disabled := readDisabledNames[b.name]; disabled && !b.preferred {
			continue
		}
		readEnabledBackends = append(readEnabledBackends, b)
	}

	return readEnabledBackends
}

@ -74,12 +74,12 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *backendResponse) {
var (
wg = sync.WaitGroup{}
err error
body []byte
responses = make([]*backendResponse, 0, len(p.backends))
responsesMtx = sync.Mutex{}
query = r.URL.RawQuery
wg = sync.WaitGroup{}
err error
body []byte
expectedResponseIdx int
responses = make([]*backendResponse, len(p.backends))
query = r.URL.RawQuery
)
if r.Body != nil {
@ -102,7 +102,8 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back
level.Debug(p.logger).Log("msg", "Received request", "path", r.URL.Path, "query", query)
wg.Add(len(p.backends))
for _, b := range p.backends {
for i, b := range p.backends {
i := i
b := b
go func() {
@ -136,9 +137,10 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back
// Keep track of the response if required.
if p.comparator != nil {
responsesMtx.Lock()
responses = append(responses, res)
responsesMtx.Unlock()
if b.preferred {
expectedResponseIdx = i
}
responses[i] = res
}
resCh <- res
@ -151,21 +153,25 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *back
// Compare responses.
if p.comparator != nil {
expectedResponse := responses[0]
actualResponse := responses[1]
if responses[1].backend.preferred {
expectedResponse, actualResponse = actualResponse, expectedResponse
}
expectedResponse := responses[expectedResponseIdx]
for i := range responses {
if i == expectedResponseIdx {
continue
}
actualResponse := responses[i]
result := comparisonSuccess
err := p.compareResponses(expectedResponse, actualResponse)
if err != nil {
level.Error(util_log.Logger).Log("msg", "response comparison failed",
"backend-name", p.backends[i].name,
"route-name", p.routeName,
"query", r.URL.RawQuery, "err", err)
result = comparisonFailed
}
result := comparisonSuccess
err := p.compareResponses(expectedResponse, actualResponse)
if err != nil {
level.Error(util_log.Logger).Log("msg", "response comparison failed", "route-name", p.routeName,
"query", r.URL.RawQuery, "err", err)
result = comparisonFailed
p.metrics.responsesComparedTotal.WithLabelValues(p.backends[i].name, p.routeName, result).Inc()
}
p.metrics.responsesComparedTotal.WithLabelValues(p.routeName, result).Inc()
}
}

@ -32,8 +32,8 @@ func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
responsesComparedTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex_querytee",
Name: "responses_compared_total",
Help: "Total number of responses compared per route name by result.",
}, []string{"route", "result"}),
Help: "Total number of responses compared per route and backend name by result.",
}, []string{"backend", "route", "result"}),
}
return m

@ -5,6 +5,7 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"strings"
"testing"
@ -16,10 +17,12 @@ import (
"github.com/stretchr/testify/require"
)
var testRoutes = []Route{
var testReadRoutes = []Route{
{Path: "/api/v1/query", RouteName: "api_v1_query", Methods: []string{"GET"}, ResponseComparator: &testComparator{}},
}
var testWriteRoutes = []Route{}
type testComparator struct{}
func (testComparator) Compare(expected, actual []byte) error { return nil }
@ -27,7 +30,7 @@ func (testComparator) Compare(expected, actual []byte) error { return nil }
func Test_NewProxy(t *testing.T) {
cfg := ProxyConfig{}
p, err := NewProxy(cfg, log.NewNopLogger(), testRoutes, nil)
p, err := NewProxy(cfg, log.NewNopLogger(), testReadRoutes, testWriteRoutes, nil)
assert.Equal(t, errMinBackends, err)
assert.Nil(t, p)
}
@ -164,7 +167,7 @@ func Test_Proxy_RequestsForwarding(t *testing.T) {
cfg.CompareResponses = true
}
p, err := NewProxy(cfg, log.NewNopLogger(), testRoutes, nil)
p, err := NewProxy(cfg, log.NewNopLogger(), testReadRoutes, testWriteRoutes, nil)
require.NoError(t, err)
require.NotNil(t, p)
defer p.Stop() //nolint:errcheck
@ -313,7 +316,7 @@ func TestProxy_Passthrough(t *testing.T) {
PassThroughNonRegisteredRoutes: true,
}
p, err := NewProxy(cfg, log.NewNopLogger(), testRoutes, nil)
p, err := NewProxy(cfg, log.NewNopLogger(), testReadRoutes, testWriteRoutes, nil)
require.NoError(t, err)
require.NotNil(t, p)
defer p.Stop() //nolint:errcheck
@ -352,3 +355,47 @@ func mockQueryResponse(path string, status int, res string) http.HandlerFunc {
}
}
}
// TestFilterReadDisabledBackend checks that filterReadDisabledBackends removes
// exactly the backends named in the disable-read config, tolerates whitespace
// around the names, and keeps test1 even when it is listed.
func TestFilterReadDisabledBackend(t *testing.T) {
	urlMustParse := func(urlStr string) *url.URL {
		u, err := url.Parse(urlStr)
		require.NoError(t, err)
		return u
	}

	// test1 is created with the trailing flag set to true; the cases below
	// treat it as the primary (preferred) backend and the others as secondaries.
	// The URLs use the full "http://host" form so the parsed URL carries a host
	// ("http:/host" parses as a path-only URL with an empty host).
	backends := []*ProxyBackend{
		NewProxyBackend("test1", urlMustParse("http://test1"), time.Second, true),
		NewProxyBackend("test2", urlMustParse("http://test2"), time.Second, false),
		NewProxyBackend("test3", urlMustParse("http://test3"), time.Second, false),
		NewProxyBackend("test4", urlMustParse("http://test4"), time.Second, false),
	}

	for name, tc := range map[string]struct {
		disableReadProxyCfg string
		expectedBackends    []*ProxyBackend
	}{
		"nothing disabled": {
			expectedBackends: backends,
		},
		"test2 disabled": {
			disableReadProxyCfg: "test2",
			expectedBackends:    []*ProxyBackend{backends[0], backends[2], backends[3]},
		},
		// A space after the comma must be ignored.
		"test2 and test4 disabled": {
			disableReadProxyCfg: "test2, test4",
			expectedBackends:    []*ProxyBackend{backends[0], backends[2]},
		},
		// The order of names in the config does not have to match the backend order.
		"all secondary disabled": {
			disableReadProxyCfg: "test2, test4,test3",
			expectedBackends:    []*ProxyBackend{backends[0]},
		},
		"disabling primary should not be filtered out": {
			disableReadProxyCfg: "test1",
			expectedBackends:    backends,
		},
	} {
		t.Run(name, func(t *testing.T) {
			filteredBackends := filterReadDisabledBackends(backends, tc.disableReadProxyCfg)
			require.Equal(t, tc.expectedBackends, filteredBackends)
		})
	}
}

Loading…
Cancel
Save