loki/pkg/logql/shardmapper.go

package logql

import (
	"fmt"

	"github.com/go-kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/pkg/logql/syntax"
	"github.com/grafana/loki/pkg/querier/astmapper"
	util_log "github.com/grafana/loki/pkg/util/log"
)
// ShardResolver decides how many shards a given expression should be split into.
type ShardResolver interface {
	Shards(expr syntax.Expr) (int, error)
}

// ConstantShards always resolves to a fixed shard count.
type ConstantShards int

func (s ConstantShards) Shards(_ syntax.Expr) (int, error) { return int(s), nil }

// ShardMapper rewrites a LogQL AST into a shard-parallelizable form.
type ShardMapper struct {
	shards  ShardResolver
	metrics *MapperMetrics
}

func NewShardMapper(resolver ShardResolver, metrics *MapperMetrics) ShardMapper {
	return ShardMapper{
		shards:  resolver,
		metrics: metrics,
	}
}

func NewShardMapperMetrics(registerer prometheus.Registerer) *MapperMetrics {
	return newMapperMetrics(registerer, "shard")
}
// Parse parses a query and maps it into its sharded form, reporting whether
// the mapping was a noop (i.e. the mapped query is identical to the original).
func (m ShardMapper) Parse(query string) (noop bool, expr syntax.Expr, err error) {
	parsed, err := syntax.ParseExpr(query)
	if err != nil {
		return false, nil, err
	}

	recorder := m.metrics.downstreamRecorder()

	mapped, err := m.Map(parsed, recorder)
	if err != nil {
		m.metrics.ParsedQueries.WithLabelValues(FailureKey).Inc()
		return false, nil, err
	}

	originalStr := parsed.String()
	mappedStr := mapped.String()
	noop = originalStr == mappedStr
	if noop {
		m.metrics.ParsedQueries.WithLabelValues(NoopKey).Inc()
	} else {
		m.metrics.ParsedQueries.WithLabelValues(SuccessKey).Inc()
	}

	recorder.Finish() // only record metrics for successful mappings

	return noop, mapped, err
}
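
// Illustrative usage sketch (added, not part of the original file): mapping a
// simple sum-of-rate query across a fixed shard count. ConstantShards and the
// query string below were chosen for the example, and the printed form of the
// downstream expressions shown in the comment is approximate.
func exampleShardedParse() {
	mapper := NewShardMapper(ConstantShards(2), NewShardMapperMetrics(prometheus.NewRegistry()))

	noop, mapped, err := mapper.Parse(`sum(rate({app="foo"}[1m]))`)
	if err != nil {
		return
	}

	// sum(rate(...)) is shardable, so noop is false and mapped reads roughly as
	// sum(downstream<sum(rate({app="foo"}[1m])), shard=0_of_2> ++ downstream<..., shard=1_of_2>).
	_ = noop
	_ = mapped
}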
// Map recursively maps an expression into its sharded equivalent, recording
// the number of downstream queries it generates per type in the recorder.
func (m ShardMapper) Map(expr syntax.Expr, r *downstreamRecorder) (syntax.Expr, error) {
	// immediately clone the passed expr to avoid mutating the original
	expr, err := syntax.Clone(expr)
	if err != nil {
		return nil, err
	}

	switch e := expr.(type) {
	case *syntax.LiteralExpr:
		return e, nil
	case *syntax.MatchersExpr, *syntax.PipelineExpr:
		return m.mapLogSelectorExpr(e.(syntax.LogSelectorExpr), r)
	case *syntax.VectorAggregationExpr:
		return m.mapVectorAggregationExpr(e, r)
	case *syntax.LabelReplaceExpr:
		return m.mapLabelReplaceExpr(e, r)
	case *syntax.RangeAggregationExpr:
		return m.mapRangeAggregationExpr(e, r)
	case *syntax.BinOpExpr:
		// map each side of the binary operation independently
		lhsMapped, err := m.Map(e.SampleExpr, r)
		if err != nil {
			return nil, err
		}
		rhsMapped, err := m.Map(e.RHS, r)
		if err != nil {
			return nil, err
		}
		lhsSampleExpr, ok := lhsMapped.(syntax.SampleExpr)
		if !ok {
			return nil, badASTMapping(lhsMapped)
		}
		rhsSampleExpr, ok := rhsMapped.(syntax.SampleExpr)
		if !ok {
			return nil, badASTMapping(rhsMapped)
		}
		e.SampleExpr = lhsSampleExpr
		e.RHS = rhsSampleExpr
		return e, nil
	default:
		return nil, errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T)", expr, m)
	}
}
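
// Illustrative note (added, not in the original file): for a binary operation
// such as sum(rate({app="a"}[1m])) / sum(rate({app="b"}[1m])), Map shards each
// side independently and then reassembles the BinOpExpr, so both operands end
// up as their own sharded subtrees joined by the original operator.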
func (m ShardMapper) mapLogSelectorExpr(expr syntax.LogSelectorExpr, r *downstreamRecorder) (syntax.LogSelectorExpr, error) {
	var head *ConcatLogSelectorExpr
	shards, err := m.shards.Shards(expr)
	if err != nil {
		return nil, err
	}
	if shards == 0 {
		return &ConcatLogSelectorExpr{
			DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
				shard:           nil,
				LogSelectorExpr: expr,
			},
		}, nil
	}
	for i := shards - 1; i >= 0; i-- {
		head = &ConcatLogSelectorExpr{
			DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
				shard: &astmapper.ShardAnnotation{
					Shard: i,
					Of:    shards,
				},
				LogSelectorExpr: expr,
			},
			next: head,
		}
	}
	r.Add(shards, StreamsKey)
	return head, nil
}
func (m ShardMapper) mapSampleExpr(expr syntax.SampleExpr, r *downstreamRecorder) (syntax.SampleExpr, error) {
	var head *ConcatSampleExpr
	shards, err := m.shards.Shards(expr)
	if err != nil {
		return nil, err
	}
	if shards == 0 {
		return &ConcatSampleExpr{
			DownstreamSampleExpr: DownstreamSampleExpr{
				shard:      nil,
				SampleExpr: expr,
			},
		}, nil
	}
	for i := shards - 1; i >= 0; i-- {
		head = &ConcatSampleExpr{
			DownstreamSampleExpr: DownstreamSampleExpr{
				shard: &astmapper.ShardAnnotation{
					Shard: i,
					Of:    shards,
				},
				SampleExpr: expr,
			},
			next: head,
		}
	}
	r.Add(shards, MetricsKey)
	return head, nil
}
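
// Illustrative note (added, not in the original file): both mappers build a
// singly linked list of downstream expressions. Iterating from shards-1 down
// to 0 and prepending each node means the resulting concatenation reads in
// shard order, roughly:
//
//	downstream<expr, shard=0_of_N> ++ downstream<expr, shard=1_of_N> ++ ... ++ downstream<expr, shard=N-1_of_N>
//
// A shard count of 0 produces a single unannotated downstream node, i.e. the
// query is passed through without sharding.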
// technically, std{dev,var} are also parallelizable if there is no
// cross-shard merging in descendant nodes in the AST. This optimization is
// currently avoided for simplicity.
func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr, r *downstreamRecorder) (syntax.SampleExpr, error) {
	// if this AST contains unshardable operations, don't shard at this level,
	// but attempt to shard a child node.
	if !expr.Shardable() {
		subMapped, err := m.Map(expr.Left, r)
		if err != nil {
			return nil, err
		}
		sampleExpr, ok := subMapped.(syntax.SampleExpr)
		if !ok {
			return nil, badASTMapping(subMapped)
		}
		return &syntax.VectorAggregationExpr{
			Left:      sampleExpr,
			Grouping:  expr.Grouping,
			Params:    expr.Params,
			Operation: expr.Operation,
		}, nil
	}

	switch expr.Operation {
	case syntax.OpTypeSum:
		// sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...)
		sharded, err := m.mapSampleExpr(expr, r)
		if err != nil {
			return nil, err
		}
		return &syntax.VectorAggregationExpr{
			Left:      sharded,
			Grouping:  expr.Grouping,
			Params:    expr.Params,
			Operation: expr.Operation,
		}, nil
	case syntax.OpTypeAvg:
		// avg(x) -> sum(x)/count(x)
		lhs, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
			Left:      expr.Left,
			Grouping:  expr.Grouping,
			Operation: syntax.OpTypeSum,
		}, r)
		if err != nil {
			return nil, err
		}
		rhs, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
			Left:      expr.Left,
			Grouping:  expr.Grouping,
			Operation: syntax.OpTypeCount,
		}, r)
		if err != nil {
			return nil, err
		}
		return &syntax.BinOpExpr{
			SampleExpr: lhs,
			RHS:        rhs,
			Op:         syntax.OpTypeDiv,
		}, nil
	case syntax.OpTypeCount:
		// count(x) -> sum(count(x, shard=1) ++ count(x, shard=2)...)
		sharded, err := m.mapSampleExpr(expr, r)
		if err != nil {
			return nil, err
		}
		return &syntax.VectorAggregationExpr{
			Left:      sharded,
			Grouping:  expr.Grouping,
			Operation: syntax.OpTypeSum,
		}, nil
	default:
		// this should not be reachable. If an operation is shardable, it should
		// have an optimization listed above.
		level.Warn(util_log.Logger).Log(
			"msg", "unexpected operation which appears shardable, ignoring",
			"operation", expr.Operation,
		)
		return expr, nil
	}
}
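
// Illustrative note (added, not in the original file): the avg case composes
// the other two. For example, avg by (job) (rate({app="foo"}[1m])) is rewritten
// to sum by (job) (rate({app="foo"}[1m])) / count by (job) (rate({app="foo"}[1m])),
// and each operand is then sharded by the sum and count cases respectively.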
func (m ShardMapper) mapLabelReplaceExpr(expr *syntax.LabelReplaceExpr, r *downstreamRecorder) (syntax.SampleExpr, error) {
	subMapped, err := m.Map(expr.Left, r)
	if err != nil {
		return nil, err
	}
	cpy := *expr
	cpy.Left = subMapped.(syntax.SampleExpr)
	return &cpy, nil
}
func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, r *downstreamRecorder) (syntax.SampleExpr, error) {
	if hasLabelModifier(expr) {
		// If an expr can modify labels, multiple shards can return the same labelset.
		// When that happens the merge strategy needs to be more than a simple concatenation:
		// for rates, for instance, data from the same series would have to be summed across shards.
		// Since we currently only support concatenation as a merge strategy, we skip sharding those queries.
		return expr, nil
	}
	switch expr.Operation {
	case syntax.OpRangeTypeCount, syntax.OpRangeTypeRate, syntax.OpRangeTypeBytesRate, syntax.OpRangeTypeBytes:
		// count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)...
		// rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)...
		// same goes for bytes_rate and bytes_over_time
		return m.mapSampleExpr(expr, r)
	default:
		return expr, nil
	}
}
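
// Illustrative note (added, not in the original file): only the four range
// aggregations listed above are sharded here. count_over_time({app="foo"}[5m])
// becomes a concatenation of per-shard count_over_time expressions, while an
// operation such as max_over_time falls through to the default case and is
// returned unchanged.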
// hasLabelModifier reports whether an expression contains pipeline stages that
// can modify stream labels. Parsers, for instance, introduce new labels but do
// not alter the original ones.
func hasLabelModifier(expr *syntax.RangeAggregationExpr) bool {
	switch ex := expr.Left.Left.(type) {
	case *syntax.MatchersExpr:
		return false
	case *syntax.PipelineExpr:
		for _, p := range ex.MultiStages {
			if _, ok := p.(*syntax.LabelFmtExpr); ok {
				return true
			}
		}
	}
	return false
}
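
// Illustrative note (added, not in the original file): given the check above,
// rate({app="foo"} | json [1m]) is still considered shardable because parser
// stages only add labels, whereas rate({app="foo"} | label_format lvl=level [1m])
// contains a LabelFmtExpr stage and is therefore left unsharded.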
func badASTMapping(got syntax.Expr) error {
	return fmt.Errorf("bad AST mapping: expected SampleExpr, but got (%T)", got)
}