Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
loki/pkg/querier/queryrange/roundtrip.go

90 lines
2.9 KiB

Loki Query Frontend (#1442) * Adds frontend to Loki. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Improves tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes sneaky bug in entries sorting. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes the split by interval. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Tweak jsonnet deployments and add a way to lint/fmt. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * lint. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * fix timezone issue. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Improve tests and rollback change in loghttp package. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a flaky test that might run one more goroutine. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes windows build. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Improve tracing in the split by interval. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add test stream to proto conversion. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes flappy retry test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove err shadowing in stopQueryFrontend as it was confusing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Refactor grpc message size in the libsonnet config file. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Don't check auth header for GRPC TransferChunks. 
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Query frontend (#3) * frontend codec merging optimizations * codec benchmarks * removes unused bounds code in queryrange ordering * [wip] splitby uses channels instead of sub batching intervals * splitBy channel limit test * single allocation for merging entries from a single stream * skip merging loki responses when limit is already hit * removes checks for unlimited queries in queryrange * removes splitByInterval{,.interval} spans * removes interval_batch_size from jsonnet lib * moves benchmark utils to own file * renames markers -> entries * priority queue comments * Removes unused logRequest. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Sets the cache interval to the same split interval. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Missing import libsonnet for the frontend. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Frontend should not be a cluster IP. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
6 years ago
package queryrange
import (
"flag"
"net/http"
"strings"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/go-kit/kit/log"
"github.com/grafana/loki/pkg/logql"
)
// Config is the configuration for the Loki queryrange tripperware.
// It embeds the Cortex queryrange.Config; the `yaml:",inline"` tag makes the
// embedded fields appear at the same YAML level as Config itself instead of
// under a nested key.
type Config struct {
queryrange.Config `yaml:",inline"`
}
// RegisterFlags registers the flags of the embedded queryrange.Config on the
// flag set f. Config declares no flags of its own.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Config.RegisterFlags(f)
}
// Stopper is implemented by resources that need to be shut down once they are
// no longer used. NewTripperware returns one alongside the tripperware it
// builds.
type Stopper interface {
// Stop shuts the resource down and reports any failure as an error.
Stop() error
}
// NewTripperware returns a Tripperware that routes each request to the
// middleware chain matching its query type, together with a Stopper for the
// cache created by the Cortex queryrange tripperware.
//
// Routing, per request:
//   - paths that end in neither "/query_range" nor "/prom/query" are passed
//     straight to next;
//   - metric queries (logql.SampleExpr) go through the Cortex queryrange
//     tripperware built from cfg.Config;
//   - log queries whose selector yields a non-nil filter go through the
//     tripperware built by NewLogFilterTripperware;
//   - every other query is passed straight to next.
//
// If the "query" URL parameter fails to parse, or the selector's filter cannot
// be built, RoundTrip returns that error without calling next.
func NewTripperware(cfg Config, log log.Logger, limits queryrange.Limits) (frontend.Tripperware, Stopper, error) {
	// Build the log-filter tripperware first. It only assembles middlewares,
	// whereas the metrics tripperware below also creates the cache handed back
	// as the Stopper. Constructing them in this order guarantees that an error
	// here cannot leave an already-created cache behind with nobody to stop it.
	logFilterTripperware, err := NewLogFilterTripperware(cfg, log, limits, lokiCodec)
	if err != nil {
		return nil, nil, err
	}

	metricsTripperware, cache, err := queryrange.NewTripperware(cfg.Config, log, limits, lokiCodec, queryrange.PrometheusResponseExtractor)
	if err != nil {
		return nil, nil, err
	}

	return frontend.Tripperware(func(next http.RoundTripper) http.RoundTripper {
		metricRT := metricsTripperware(next)
		logFilterRT := logFilterTripperware(next)

		return frontend.RoundTripFunc(func(req *http.Request) (*http.Response, error) {
			path := req.URL.Path
			if !strings.HasSuffix(path, "/query_range") && !strings.HasSuffix(path, "/prom/query") {
				return next.RoundTrip(req)
			}

			expr, err := logql.ParseExpr(req.URL.Query().Get("query"))
			if err != nil {
				return nil, err
			}

			// Case order matters: SampleExpr is checked before
			// LogSelectorExpr, so the first matching case wins.
			switch e := expr.(type) {
			case logql.SampleExpr:
				return metricRT.RoundTrip(req)
			case logql.LogSelectorExpr:
				filter, err := e.Filter()
				if err != nil {
					return nil, err
				}
				if filter != nil {
					return logFilterRT.RoundTrip(req)
				}
			}

			// Selectors without a filter (and any other expression) skip
			// both middleware chains.
			return next.RoundTrip(req)
		})
	}), cache, nil
}
// NewLogFilterTripperware creates the frontend tripperware that NewTripperware
// applies to log queries carrying a filter.
//
// The middleware chain always contains the limits middleware. When
// cfg.SplitQueriesByInterval is non-zero, an instrumented split-by-interval
// middleware is appended; when cfg.MaxRetries is positive, an instrumented
// retry middleware is appended after it. codec is handed to the split
// middleware and to the round tripper wrapping next.
//
// The returned error is currently always nil.
func NewLogFilterTripperware(
	cfg Config,
	log log.Logger,
	limits queryrange.Limits,
	codec queryrange.Codec,
) (frontend.Tripperware, error) {
	// The limits middleware is unconditional, so the chain is never empty and
	// next always has to be wrapped.
	middlewares := []queryrange.Middleware{queryrange.LimitsMiddleware(limits)}

	if cfg.SplitQueriesByInterval != 0 {
		middlewares = append(middlewares,
			queryrange.InstrumentMiddleware("split_by_interval"),
			SplitByIntervalMiddleware(cfg.SplitQueriesByInterval, limits, codec),
		)
	}

	if cfg.MaxRetries > 0 {
		middlewares = append(middlewares,
			queryrange.InstrumentMiddleware("retry"),
			queryrange.NewRetryMiddleware(log, cfg.MaxRetries),
		)
	}

	return frontend.Tripperware(func(next http.RoundTripper) http.RoundTripper {
		return queryrange.NewRoundTripper(next, codec, middlewares...)
	}), nil
}