@ -89,6 +89,8 @@ type Query interface {
// query implements the Query interface.
type query struct {
// Underlying data provider.
queryable storage . Queryable
// The original query string.
q string
// Statement of the parsed query.
@ -150,26 +152,18 @@ func contextDone(ctx context.Context, env string) error {
// Engine handles the lifetime of queries from beginning to end.
// It is connected to a querier.
type Engine struct {
// A Querier constructor against an underlying storage.
queryable Queryable
metrics * engineMetrics
// The gate limiting the maximum number of concurrent and waiting queries.
logger log . Logger
metrics * engineMetrics
timeout time . Duration
gate * queryGate
options * EngineOptions
logger log . Logger
}
// Queryable allows opening a storage querier.
type Queryable interface {
Querier ( ctx context . Context , mint , maxt int64 ) ( storage . Querier , error )
}
// NewEngine returns a new engine.
func NewEngine ( queryable Queryable , o * EngineOptions ) * Engine {
if o == nil {
o = DefaultEngineOptions
func NewEngine ( logger log . Logger , reg prometheus . Registerer , maxConcurrent int , timeout time . Duration ) * Engine {
if logger == nil {
logger = log . NewNopLogger ( )
}
metrics := & engineMetrics {
currentQueries : prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
@ -212,10 +206,10 @@ func NewEngine(queryable Queryable, o *EngineOptions) *Engine {
ConstLabels : prometheus . Labels { "slice" : "result_sort" } ,
} ) ,
}
metrics . maxConcurrentQueries . Set ( float64 ( o . MaxConcurrentQueries ) )
metrics . maxConcurrentQueries . Set ( float64 ( maxConcurrent ) )
if o . Metrics != nil {
o . Metrics . MustRegister (
if reg != nil {
reg . MustRegister (
metrics . currentQueries ,
metrics . maxConcurrentQueries ,
metrics . queryInnerEval ,
@ -225,36 +219,20 @@ func NewEngine(queryable Queryable, o *EngineOptions) *Engine {
)
}
return & Engine {
queryable : queryable ,
gate : newQueryGate ( o . MaxConcurrentQueries ) ,
options : o ,
logger : o . Logger ,
metrics : metrics ,
gate : newQueryGate ( maxConcurrent ) ,
timeout : timeout ,
logger : logger ,
metrics : metrics ,
}
}
// EngineOptions contains configuration parameters for an Engine.
type EngineOptions struct {
MaxConcurrentQueries int
Timeout time . Duration
Logger log . Logger
Metrics prometheus . Registerer
}
// DefaultEngineOptions are the default engine options.
var DefaultEngineOptions = & EngineOptions {
MaxConcurrentQueries : 20 ,
Timeout : 2 * time . Minute ,
Logger : log . NewNopLogger ( ) ,
}
// NewInstantQuery returns an evaluation query for the given expression at the given time.
func ( ng * Engine ) NewInstantQuery ( qs string , ts time . Time ) ( Query , error ) {
func ( ng * Engine ) NewInstantQuery ( q storage . Queryable , qs string , ts time . Time ) ( Query , error ) {
expr , err := ParseExpr ( qs )
if err != nil {
return nil , err
}
qry := ng . newQuery ( expr , ts , ts , 0 )
qry := ng . newQuery ( q , expr , ts , ts , 0 )
qry . q = qs
return qry , nil
@ -262,7 +240,7 @@ func (ng *Engine) NewInstantQuery(qs string, ts time.Time) (Query, error) {
// NewRangeQuery returns an evaluation query for the given time range and with
// the resolution set by the interval.
func ( ng * Engine ) NewRangeQuery ( qs string , start , end time . Time , interval time . Duration ) ( Query , error ) {
func ( ng * Engine ) NewRangeQuery ( q storage . Queryable , q s string , start , end time . Time , interval time . Duration ) ( Query , error ) {
expr , err := ParseExpr ( qs )
if err != nil {
return nil , err
@ -270,13 +248,13 @@ func (ng *Engine) NewRangeQuery(qs string, start, end time.Time, interval time.D
if expr . Type ( ) != ValueTypeVector && expr . Type ( ) != ValueTypeScalar {
return nil , fmt . Errorf ( "invalid expression type %q for range query, must be Scalar or instant Vector" , documentedType ( expr . Type ( ) ) )
}
qry := ng . newQuery ( expr , start , end , interval )
qry := ng . newQuery ( q , expr , start , end , interval )
qry . q = qs
return qry , nil
}
func ( ng * Engine ) newQuery ( expr Expr , start , end time . Time , interval time . Duration ) * query {
func ( ng * Engine ) newQuery ( q storage . Queryable , expr Expr , start , end time . Time , interval time . Duration ) * query {
es := & EvalStmt {
Expr : expr ,
Start : start ,
@ -284,9 +262,10 @@ func (ng *Engine) newQuery(expr Expr, start, end time.Time, interval time.Durati
Interval : interval ,
}
qry := & query {
stmt : es ,
ng : ng ,
stats : stats . NewTimerGroup ( ) ,
stmt : es ,
ng : ng ,
stats : stats . NewTimerGroup ( ) ,
queryable : q ,
}
return qry
}
@ -316,7 +295,7 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
ng . metrics . currentQueries . Inc ( )
defer ng . metrics . currentQueries . Dec ( )
ctx , cancel := context . WithTimeout ( ctx , ng . op tions . T imeout)
ctx , cancel := context . WithTimeout ( ctx , ng . timeout )
q . cancel = cancel
execTimer := q . stats . GetTimer ( stats . ExecTotalTime ) . Start ( )
@ -363,9 +342,8 @@ func durationMilliseconds(d time.Duration) int64 {
// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
func ( ng * Engine ) execEvalStmt ( ctx context . Context , query * query , s * EvalStmt ) ( Value , error ) {
prepareTimer := query . stats . GetTimer ( stats . QueryPreparationTime ) . Start ( )
querier , err := ng . populateIterators ( ctx , s )
querier , err := ng . populateIterators ( ctx , query . queryable , s )
prepareTimer . Stop ( )
ng . metrics . queryPrepareTime . Observe ( prepareTimer . ElapsedTime ( ) . Seconds ( ) )
@ -489,10 +467,9 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
return mat , nil
}
func ( ng * Engine ) populateIterators ( ctx context . Context , s * EvalStmt ) ( storage . Querier , error ) {
func ( ng * Engine ) populateIterators ( ctx context . Context , q storage . Queryable , s * EvalStmt ) ( storage . Querier , error ) {
var maxOffset time . Duration
Inspect ( s . Expr , func ( node Node ) bool {
Inspect ( s . Expr , func ( node Node , _ [ ] Node ) bool {
switch n := node . ( type ) {
case * VectorSelector :
if maxOffset < LookbackDelta {
@ -514,15 +491,21 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
mint := s . Start . Add ( - maxOffset )
querier , err := ng . queryable . Querier ( ctx , timestamp . FromTime ( mint ) , timestamp . FromTime ( s . End ) )
querier , err := q . Querier ( ctx , timestamp . FromTime ( mint ) , timestamp . FromTime ( s . End ) )
if err != nil {
return nil , err
}
Inspect ( s . Expr , func ( node Node ) bool {
Inspect ( s . Expr , func ( node Node , path [ ] Node ) bool {
params := & storage . SelectParams {
Step : int64 ( s . Interval / time . Millisecond ) ,
}
switch n := node . ( type ) {
case * VectorSelector :
set , err := querier . Select ( n . LabelMatchers ... )
params . Func = extractFuncFromPath ( path )
set , err := querier . Select ( params , n . LabelMatchers ... )
if err != nil {
level . Error ( ng . logger ) . Log ( "msg" , "error selecting series set" , "err" , err )
return false
@ -539,7 +522,9 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
}
case * MatrixSelector :
set , err := querier . Select ( n . LabelMatchers ... )
params . Func = extractFuncFromPath ( path )
set , err := querier . Select ( params , n . LabelMatchers ... )
if err != nil {
level . Error ( ng . logger ) . Log ( "msg" , "error selecting series set" , "err" , err )
return false
@ -559,6 +544,25 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
return querier , err
}
// extractFuncFromPath walks up the path and searches for the first instance of
// a function or aggregation.
func extractFuncFromPath ( p [ ] Node ) string {
if len ( p ) == 0 {
return ""
}
switch n := p [ len ( p ) - 1 ] . ( type ) {
case * AggregateExpr :
return n . Op . String ( )
case * Call :
return n . Func . Name
case * BinaryExpr :
// If we hit a binary expression we terminate since we only care about functions
// or aggregations over a single metric.
return ""
}
return extractFuncFromPath ( p [ : len ( p ) - 1 ] )
}
func expandSeriesSet ( it storage . SeriesSet ) ( res [ ] storage . Series , err error ) {
for it . Next ( ) {
res = append ( res , it . At ( ) )