@ -180,96 +180,125 @@ func (m ShardMapper) mapSampleExpr(expr syntax.SampleExpr, r *downstreamRecorder
return head , bytesPerShard , nil
}
// wrappedShardedVectorAggr is a mapping subroutine: it runs expr through
// mapSampleExpr and wraps the result in a new vector aggregation that reuses
// expr's grouping, params and operation. The uint64 result is passed through
// from mapSampleExpr unchanged. On error the expression result is nil.
func (m ShardMapper) wrappedShardedVectorAggr(expr *syntax.VectorAggregationExpr, r *downstreamRecorder) (*syntax.VectorAggregationExpr, uint64, error) {
	inner, shardBytes, err := m.mapSampleExpr(expr, r)
	if err != nil {
		return nil, 0, err
	}

	wrapped := &syntax.VectorAggregationExpr{
		Left:      inner,
		Grouping:  expr.Grouping,
		Params:    expr.Params,
		Operation: expr.Operation,
	}
	return wrapped, shardBytes, nil
}
// technically, std{dev,var} are also parallelizable if there is no cross-shard merging
// in descendent nodes in the AST. This optimization is currently avoided for simplicity.
func ( m ShardMapper ) mapVectorAggregationExpr ( expr * syntax . VectorAggregationExpr , r * downstreamRecorder ) ( syntax . SampleExpr , uint64 , error ) {
// if this AST contains unshardable operations, don't shard this at this level,
// but attempt to shard a child node.
if ! expr . Shardable ( ) {
subMapped , bytesPerShard , err := m . Map ( expr . Left , r )
if err != nil {
return nil , 0 , err
}
sampleExpr , ok := subMapped . ( syntax . SampleExpr )
if ! ok {
return nil , 0 , badASTMapping ( subMapped )
}
if expr . Shardable ( ) {
return & syntax . VectorAggregationExpr {
Left : sampleExpr ,
Grouping : expr . Grouping ,
Params : expr . Params ,
Operation : expr . Operation ,
} , bytesPerShard , nil
switch expr . Operation {
}
case syntax . OpTypeSum :
// sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...)
return m . wrappedShardedVectorAggr ( expr , r )
switch expr . Operation {
case syntax . OpTypeSum :
// sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...)
sharded , bytesPerShard , err := m . mapSampleExpr ( expr , r )
if err != nil {
return nil , 0 , err
}
return & syntax . VectorAggregationExpr {
Left : sharded ,
Grouping : expr . Grouping ,
Params : expr . Params ,
Operation : expr . Operation ,
} , bytesPerShard , nil
case syntax . OpTypeAvg :
// avg(x) -> sum(x)/count(x)
lhs , lhsBytesPerShard , err := m . mapVectorAggregationExpr ( & syntax . VectorAggregationExpr {
Left : expr . Left ,
Grouping : expr . Grouping ,
Operation : syntax . OpTypeSum ,
} , r )
if err != nil {
return nil , 0 , err
}
rhs , rhsBytesPerShard , err := m . mapVectorAggregationExpr ( & syntax . VectorAggregationExpr {
Left : expr . Left ,
Grouping : expr . Grouping ,
Operation : syntax . OpTypeCount ,
} , r )
if err != nil {
return nil , 0 , err
}
case syntax . OpTypeMin , syntax . OpTypeMax :
if syntax . ReducesLabels ( expr ) {
// skip sharding optimizations at this level. If labels are reduced,
// the same series may exist on multiple shards and must be aggregated
// together before a max|min is applied
break
}
// max(x) -> max(max(x, shard=1) ++ max(x, shard=2)...)
// min(x) -> min(min(x, shard=1) ++ min(x, shard=2)...)
return m . wrappedShardedVectorAggr ( expr , r )
case syntax . OpTypeAvg :
// avg(x) -> sum(x)/count(x), which is parallelizable
lhs , lhsBytesPerShard , err := m . mapVectorAggregationExpr ( & syntax . VectorAggregationExpr {
Left : expr . Left ,
Grouping : expr . Grouping ,
Operation : syntax . OpTypeSum ,
} , r )
if err != nil {
return nil , 0 , err
}
// We take the maximum bytes per shard of both sides of the operation
bytesPerShard := uint64 ( math . Max ( int ( lhsBytesPerShard ) , int ( rhsBytesPerShard ) ) )
rhs , rhsBytesPerShard , err := m . mapVectorAggregationExpr ( & syntax . VectorAggregationExpr {
Left : expr . Left ,
Grouping : expr . Grouping ,
Operation : syntax . OpTypeCount ,
} , r )
if err != nil {
return nil , 0 , err
}
return & syntax . BinOpExpr {
SampleExpr : lhs ,
RHS : rhs ,
Op : syntax . OpTypeDiv ,
} , bytesPerShard , nil
// We take the maximum bytes per shard of both sides of the operation
bytesPerShard := uint64 ( math . Max ( int ( lhsBytesPerShard ) , int ( rhsBytesPerShard ) ) )
return & syntax . BinOpExpr {
SampleExpr : lhs ,
RHS : rhs ,
Op : syntax . OpTypeDiv ,
} , bytesPerShard , nil
case syntax . OpTypeCount :
if syntax . ReducesLabels ( expr ) {
// skip sharding optimizations at this level. If labels are reduced,
// the same series may exist on multiple shards and must be aggregated
// together before a count is applied
break
}
case syntax . OpTypeCount :
// count(x) -> sum(count(x, shard=1) ++ count(x, shard=2)...)
sharded , bytesPerShard , err := m . mapSampleExpr ( expr , r )
if err != nil {
return nil , 0 , err
}
return & syntax . VectorAggregationExpr {
Left : sharded ,
Grouping : expr . Grouping ,
Operation : syntax . OpTypeSum ,
} , bytesPerShard , nil
default :
// this should not be reachable. If an operation is shardable it should
// have an optimization listed.
level . Warn ( util_log . Logger ) . Log (
"msg" , "unexpected operation which appears shardable, ignoring" ,
"operation" , expr . Operation ,
)
exprStats , err := m . shards . GetStats ( expr )
if err != nil {
return nil , 0 , err
// count(x) -> sum(count(x, shard=1) ++ count(x, shard=2)...)
sharded , bytesPerShard , err := m . mapSampleExpr ( expr , r )
if err != nil {
return nil , 0 , err
}
return & syntax . VectorAggregationExpr {
Left : sharded ,
Grouping : expr . Grouping ,
Operation : syntax . OpTypeSum ,
} , bytesPerShard , nil
default :
// this should not be reachable. If an operation is shardable it should
// have an optimization listed. Nonetheless, we log this as a warning
// and return the original expression unsharded.
level . Warn ( util_log . Logger ) . Log (
"msg" , "unexpected operation which appears shardable, ignoring" ,
"operation" , expr . Operation ,
)
exprStats , err := m . shards . GetStats ( expr )
if err != nil {
return nil , 0 , err
}
return expr , exprStats . Bytes , nil
}
return expr , exprStats . Bytes , nil
}
// if this AST contains unshardable operations, don't shard this at this level,
// but attempt to shard a child node.
subMapped , bytesPerShard , err := m . Map ( expr . Left , r )
if err != nil {
return nil , 0 , err
}
sampleExpr , ok := subMapped . ( syntax . SampleExpr )
if ! ok {
return nil , 0 , badASTMapping ( subMapped )
}
return & syntax . VectorAggregationExpr {
Left : sampleExpr ,
Grouping : expr . Grouping ,
Params : expr . Params ,
Operation : expr . Operation ,
} , bytesPerShard , nil
}
func ( m ShardMapper ) mapLabelReplaceExpr ( expr * syntax . LabelReplaceExpr , r * downstreamRecorder ) ( syntax . SampleExpr , uint64 , error ) {
@ -283,52 +312,77 @@ func (m ShardMapper) mapLabelReplaceExpr(expr *syntax.LabelReplaceExpr, r *downs
}
func ( m ShardMapper ) mapRangeAggregationExpr ( expr * syntax . RangeAggregationExpr , r * downstreamRecorder ) ( syntax . SampleExpr , uint64 , error ) {
if hasLabelModifier ( expr ) {
// if an expr can modify labels this means multiple shards can return the same labelset.
// When this happens the merge strategy needs to be different from a simple concatenation.
// For instance for rates we need to sum data from different shards but same series.
// Since we currently support only concatenation as merge strategy, we skip those queries.
if ! expr . Shardable ( ) {
exprStats , err := m . shards . GetStats ( expr )
if err != nil {
return nil , 0 , err
}
return expr , exprStats . Bytes , nil
}
switch expr . Operation {
case syntax . OpRangeTypeCount , syntax . OpRangeTypeRate , syntax . OpRangeTypeBytesRate , syntax . OpRangeTypeBytes :
// count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)...
// rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)...
// same goes for bytes_rate and bytes_over_time
return m . mapSampleExpr ( expr , r )
case syntax . OpRangeTypeCount , syntax . OpRangeTypeRate , syntax . OpRangeTypeBytes , syntax . OpRangeTypeBytesRate , syntax . OpRangeTypeSum , syntax . OpRangeTypeMax , syntax . OpRangeTypeMin :
// if the expr can reduce labels, it can cause the same labelset to
// exist on separate shards and we'll need to merge the results
// accordingly. If it does not reduce labels and has no special grouping
// aggregation, we can shard it as normal via concatenation.
potentialConflict := syntax . ReducesLabels ( expr )
if ! potentialConflict && ( expr . Grouping == nil || expr . Grouping . Noop ( ) ) {
return m . mapSampleExpr ( expr , r )
}
// These functions require a different merge strategy than the default
// concatentation.
// This is because the same label sets may exist on multiple shards when label-reducing parsing is applied or when
// grouping by some subset of the labels. In this case, the resulting vector may have multiple values for the same
// series and we need to combine them appropriately given a particular operation.
mergeMap := map [ string ] string {
// all these may be summed
syntax . OpRangeTypeCount : syntax . OpTypeSum ,
syntax . OpRangeTypeRate : syntax . OpTypeSum ,
syntax . OpRangeTypeBytes : syntax . OpTypeSum ,
syntax . OpRangeTypeBytesRate : syntax . OpTypeSum ,
syntax . OpRangeTypeSum : syntax . OpTypeSum ,
// min & max require taking the min|max of the shards
syntax . OpRangeTypeMin : syntax . OpTypeMin ,
syntax . OpRangeTypeMax : syntax . OpTypeMax ,
}
// range aggregation groupings default to `without ()` behavior
// so we explicitly set the wrapping vector aggregation to this
// for parity when it's not explicitly set
grouping := expr . Grouping
if grouping == nil {
grouping = & syntax . Grouping { Without : true }
}
mapped , bytes , err := m . mapSampleExpr ( expr , r )
// max_over_time(_) -> max without() (max_over_time(_) ++ max_over_time(_)...)
// max_over_time(_) by (foo) -> max by (foo) (max_over_time(_) by (foo) ++ max_over_time(_) by (foo)...)
merger , ok := mergeMap [ expr . Operation ]
if ! ok {
return nil , 0 , fmt . Errorf (
"error while finding merge operation for %s" , expr . Operation ,
)
}
return & syntax . VectorAggregationExpr {
Left : mapped ,
Grouping : grouping ,
Operation : merger ,
} , bytes , err
default :
// This part of the query is not shardable, so the bytesPerShard is the bytes for all the log matchers in expr
// don't shard if there's not an appropriate optimization
exprStats , err := m . shards . GetStats ( expr )
if err != nil {
return nil , 0 , err
}
return expr , exprStats . Bytes , nil
}
}
// hasLabelModifier reports whether the selector under expr (expr.Left.Left) is
// a pipeline with at least one *syntax.LabelFmtExpr stage, i.e. a stage that
// can modify stream labels. Any other selector kind, including a plain
// matchers expression, yields false. Parsers, for instance, introduce new
// labels but do not alter the original ones.
func hasLabelModifier(expr *syntax.RangeAggregationExpr) bool {
	pipeline, isPipeline := expr.Left.Left.(*syntax.PipelineExpr)
	if !isPipeline {
		return false
	}

	for _, stage := range pipeline.MultiStages {
		if _, isLabelFmt := stage.(*syntax.LabelFmtExpr); isLabelFmt {
			return true
		}
	}
	return false
}
// badASTMapping builds the error reported when a mapped node was expected to
// be a SampleExpr; the message includes the dynamic type of got.
func badASTMapping(got syntax.Expr) error {
	const format = "bad AST mapping: expected SampleExpr, but got (%T)"
	return fmt.Errorf(format, got)
}