mirror of https://github.com/grafana/loki
query tee proxy with support for comparison of responses (#2211)
parent
1127d9328b
commit
f04bc99e05
@ -0,0 +1,77 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"flag" |
||||
"os" |
||||
|
||||
"github.com/go-kit/kit/log/level" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/weaveworks/common/logging" |
||||
"github.com/weaveworks/common/server" |
||||
|
||||
"github.com/cortexproject/cortex/pkg/util" |
||||
"github.com/cortexproject/cortex/tools/querytee" |
||||
|
||||
"github.com/grafana/loki/pkg/loghttp" |
||||
) |
||||
|
||||
type Config struct { |
||||
ServerMetricsPort int |
||||
LogLevel logging.Level |
||||
ProxyConfig querytee.ProxyConfig |
||||
} |
||||
|
||||
func main() { |
||||
// Parse CLI flags.
|
||||
cfg := Config{} |
||||
flag.IntVar(&cfg.ServerMetricsPort, "server.metrics-port", 9900, "The port where metrics are exposed.") |
||||
cfg.LogLevel.RegisterFlags(flag.CommandLine) |
||||
cfg.ProxyConfig.RegisterFlags(flag.CommandLine) |
||||
flag.Parse() |
||||
|
||||
util.InitLogger(&server.Config{ |
||||
LogLevel: cfg.LogLevel, |
||||
}) |
||||
|
||||
// Run the instrumentation server.
|
||||
registry := prometheus.NewRegistry() |
||||
registry.MustRegister(prometheus.NewGoCollector()) |
||||
|
||||
i := querytee.NewInstrumentationServer(cfg.ServerMetricsPort, registry) |
||||
if err := i.Start(); err != nil { |
||||
level.Error(util.Logger).Log("msg", "Unable to start instrumentation server", "err", err.Error()) |
||||
os.Exit(1) |
||||
} |
||||
|
||||
// Run the proxy.
|
||||
proxy, err := querytee.NewProxy(cfg.ProxyConfig, util.Logger, lokiReadRoutes(), registry) |
||||
if err != nil { |
||||
level.Error(util.Logger).Log("msg", "Unable to initialize the proxy", "err", err.Error()) |
||||
os.Exit(1) |
||||
} |
||||
|
||||
if err := proxy.Start(); err != nil { |
||||
level.Error(util.Logger).Log("msg", "Unable to start the proxy", "err", err.Error()) |
||||
os.Exit(1) |
||||
} |
||||
|
||||
proxy.Await() |
||||
} |
||||
|
||||
func lokiReadRoutes() []querytee.Route { |
||||
samplesComparator := querytee.NewSamplesComparator() |
||||
samplesComparator.RegisterSamplesType(loghttp.ResultTypeStream, compareStreams) |
||||
|
||||
return []querytee.Route{ |
||||
{Path: "/loki/api/v1/query_range", RouteName: "api_v1_query_range", Methods: "GET", ResponseComparator: samplesComparator}, |
||||
{Path: "/loki/api/v1/query", RouteName: "api_v1_query", Methods: "GET", ResponseComparator: samplesComparator}, |
||||
{Path: "/loki/api/v1/label", RouteName: "api_v1_label", Methods: "GET", ResponseComparator: nil}, |
||||
{Path: "/loki/api/v1/labels", RouteName: "api_v1_labels", Methods: "GET", ResponseComparator: nil}, |
||||
{Path: "/loki/api/v1/label/{name}/values", RouteName: "api_v1_label_name_values", Methods: "GET", ResponseComparator: nil}, |
||||
{Path: "/loki/api/v1/series", RouteName: "api_v1_series", Methods: "GET", ResponseComparator: nil}, |
||||
{Path: "/api/prom/query", RouteName: "api_prom_query", Methods: "GET", ResponseComparator: samplesComparator}, |
||||
{Path: "/api/prom/label", RouteName: "api_prom_label", Methods: "GET", ResponseComparator: nil}, |
||||
{Path: "/api/prom/label/{name}/values", RouteName: "api_prom_label_name_values", Methods: "GET", ResponseComparator: nil}, |
||||
{Path: "/api/prom/series", RouteName: "api_prom_series", Methods: "GET", ResponseComparator: nil}, |
||||
} |
||||
} |
||||
@ -0,0 +1,69 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"fmt" |
||||
|
||||
"github.com/cortexproject/cortex/pkg/util" |
||||
"github.com/go-kit/kit/log/level" |
||||
|
||||
"github.com/grafana/loki/pkg/loghttp" |
||||
) |
||||
|
||||
func compareStreams(expectedRaw, actualRaw json.RawMessage) error { |
||||
var expected, actual loghttp.Streams |
||||
|
||||
err := json.Unmarshal(expectedRaw, &expected) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
err = json.Unmarshal(actualRaw, &actual) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if len(expected) != len(actual) { |
||||
return fmt.Errorf("expected %d streams but got %d", len(expected), len(actual)) |
||||
} |
||||
|
||||
streamLabelsToIndexMap := make(map[string]int, len(expected)) |
||||
for i, actualStream := range actual { |
||||
streamLabelsToIndexMap[actualStream.Labels.String()] = i |
||||
} |
||||
|
||||
for _, expectedStream := range expected { |
||||
actualStreamIndex, ok := streamLabelsToIndexMap[expectedStream.Labels.String()] |
||||
if !ok { |
||||
return fmt.Errorf("expected stream %s missing from actual response", expectedStream.Labels) |
||||
} |
||||
|
||||
actualStream := actual[actualStreamIndex] |
||||
expectedValuesLen := len(expectedStream.Entries) |
||||
actualValuesLen := len(actualStream.Entries) |
||||
|
||||
if expectedValuesLen != actualValuesLen { |
||||
err := fmt.Errorf("expected %d values for stream %s but got %d", expectedValuesLen, |
||||
expectedStream.Labels, actualValuesLen) |
||||
if expectedValuesLen > 0 && actualValuesLen > 0 { |
||||
level.Error(util.Logger).Log("msg", err.Error(), "oldest-expected-ts", expectedStream.Entries[0].Timestamp.UnixNano(), |
||||
"newest-expected-ts", expectedStream.Entries[expectedValuesLen-1].Timestamp.UnixNano(), |
||||
"oldest-actual-ts", actualStream.Entries[0].Timestamp.UnixNano(), "newest-actual-ts", actualStream.Entries[actualValuesLen-1].Timestamp.UnixNano()) |
||||
} |
||||
return err |
||||
} |
||||
|
||||
for i, expectedSamplePair := range expectedStream.Entries { |
||||
actualSamplePair := actualStream.Entries[i] |
||||
if !expectedSamplePair.Timestamp.Equal(actualSamplePair.Timestamp) { |
||||
return fmt.Errorf("expected timestamp %v but got %v for stream %s", expectedSamplePair.Timestamp.UnixNano(), |
||||
actualSamplePair.Timestamp.UnixNano(), expectedStream.Labels) |
||||
} |
||||
if expectedSamplePair.Line != actualSamplePair.Line { |
||||
return fmt.Errorf("expected line %s for timestamp %v but got %s for stream %s", expectedSamplePair.Line, |
||||
expectedSamplePair.Timestamp.UnixNano(), actualSamplePair.Line, expectedStream.Labels) |
||||
} |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
@ -0,0 +1,102 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"errors" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func TestCompareStreams(t *testing.T) { |
||||
for _, tc := range []struct { |
||||
name string |
||||
expected json.RawMessage |
||||
actual json.RawMessage |
||||
err error |
||||
}{ |
||||
{ |
||||
name: "no streams", |
||||
expected: json.RawMessage(`[]`), |
||||
actual: json.RawMessage(`[]`), |
||||
}, |
||||
{ |
||||
name: "no streams in actual response", |
||||
expected: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"]]} |
||||
]`), |
||||
actual: json.RawMessage(`[]`), |
||||
err: errors.New("expected 1 streams but got 0"), |
||||
}, |
||||
{ |
||||
name: "extra stream in actual response", |
||||
expected: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"]]} |
||||
]`), |
||||
actual: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"]]}, |
||||
{"stream":{"foo1":"bar1"},"values":[["1","1"]]} |
||||
]`), |
||||
err: errors.New("expected 1 streams but got 2"), |
||||
}, |
||||
{ |
||||
name: "same number of streams but with different labels", |
||||
expected: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"]]} |
||||
]`), |
||||
actual: json.RawMessage(`[ |
||||
{"stream":{"foo1":"bar1"},"values":[["1","1"]]} |
||||
]`), |
||||
err: errors.New("expected stream {foo=\"bar\"} missing from actual response"), |
||||
}, |
||||
{ |
||||
name: "difference in number of samples", |
||||
expected: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"],["2","2"]]} |
||||
]`), |
||||
actual: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"]]} |
||||
]`), |
||||
err: errors.New("expected 2 values for stream {foo=\"bar\"} but got 1"), |
||||
}, |
||||
{ |
||||
name: "difference in sample timestamp", |
||||
expected: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"],["2","2"]]} |
||||
]`), |
||||
actual: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"],["3","2"]]} |
||||
]`), |
||||
err: errors.New("expected timestamp 2 but got 3 for stream {foo=\"bar\"}"), |
||||
}, |
||||
{ |
||||
name: "difference in sample value", |
||||
expected: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"],["2","2"]]} |
||||
]`), |
||||
actual: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"],["2","3"]]} |
||||
]`), |
||||
err: errors.New("expected line 2 for timestamp 2 but got 3 for stream {foo=\"bar\"}"), |
||||
}, |
||||
{ |
||||
name: "correct samples", |
||||
expected: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"],["2","2"]]} |
||||
]`), |
||||
actual: json.RawMessage(`[ |
||||
{"stream":{"foo":"bar"},"values":[["1","1"],["2","2"]]} |
||||
]`), |
||||
}, |
||||
} { |
||||
t.Run(tc.name, func(t *testing.T) { |
||||
err := compareStreams(tc.expected, tc.actual) |
||||
if tc.err == nil { |
||||
require.NoError(t, err) |
||||
return |
||||
} |
||||
require.Error(t, err) |
||||
require.Equal(t, tc.err.Error(), err.Error()) |
||||
}) |
||||
} |
||||
} |
||||
@ -0,0 +1,13 @@ |
||||
Copyright 2018 Weaveworks. All rights reserved. |
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); |
||||
you may not use this file except in compliance with the License. |
||||
You may obtain a copy of the License at |
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0 |
||||
|
||||
Unless required by applicable law or agreed to in writing, software |
||||
distributed under the License is distributed on an "AS IS" BASIS, |
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
See the License for the specific language governing permissions and |
||||
limitations under the License. |
||||
@ -0,0 +1,60 @@ |
||||
package querytee |
||||
|
||||
import ( |
||||
"fmt" |
||||
"net" |
||||
"net/http" |
||||
|
||||
"github.com/go-kit/kit/log/level" |
||||
"github.com/gorilla/mux" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/client_golang/prometheus/promhttp" |
||||
|
||||
"github.com/cortexproject/cortex/pkg/util" |
||||
) |
||||
|
||||
type InstrumentationServer struct { |
||||
port int |
||||
registry *prometheus.Registry |
||||
srv *http.Server |
||||
} |
||||
|
||||
// NewInstrumentationServer returns a server exposing Prometheus metrics.
|
||||
func NewInstrumentationServer(port int, registry *prometheus.Registry) *InstrumentationServer { |
||||
return &InstrumentationServer{ |
||||
port: port, |
||||
registry: registry, |
||||
} |
||||
} |
||||
|
||||
// Start the instrumentation server.
|
||||
func (s *InstrumentationServer) Start() error { |
||||
// Setup listener first, so we can fail early if the port is in use.
|
||||
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port)) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
router := mux.NewRouter() |
||||
router.Handle("/metrics", promhttp.HandlerFor(s.registry, promhttp.HandlerOpts{})) |
||||
|
||||
s.srv = &http.Server{ |
||||
Handler: router, |
||||
} |
||||
|
||||
go func() { |
||||
if err := s.srv.Serve(listener); err != nil { |
||||
level.Error(util.Logger).Log("msg", "metrics server terminated", "err", err) |
||||
} |
||||
}() |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Stop closes the instrumentation server.
|
||||
func (s *InstrumentationServer) Stop() { |
||||
if s.srv != nil { |
||||
s.srv.Close() |
||||
s.srv = nil |
||||
} |
||||
} |
||||
@ -0,0 +1,184 @@ |
||||
package querytee |
||||
|
||||
import ( |
||||
"context" |
||||
"flag" |
||||
"fmt" |
||||
"net" |
||||
"net/http" |
||||
"net/url" |
||||
"strconv" |
||||
"strings" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/go-kit/kit/log" |
||||
"github.com/go-kit/kit/log/level" |
||||
"github.com/gorilla/mux" |
||||
"github.com/pkg/errors" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
) |
||||
|
||||
var ( |
||||
errMinBackends = errors.New("at least 1 backend is required") |
||||
) |
||||
|
||||
type ProxyConfig struct { |
||||
ServerServicePort int |
||||
BackendEndpoints string |
||||
PreferredBackend string |
||||
BackendReadTimeout time.Duration |
||||
CompareResponses bool |
||||
} |
||||
|
||||
func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) { |
||||
f.IntVar(&cfg.ServerServicePort, "server.service-port", 80, "The port where the query-tee service listens to.") |
||||
f.StringVar(&cfg.BackendEndpoints, "backend.endpoints", "", "Comma separated list of backend endpoints to query.") |
||||
f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client.") |
||||
f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 90*time.Second, "The timeout when reading the response from a backend.") |
||||
f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.") |
||||
} |
||||
|
||||
type Route struct { |
||||
Path string |
||||
RouteName string |
||||
Methods string |
||||
ResponseComparator ResponsesComparator |
||||
} |
||||
|
||||
type Proxy struct { |
||||
cfg ProxyConfig |
||||
backends []*ProxyBackend |
||||
logger log.Logger |
||||
metrics *ProxyMetrics |
||||
routes []Route |
||||
|
||||
// The HTTP server used to run the proxy service.
|
||||
srv *http.Server |
||||
srvListener net.Listener |
||||
|
||||
// Wait group used to wait until the server has done.
|
||||
done sync.WaitGroup |
||||
} |
||||
|
||||
func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer prometheus.Registerer) (*Proxy, error) { |
||||
if cfg.CompareResponses && cfg.PreferredBackend == "" { |
||||
return nil, fmt.Errorf("when enabling comparion of results -backend.preferred flag must be set to hostname of preferred backend") |
||||
} |
||||
|
||||
p := &Proxy{ |
||||
cfg: cfg, |
||||
logger: logger, |
||||
metrics: NewProxyMetrics(registerer), |
||||
routes: routes, |
||||
} |
||||
|
||||
// Parse the backend endpoints (comma separated).
|
||||
parts := strings.Split(cfg.BackendEndpoints, ",") |
||||
|
||||
for idx, part := range parts { |
||||
// Skip empty ones.
|
||||
part = strings.TrimSpace(part) |
||||
if part == "" { |
||||
continue |
||||
} |
||||
|
||||
u, err := url.Parse(part) |
||||
if err != nil { |
||||
return nil, errors.Wrapf(err, "invalid backend endpoint %s", part) |
||||
} |
||||
|
||||
// The backend name is hardcoded as the backend hostname.
|
||||
name := u.Hostname() |
||||
preferred := name == cfg.PreferredBackend |
||||
|
||||
// In tests we have the same hostname for all backends, so we also
|
||||
// support a numeric preferred backend which is the index in the list
|
||||
// of backends.
|
||||
if preferredIdx, err := strconv.Atoi(cfg.PreferredBackend); err == nil { |
||||
preferred = preferredIdx == idx |
||||
} |
||||
|
||||
p.backends = append(p.backends, NewProxyBackend(name, u, cfg.BackendReadTimeout, preferred)) |
||||
} |
||||
|
||||
// At least 1 backend is required
|
||||
if len(p.backends) < 1 { |
||||
return nil, errMinBackends |
||||
} |
||||
|
||||
if cfg.CompareResponses && len(p.backends) != 2 { |
||||
return nil, fmt.Errorf("when enabling comparison of results number of backends should be 2 exactly") |
||||
} |
||||
|
||||
// At least 2 backends are suggested
|
||||
if len(p.backends) < 2 { |
||||
level.Warn(p.logger).Log("msg", "The proxy is running with only 1 backend. At least 2 backends are required to fulfil the purpose of the proxy and compare results.") |
||||
} |
||||
|
||||
return p, nil |
||||
} |
||||
|
||||
func (p *Proxy) Start() error { |
||||
// Setup listener first, so we can fail early if the port is in use.
|
||||
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", p.cfg.ServerServicePort)) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
router := mux.NewRouter() |
||||
|
||||
// Health check endpoint.
|
||||
router.Path("/").Methods("GET").Handler(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { |
||||
w.WriteHeader(http.StatusOK) |
||||
})) |
||||
|
||||
// register routes
|
||||
var comparator ResponsesComparator |
||||
for _, route := range p.routes { |
||||
if p.cfg.CompareResponses { |
||||
comparator = route.ResponseComparator |
||||
} |
||||
router.Path(route.Path).Methods(route.Methods).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator)) |
||||
} |
||||
|
||||
p.srvListener = listener |
||||
p.srv = &http.Server{ |
||||
ReadTimeout: 1 * time.Minute, |
||||
WriteTimeout: 2 * time.Minute, |
||||
Handler: router, |
||||
} |
||||
|
||||
// Run in a dedicated goroutine.
|
||||
p.done.Add(1) |
||||
go func() { |
||||
defer p.done.Done() |
||||
|
||||
if err := p.srv.Serve(p.srvListener); err != nil { |
||||
level.Error(p.logger).Log("msg", "Proxy server failed", "err", err) |
||||
} |
||||
}() |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (p *Proxy) Stop() error { |
||||
if p.srv == nil { |
||||
return nil |
||||
} |
||||
|
||||
return p.srv.Shutdown(context.Background()) |
||||
} |
||||
|
||||
func (p *Proxy) Await() { |
||||
// Wait until terminated.
|
||||
p.done.Wait() |
||||
} |
||||
|
||||
func (p *Proxy) Endpoint() string { |
||||
if p.srvListener == nil { |
||||
return "" |
||||
} |
||||
|
||||
return p.srvListener.Addr().String() |
||||
} |
||||
@ -0,0 +1,112 @@ |
||||
package querytee |
||||
|
||||
import ( |
||||
"context" |
||||
"io/ioutil" |
||||
"net" |
||||
"net/http" |
||||
"net/url" |
||||
"path" |
||||
"time" |
||||
|
||||
"github.com/pkg/errors" |
||||
) |
||||
|
||||
// ProxyBackend holds the information of a single backend.
|
||||
type ProxyBackend struct { |
||||
name string |
||||
endpoint *url.URL |
||||
client *http.Client |
||||
timeout time.Duration |
||||
|
||||
// Whether this is the preferred backend from which picking up
|
||||
// the response and sending it back to the client.
|
||||
preferred bool |
||||
} |
||||
|
||||
// NewProxyBackend makes a new ProxyBackend
|
||||
func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, preferred bool) *ProxyBackend { |
||||
return &ProxyBackend{ |
||||
name: name, |
||||
endpoint: endpoint, |
||||
timeout: timeout, |
||||
preferred: preferred, |
||||
client: &http.Client{ |
||||
CheckRedirect: func(_ *http.Request, _ []*http.Request) error { |
||||
return errors.New("the query-tee proxy does not follow redirects") |
||||
}, |
||||
Transport: &http.Transport{ |
||||
Proxy: http.ProxyFromEnvironment, |
||||
DialContext: (&net.Dialer{ |
||||
Timeout: 30 * time.Second, |
||||
KeepAlive: 30 * time.Second, |
||||
}).DialContext, |
||||
MaxIdleConns: 100, |
||||
MaxIdleConnsPerHost: 100, // see https://github.com/golang/go/issues/13801
|
||||
IdleConnTimeout: 90 * time.Second, |
||||
}, |
||||
}, |
||||
} |
||||
} |
||||
|
||||
func (b *ProxyBackend) ForwardRequest(orig *http.Request) (int, []byte, error) { |
||||
req, err := b.createBackendRequest(orig) |
||||
if err != nil { |
||||
return 0, nil, err |
||||
} |
||||
|
||||
return b.doBackendRequest(req) |
||||
} |
||||
|
||||
func (b *ProxyBackend) createBackendRequest(orig *http.Request) (*http.Request, error) { |
||||
req, err := http.NewRequest(orig.Method, orig.URL.String(), nil) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
// Replace the endpoint with the backend one.
|
||||
req.URL.Scheme = b.endpoint.Scheme |
||||
req.URL.Host = b.endpoint.Host |
||||
|
||||
// Prepend the endpoint path to the request path.
|
||||
req.URL.Path = path.Join(b.endpoint.Path, req.URL.Path) |
||||
|
||||
// Replace the auth:
|
||||
// - If the endpoint has user and password, use it.
|
||||
// - If the endpoint has user only, keep it and use the request password (if any).
|
||||
// - If the endpoint has no user and no password, use the request auth (if any).
|
||||
clientUser, clientPass, clientAuth := orig.BasicAuth() |
||||
endpointUser := b.endpoint.User.Username() |
||||
endpointPass, _ := b.endpoint.User.Password() |
||||
|
||||
if endpointUser != "" && endpointPass != "" { |
||||
req.SetBasicAuth(endpointUser, endpointPass) |
||||
} else if endpointUser != "" { |
||||
req.SetBasicAuth(endpointUser, clientPass) |
||||
} else if clientAuth { |
||||
req.SetBasicAuth(clientUser, clientPass) |
||||
} |
||||
|
||||
return req, nil |
||||
} |
||||
|
||||
func (b *ProxyBackend) doBackendRequest(req *http.Request) (int, []byte, error) { |
||||
// Honor the read timeout.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.timeout) |
||||
defer cancel() |
||||
|
||||
// Execute the request.
|
||||
res, err := b.client.Do(req.WithContext(ctx)) |
||||
if err != nil { |
||||
return 0, nil, errors.Wrap(err, "executing backend request") |
||||
} |
||||
|
||||
// Read the entire response body.
|
||||
defer res.Body.Close() |
||||
body, err := ioutil.ReadAll(res.Body) |
||||
if err != nil { |
||||
return 0, nil, errors.Wrap(err, "reading backend response") |
||||
} |
||||
|
||||
return res.StatusCode, body, nil |
||||
} |
||||
@ -0,0 +1,180 @@ |
||||
package querytee |
||||
|
||||
import ( |
||||
"fmt" |
||||
"net/http" |
||||
"strconv" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/cortexproject/cortex/pkg/util" |
||||
|
||||
"github.com/go-kit/kit/log" |
||||
"github.com/go-kit/kit/log/level" |
||||
) |
||||
|
||||
type ResponsesComparator interface { |
||||
Compare(expected, actual []byte) error |
||||
} |
||||
|
||||
type ProxyEndpoint struct { |
||||
backends []*ProxyBackend |
||||
metrics *ProxyMetrics |
||||
logger log.Logger |
||||
comparator ResponsesComparator |
||||
|
||||
// The route name used to track metrics.
|
||||
routeName string |
||||
} |
||||
|
||||
func NewProxyEndpoint(backends []*ProxyBackend, routeName string, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator) *ProxyEndpoint { |
||||
return &ProxyEndpoint{ |
||||
backends: backends, |
||||
routeName: routeName, |
||||
metrics: metrics, |
||||
logger: logger, |
||||
comparator: comparator, |
||||
} |
||||
} |
||||
|
||||
func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
||||
level.Debug(p.logger).Log("msg", "Received request", "path", r.URL.Path, "query", r.URL.RawQuery) |
||||
|
||||
// Send the same request to all backends.
|
||||
wg := sync.WaitGroup{} |
||||
wg.Add(len(p.backends)) |
||||
resCh := make(chan *backendResponse, len(p.backends)) |
||||
|
||||
for _, b := range p.backends { |
||||
b := b |
||||
|
||||
go func() { |
||||
defer wg.Done() |
||||
|
||||
start := time.Now() |
||||
status, body, err := b.ForwardRequest(r) |
||||
elapsed := time.Since(start) |
||||
|
||||
res := &backendResponse{ |
||||
backend: b, |
||||
status: status, |
||||
body: body, |
||||
err: err, |
||||
elapsed: elapsed, |
||||
} |
||||
resCh <- res |
||||
|
||||
// Log with a level based on the backend response.
|
||||
lvl := level.Debug |
||||
if !res.succeeded() { |
||||
lvl = level.Warn |
||||
} |
||||
|
||||
lvl(p.logger).Log("msg", "Backend response", "path", r.URL.Path, "query", r.URL.RawQuery, "backend", b.name, "status", status, "elapsed", elapsed) |
||||
}() |
||||
} |
||||
|
||||
// Wait until all backend requests completed.
|
||||
wg.Wait() |
||||
close(resCh) |
||||
|
||||
// Collect all responses and track metrics for each of them.
|
||||
responses := make([]*backendResponse, 0, len(p.backends)) |
||||
for res := range resCh { |
||||
responses = append(responses, res) |
||||
|
||||
p.metrics.durationMetric.WithLabelValues(res.backend.name, r.Method, p.routeName, strconv.Itoa(res.statusCode())).Observe(res.elapsed.Seconds()) |
||||
} |
||||
|
||||
// Select the response to send back to the client.
|
||||
downstreamRes := p.pickResponseForDownstream(responses) |
||||
if downstreamRes.err != nil { |
||||
http.Error(w, downstreamRes.err.Error(), http.StatusInternalServerError) |
||||
} else { |
||||
w.WriteHeader(downstreamRes.status) |
||||
if _, err := w.Write(downstreamRes.body); err != nil { |
||||
level.Warn(p.logger).Log("msg", "Unable to write response", "err", err) |
||||
} |
||||
} |
||||
|
||||
if p.comparator != nil { |
||||
go func() { |
||||
expectedResponse := responses[0] |
||||
actualResponse := responses[1] |
||||
if responses[1].backend.preferred { |
||||
expectedResponse, actualResponse = actualResponse, expectedResponse |
||||
} |
||||
|
||||
result := resultSuccess |
||||
err := p.compareResponses(expectedResponse, actualResponse) |
||||
if err != nil { |
||||
level.Error(util.Logger).Log("msg", "response comparison failed", "route-name", p.routeName, |
||||
"query", r.URL.RawQuery, "err", err) |
||||
result = resultFailed |
||||
} |
||||
|
||||
p.metrics.responsesComparedTotal.WithLabelValues(p.routeName, result).Inc() |
||||
}() |
||||
} |
||||
} |
||||
|
||||
func (p *ProxyEndpoint) pickResponseForDownstream(responses []*backendResponse) *backendResponse { |
||||
// Look for a successful response from the preferred backend.
|
||||
for _, res := range responses { |
||||
if res.backend.preferred && res.succeeded() { |
||||
return res |
||||
} |
||||
} |
||||
|
||||
// Look for any other successful response.
|
||||
for _, res := range responses { |
||||
if res.succeeded() { |
||||
return res |
||||
} |
||||
} |
||||
|
||||
// No successful response, so let's pick the first one.
|
||||
return responses[0] |
||||
} |
||||
|
||||
func (p *ProxyEndpoint) compareResponses(expectedResponse, actualResponse *backendResponse) error { |
||||
// compare response body only if we get a 200
|
||||
if expectedResponse.status != 200 { |
||||
return fmt.Errorf("skipped comparison of response because we got status code %d from preferred backend's response", expectedResponse.status) |
||||
} |
||||
|
||||
if actualResponse.status != 200 { |
||||
return fmt.Errorf("skipped comparison of response because we got status code %d from secondary backend's response", expectedResponse.status) |
||||
} |
||||
|
||||
if expectedResponse.status != actualResponse.status { |
||||
return fmt.Errorf("expected status code %d but got %d", expectedResponse.status, actualResponse.status) |
||||
} |
||||
|
||||
return p.comparator.Compare(expectedResponse.body, actualResponse.body) |
||||
} |
||||
|
||||
type backendResponse struct { |
||||
backend *ProxyBackend |
||||
status int |
||||
body []byte |
||||
err error |
||||
elapsed time.Duration |
||||
} |
||||
|
||||
func (r *backendResponse) succeeded() bool { |
||||
if r.err != nil { |
||||
return false |
||||
} |
||||
|
||||
// We consider the response successful if it's a 2xx or 4xx (but not 429).
|
||||
return (r.status >= 200 && r.status < 300) || (r.status >= 400 && r.status < 500 && r.status != 429) |
||||
} |
||||
|
||||
func (r *backendResponse) statusCode() int { |
||||
if r.err != nil || r.status <= 0 { |
||||
return 500 |
||||
} |
||||
|
||||
return r.status |
||||
} |
||||
@ -0,0 +1,35 @@ |
||||
package querytee |
||||
|
||||
import ( |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/client_golang/prometheus/promauto" |
||||
"github.com/weaveworks/common/instrument" |
||||
) |
||||
|
||||
const ( |
||||
resultSuccess = "success" |
||||
resultFailed = "fail" |
||||
) |
||||
|
||||
type ProxyMetrics struct { |
||||
durationMetric *prometheus.HistogramVec |
||||
responsesComparedTotal *prometheus.CounterVec |
||||
} |
||||
|
||||
func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics { |
||||
m := &ProxyMetrics{ |
||||
durationMetric: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ |
||||
Namespace: "cortex_querytee", |
||||
Name: "request_duration_seconds", |
||||
Help: "Time (in seconds) spent serving HTTP requests.", |
||||
Buckets: instrument.DefBuckets, |
||||
}, []string{"backend", "method", "route", "status_code"}), |
||||
responsesComparedTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ |
||||
Namespace: "cortex_querytee", |
||||
Name: "responses_compared_total", |
||||
Help: "Total number of responses compared per route name by result", |
||||
}, []string{"route_name", "result"}), |
||||
} |
||||
|
||||
return m |
||||
} |
||||
@ -0,0 +1,201 @@ |
||||
package querytee |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"fmt" |
||||
|
||||
"github.com/go-kit/kit/log/level" |
||||
"github.com/pkg/errors" |
||||
"github.com/prometheus/common/model" |
||||
|
||||
"github.com/cortexproject/cortex/pkg/util" |
||||
) |
||||
|
||||
// SamplesComparatorFunc helps with comparing different types of samples coming from /api/v1/query and /api/v1/query_range routes.
|
||||
type SamplesComparatorFunc func(expected, actual json.RawMessage) error |
||||
|
||||
type SamplesResponse struct { |
||||
Status string |
||||
Data struct { |
||||
ResultType string |
||||
Result json.RawMessage |
||||
} |
||||
} |
||||
|
||||
func NewSamplesComparator() *SamplesComparator { |
||||
return &SamplesComparator{map[string]SamplesComparatorFunc{ |
||||
"matrix": compareMatrix, |
||||
"vector": compareVector, |
||||
"scalar": compareScalar, |
||||
}} |
||||
} |
||||
|
||||
type SamplesComparator struct { |
||||
sampleTypesComparator map[string]SamplesComparatorFunc |
||||
} |
||||
|
||||
// RegisterSamplesComparator helps with registering custom sample types
|
||||
func (s *SamplesComparator) RegisterSamplesType(samplesType string, comparator SamplesComparatorFunc) { |
||||
s.sampleTypesComparator[samplesType] = comparator |
||||
} |
||||
|
||||
func (s *SamplesComparator) Compare(expectedResponse, actualResponse []byte) error { |
||||
var expected, actual SamplesResponse |
||||
|
||||
err := json.Unmarshal(expectedResponse, &expected) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
err = json.Unmarshal(actualResponse, &actual) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if expected.Status != actual.Status { |
||||
return fmt.Errorf("expected status %s but got %s", expected.Status, actual.Status) |
||||
} |
||||
|
||||
if expected.Data.ResultType != actual.Data.ResultType { |
||||
return fmt.Errorf("expected resultType %s but got %s", expected.Data.ResultType, actual.Data.ResultType) |
||||
} |
||||
|
||||
comparator, ok := s.sampleTypesComparator[expected.Data.ResultType] |
||||
if !ok { |
||||
return fmt.Errorf("resultType %s not registered for comparison", expected.Data.ResultType) |
||||
} |
||||
|
||||
return comparator(expected.Data.Result, actual.Data.Result) |
||||
} |
||||
|
||||
func compareMatrix(expectedRaw, actualRaw json.RawMessage) error { |
||||
var expected, actual model.Matrix |
||||
|
||||
err := json.Unmarshal(expectedRaw, &expected) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
err = json.Unmarshal(actualRaw, &actual) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if len(expected) != len(actual) { |
||||
return fmt.Errorf("expected %d metrics but got %d", len(expected), |
||||
len(actual)) |
||||
} |
||||
|
||||
metricFingerprintToIndexMap := make(map[model.Fingerprint]int, len(expected)) |
||||
for i, actualMetric := range actual { |
||||
metricFingerprintToIndexMap[actualMetric.Metric.Fingerprint()] = i |
||||
} |
||||
|
||||
for _, expectedMetric := range expected { |
||||
actualMetricIndex, ok := metricFingerprintToIndexMap[expectedMetric.Metric.Fingerprint()] |
||||
if !ok { |
||||
return fmt.Errorf("expected metric %s missing from actual response", expectedMetric.Metric) |
||||
} |
||||
|
||||
actualMetric := actual[actualMetricIndex] |
||||
expectedMetricLen := len(expectedMetric.Values) |
||||
actualMetricLen := len(actualMetric.Values) |
||||
|
||||
if expectedMetricLen != actualMetricLen { |
||||
err := fmt.Errorf("expected %d samples for metric %s but got %d", expectedMetricLen, |
||||
expectedMetric.Metric, actualMetricLen) |
||||
if expectedMetricLen > 0 && actualMetricLen > 0 { |
||||
level.Error(util.Logger).Log("msg", err.Error(), "oldest-expected-ts", expectedMetric.Values[0].Timestamp, |
||||
"newest-expected-ts", expectedMetric.Values[expectedMetricLen-1].Timestamp, |
||||
"oldest-actual-ts", actualMetric.Values[0].Timestamp, "newest-actual-ts", actualMetric.Values[actualMetricLen-1].Timestamp) |
||||
} |
||||
return err |
||||
} |
||||
|
||||
for i, expectedSamplePair := range expectedMetric.Values { |
||||
actualSamplePair := actualMetric.Values[i] |
||||
err := compareSamplePair(expectedSamplePair, actualSamplePair) |
||||
if err != nil { |
||||
return errors.Wrapf(err, "sample pair not matching for metric %s", expectedMetric.Metric) |
||||
} |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func compareVector(expectedRaw, actualRaw json.RawMessage) error { |
||||
var expected, actual model.Vector |
||||
|
||||
err := json.Unmarshal(expectedRaw, &expected) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
err = json.Unmarshal(actualRaw, &actual) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if len(expected) != len(actual) { |
||||
return fmt.Errorf("expected %d metrics but got %d", len(expected), |
||||
len(actual)) |
||||
} |
||||
|
||||
metricFingerprintToIndexMap := make(map[model.Fingerprint]int, len(expected)) |
||||
for i, actualMetric := range actual { |
||||
metricFingerprintToIndexMap[actualMetric.Metric.Fingerprint()] = i |
||||
} |
||||
|
||||
for _, expectedMetric := range expected { |
||||
actualMetricIndex, ok := metricFingerprintToIndexMap[expectedMetric.Metric.Fingerprint()] |
||||
if !ok { |
||||
return fmt.Errorf("expected metric %s missing from actual response", expectedMetric.Metric) |
||||
} |
||||
|
||||
actualMetric := actual[actualMetricIndex] |
||||
err := compareSamplePair(model.SamplePair{ |
||||
Timestamp: expectedMetric.Timestamp, |
||||
Value: expectedMetric.Value, |
||||
}, model.SamplePair{ |
||||
Timestamp: actualMetric.Timestamp, |
||||
Value: actualMetric.Value, |
||||
}) |
||||
if err != nil { |
||||
return errors.Wrapf(err, "sample pair not matching for metric %s", expectedMetric.Metric) |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func compareScalar(expectedRaw, actualRaw json.RawMessage) error { |
||||
var expected, actual model.Scalar |
||||
err := json.Unmarshal(expectedRaw, &expected) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
err = json.Unmarshal(actualRaw, &actual) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
return compareSamplePair(model.SamplePair{ |
||||
Timestamp: expected.Timestamp, |
||||
Value: expected.Value, |
||||
}, model.SamplePair{ |
||||
Timestamp: actual.Timestamp, |
||||
Value: actual.Value, |
||||
}) |
||||
} |
||||
|
||||
func compareSamplePair(expected, actual model.SamplePair) error { |
||||
if expected.Timestamp != actual.Timestamp { |
||||
return fmt.Errorf("expected timestamp %v but got %v", expected.Timestamp, actual.Timestamp) |
||||
} |
||||
if expected.Value != actual.Value { |
||||
return fmt.Errorf("expected value %s for timestamp %v but got %s", expected.Value, expected.Timestamp, actual.Value) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
Loading…
Reference in new issue