chore: add multi-variant query support to LogQL (#16196)

pull/16369/head
Trevor Whitney 11 months ago committed by GitHub
parent 8e1c19a0ce
commit 417d0a5252
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      pkg/logql/engine.go
  2. 171
      pkg/logql/syntax/ast.go
  3. 125
      pkg/logql/syntax/ast_test.go
  4. 13
      pkg/logql/syntax/clone.go
  5. 9
      pkg/logql/syntax/clone_test.go
  6. 4
      pkg/logql/syntax/lex.go
  7. 50
      pkg/logql/syntax/lex_test.go
  8. 18
      pkg/logql/syntax/parser.go
  9. 104
      pkg/logql/syntax/parser_test.go
  10. 58
      pkg/logql/syntax/serialize.go
  11. 9
      pkg/logql/syntax/serialize_test.go
  12. 16
      pkg/logql/syntax/syntax.y
  13. 968
      pkg/logql/syntax/syntax.y.go
  14. 14
      pkg/logql/syntax/test_utils.go
  15. 22
      pkg/logql/syntax/visit.go
  16. 24
      pkg/logql/syntax/visit_test.go
  17. 15
      pkg/logql/syntax/walk_test.go
  18. 13
      pkg/logqlmodel/error.go
  19. 19
      pkg/querier/plan/plan_test.go
  20. 104
      pkg/querier/queryrange/roundtrip.go
  21. 1
      pkg/querier/queryrange/roundtrip_test.go
  22. 2
      pkg/util/server/error.go

@ -323,6 +323,8 @@ func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
defer util.LogErrorWithContext(ctx, "closing iterator", itr.Close)
streams, err := readStreams(itr, q.params.Limit(), q.params.Direction(), q.params.Interval())
return streams, err
case syntax.VariantsExpr:
return nil, logqlmodel.ErrVariantsDisabled
default:
return nil, fmt.Errorf("unexpected type (%T): cannot evaluate", e)
}

@ -66,6 +66,7 @@ func (LogfmtExpressionParserExpr) isExpr() {}
func (LogRangeExpr) isExpr() {}
func (OffsetExpr) isExpr() {}
func (UnwrapExpr) isExpr() {}
func (MultiVariantExpr) isExpr() {}
// LogSelectorExpr is a expression filtering and returning logs.
type LogSelectorExpr interface {
@ -1318,6 +1319,10 @@ const (
// probabilistic aggregations
OpTypeApproxTopK = "approx_topk"
// variants
OpVariants = "variants"
VariantsOf = "of"
)
func IsComparisonOperator(op string) bool {
@ -2412,3 +2417,169 @@ func groupingReducesLabels(grp *Grouping) bool {
return false
}
// VariantsExpr is a LogQL expression that can produce multiple streams, defined by a set of variants,
// over a single log selector.
//
//sumtype:decl
type VariantsExpr interface {
LogRange() *LogRangeExpr
Matchers() []*labels.Matcher
Variants() []SampleExpr
SetVariant(i int, e SampleExpr) error
Interval() time.Duration
Offset() time.Duration
Extractors() ([]SampleExtractor, error)
Expr
}
type MultiVariantExpr struct {
logRange *LogRangeExpr
variants []SampleExpr
err error
}
func NewMultiVariantExpr(
logRange *LogRangeExpr,
variants []SampleExpr,
) MultiVariantExpr {
return MultiVariantExpr{
logRange: logRange,
variants: variants,
}
}
func (m *MultiVariantExpr) LogRange() *LogRangeExpr {
return m.logRange
}
func (m *MultiVariantExpr) SetLogSelector(e *LogRangeExpr) {
m.logRange = e
}
func (m *MultiVariantExpr) Matchers() []*labels.Matcher {
return m.logRange.Left.Matchers()
}
func (m *MultiVariantExpr) Interval() time.Duration {
return m.logRange.Interval
}
func (m *MultiVariantExpr) Offset() time.Duration {
return m.logRange.Offset
}
func (m *MultiVariantExpr) Variants() []SampleExpr {
return m.variants
}
func (m *MultiVariantExpr) AddVariant(v SampleExpr) {
m.variants = append(m.variants, v)
}
func (m *MultiVariantExpr) SetVariant(i int, v SampleExpr) error {
if i >= len(m.variants) {
return fmt.Errorf("variant index out of range")
}
m.variants[i] = v
return nil
}
func (m *MultiVariantExpr) Shardable(topLevel bool) bool {
if !m.logRange.Shardable(topLevel) {
return false
}
for _, v := range m.variants {
if !v.Shardable(topLevel) {
return false
}
}
return true
}
func (m *MultiVariantExpr) Walk(f WalkFn) {
f(m)
if m.logRange != nil {
m.logRange.Walk(f)
}
for _, v := range m.variants {
v.Walk(f)
}
}
func (m *MultiVariantExpr) String() string {
var sb strings.Builder
sb.WriteString(OpVariants)
sb.WriteString("(")
for i, v := range m.variants {
sb.WriteString(v.String())
if i+1 != len(m.variants) {
sb.WriteString(", ")
}
}
sb.WriteString(") ")
sb.WriteString(VariantsOf)
sb.WriteString(" (")
sb.WriteString(m.logRange.String())
sb.WriteString(")")
return sb.String()
}
func (m *MultiVariantExpr) Accept(v RootVisitor) {
v.VisitVariants(m)
}
// Pretty prettyfies any LogQL expression at given `level` of the whole LogQL query.
func (m *MultiVariantExpr) Pretty(level int) string {
s := Indent(level)
s += OpVariants + "(\n"
variants := make([]string, 0, len(m.variants))
for _, v := range m.variants {
variants = append(variants, v.Pretty(level+1))
}
for i, v := range variants {
s += v
// LogQL doesn't allow `,` at the end of last argument.
if i < len(variants)-1 {
s += ","
}
s += "\n"
}
s += Indent(level) + ") of (\n"
s += m.logRange.Pretty(level + 1)
s += Indent(level) + "\n)"
return s
}
func (m *MultiVariantExpr) Extractors() ([]log.SampleExtractor, error) {
extractors := make([]log.SampleExtractor, 0, len(m.variants))
for _, v := range m.variants {
e, err := v.Extractor()
if err != nil {
return nil, err
}
extractors = append(extractors, e)
}
return extractors, nil
}
func newVariantsExpr(variants []SampleExpr, logRange *LogRangeExpr) VariantsExpr {
return &MultiVariantExpr{
variants: variants,
logRange: logRange,
}
}

@ -2,6 +2,7 @@ package syntax
import (
"fmt"
"strings"
"testing"
"time"
@ -11,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logqlmodel"
)
var labelBar, _ = ParseLabels("{app=\"bar\"}")
@ -1157,3 +1159,126 @@ func TestCombineFilters(t *testing.T) {
}
}
}
func Test_VariantsExpr_String(t *testing.T) {
t.Parallel()
tests := []struct {
expr string
}{
{`variants(count_over_time({foo="bar"}[5m])) of ({foo="bar"}[5m])`},
{
`variants(count_over_time({baz="qux", foo=~"bar"}[5m]), bytes_over_time({baz="qux", foo=~"bar"}[5m])) of ({baz="qux", foo=~"bar"} | logfmt | this = "that"[5m])`,
},
{
`variants(count_over_time({baz="qux", foo!="bar"}[5m]),rate({baz="qux", foo!="bar"}[5m])) of ({baz="qux", foo!="bar"} |= "that" [5m])`,
},
{
`variants(sum by (app) (count_over_time({baz="qux", foo!="bar"}[5m])),rate({baz="qux", foo!="bar"}[5m])) of ({baz="qux", foo!="bar"} |= "that" [5m])`,
},
}
for _, tt := range tests {
t.Run(tt.expr, func(t *testing.T) {
t.Parallel()
expr, err := ParseExpr(tt.expr)
require.NoError(t, err)
expr2, err := ParseExpr(expr.String())
require.Nil(t, err)
AssertExpressions(t, expr, expr2)
})
}
}
func Test_VariantsExpr_Pretty(t *testing.T) {
tests := []struct {
expr string
pretty string
}{
{`variants(count_over_time({foo="bar"}[5m])) of ({foo="bar"}[5m])`, `
variants(
count_over_time({foo="bar"}[5m])
) of (
{foo="bar"} [5m]
)`},
{
`variants(count_over_time({baz="qux", foo=~"bar"}[5m]), bytes_over_time({baz="qux", foo=~"bar"}[5m])) of ({baz="qux", foo=~"bar"} | logfmt | this = "that"[5m])`,
`variants(
count_over_time({baz="qux", foo=~"bar"}[5m]),
bytes_over_time({baz="qux", foo=~"bar"}[5m])
) of (
{baz="qux", foo=~"bar"} | logfmt | this="that" [5m]
)`,
},
}
for _, tt := range tests {
t.Run(tt.expr, func(t *testing.T) {
t.Parallel()
expr, err := ParseExpr(tt.expr)
require.NoError(t, err)
require.Equal(t, strings.TrimSpace(tt.pretty), strings.TrimSpace(expr.Pretty(0)))
})
}
}
func Test_MultiVariantExpr_Extractors(t *testing.T) {
emptyExpr := &MultiVariantExpr{
variants: []SampleExpr{},
logRange: &LogRangeExpr{},
}
validAgg := &RangeAggregationExpr{
Operation: OpRangeTypeCount,
Left: &LogRangeExpr{
Interval: time.Second,
Left: &MatchersExpr{
Mts: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
},
}
errorAgg := &RangeAggregationExpr{
err: logqlmodel.NewParseError("test error", 0, 0),
}
t.Run("should return empty slice for no variants", func(t *testing.T) {
extractors, err := emptyExpr.Extractors()
require.NoError(t, err)
require.Empty(t, extractors)
})
t.Run("should return extractors for all variants", func(t *testing.T) {
expr := &MultiVariantExpr{
variants: []SampleExpr{validAgg, validAgg}, // Two identical variants for simplicity
logRange: &LogRangeExpr{},
}
extractors, err := expr.Extractors()
require.NoError(t, err)
require.Len(t, extractors, 2)
})
t.Run("should propagate extractor errors", func(t *testing.T) {
expr := &MultiVariantExpr{
variants: []SampleExpr{errorAgg},
logRange: &LogRangeExpr{},
}
extractors, err := expr.Extractors()
require.Error(t, err)
require.Nil(t, extractors)
})
t.Run("should handle mixed valid and invalid variants", func(t *testing.T) {
expr := &MultiVariantExpr{
variants: []SampleExpr{validAgg, errorAgg},
logRange: &LogRangeExpr{},
}
extractors, err := expr.Extractors()
require.Error(t, err)
require.Nil(t, extractors)
})
}

@ -300,3 +300,16 @@ func (v *cloneVisitor) VisitLogfmtParser(e *LogfmtParserExpr) {
KeepEmpty: e.KeepEmpty,
}
}
func (v *cloneVisitor) VisitVariants(e *MultiVariantExpr) {
copied := &MultiVariantExpr{
logRange: MustClone[*LogRangeExpr](e.logRange),
variants: make([]SampleExpr, len(e.variants)),
}
for i, v := range e.variants {
copied.variants[i] = MustClone[SampleExpr](v)
}
v.cloned = copied
}

@ -64,6 +64,15 @@ func TestClone(t *testing.T) {
"true filter": {
query: `{ foo = "bar" } | foo =~".*"`,
},
"multiple variants": {
query: `variants(bytes_over_time({foo="bar"}[5m]), count_over_time({foo="bar"}[5m])) of ({foo="bar"}[5m])`,
},
"multiple variants with aggregation": {
query: `variants(sum by (app) (bytes_over_time({foo="bar"}[5m])), count_over_time({foo="bar"}[5m])) of ({foo="bar"}[5m])`,
},
"multiple variants with filters": {
query: `variants(bytes_over_time({foo="bar"}[5m]), count_over_time({foo="bar"}[5m])) of ({foo="bar"} | logfmt[5m])`,
},
}
for name, test := range tests {

@ -80,6 +80,10 @@ var tokens = map[string]int{
// keep labels
OpKeep: KEEP,
// variants
OpVariants: VARIANTS,
VariantsOf: OF,
}
var parserFlags = map[string]struct{}{

@ -204,3 +204,53 @@ func Test_parseDuration(t *testing.T) {
require.Equal(t, tc.expected, actual)
}
}
func TestLex_Variants(t *testing.T) {
for _, tc := range []struct {
input string
expected []int
}{
{
`variants(count_over_time({foo="bar"}[5m])) of ({foo="bar"}[5m])`,
[]int{VARIANTS, OPEN_PARENTHESIS, COUNT_OVER_TIME, OPEN_PARENTHESIS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE, RANGE, CLOSE_PARENTHESIS, CLOSE_PARENTHESIS, OF,
OPEN_PARENTHESIS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE, RANGE, CLOSE_PARENTHESIS},
},
{
`variants(bytes_over_time({foo="bar"}[5m]), count_over_time({foo="bar"}[5m])) of ({foo="bar"}[5m])`,
[]int{VARIANTS, OPEN_PARENTHESIS, BYTES_OVER_TIME, OPEN_PARENTHESIS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE, RANGE, CLOSE_PARENTHESIS, COMMA,
COUNT_OVER_TIME, OPEN_PARENTHESIS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE, RANGE, CLOSE_PARENTHESIS, CLOSE_PARENTHESIS,
OF, OPEN_PARENTHESIS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE, RANGE, CLOSE_PARENTHESIS},
},
{
`variants(count_over_time({foo="bar"}[5m]), rate({foo="bar"}[5m])) of ({foo="bar"} | logfmt | number="42"[5m])`,
[]int{VARIANTS, OPEN_PARENTHESIS, COUNT_OVER_TIME, OPEN_PARENTHESIS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE, RANGE, CLOSE_PARENTHESIS, COMMA,
RATE, OPEN_PARENTHESIS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE, RANGE, CLOSE_PARENTHESIS, CLOSE_PARENTHESIS, OF,
OPEN_PARENTHESIS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE, PIPE, LOGFMT, PIPE, IDENTIFIER, EQ, STRING, RANGE, CLOSE_PARENTHESIS},
},
{
`variants(sum by (app) (count_over_time({foo="bar"}[5m])), rate({foo="bar"}[5m])) of ({foo="bar"} | logfmt | number="42"[5m])`,
[]int{VARIANTS, OPEN_PARENTHESIS, SUM, BY, OPEN_PARENTHESIS, IDENTIFIER, CLOSE_PARENTHESIS, OPEN_PARENTHESIS, COUNT_OVER_TIME, OPEN_PARENTHESIS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE, RANGE, CLOSE_PARENTHESIS, CLOSE_PARENTHESIS, COMMA,
RATE, OPEN_PARENTHESIS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE, RANGE, CLOSE_PARENTHESIS, CLOSE_PARENTHESIS, OF,
OPEN_PARENTHESIS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE, PIPE, LOGFMT, PIPE, IDENTIFIER, EQ, STRING, RANGE, CLOSE_PARENTHESIS},
},
} {
t.Run(tc.input, func(t *testing.T) {
actual := []int{}
l := lexer{
Scanner: Scanner{
Mode: scanner.SkipComments | scanner.ScanStrings,
},
}
l.Init(strings.NewReader(tc.input))
var lval syntaxSymType
for {
tok := l.Lex(&lval)
if tok == 0 {
break
}
actual = append(actual, tok)
}
require.Equal(t, tc.expected, actual)
})
}
}

@ -119,11 +119,29 @@ func validateExpr(expr Expr) error {
return validateSampleExpr(e)
case LogSelectorExpr:
return validateLogSelectorExpression(e)
case VariantsExpr:
return validateVariantsExpr(e)
default:
return logqlmodel.NewParseError(fmt.Sprintf("unexpected expression type: %v", e), 0, 0)
}
}
func validateVariantsExpr(e VariantsExpr) error {
err := validateLogSelectorExpression(e.LogRange().Left)
if err != nil {
return err
}
for _, variant := range e.Variants() {
err = validateSampleExpr(variant)
if err != nil {
return err
}
}
return nil
}
// validateMatchers checks whether a query would touch all the streams in the query range or uses at least one matcher to select specific streams.
func validateMatchers(matchers []*labels.Matcher) error {
_, matchers = util.SplitFiltersAndMatchers(matchers)

@ -3236,6 +3236,110 @@ var ParseTestCases = []struct {
},
},
},
{
in: `variants(count_over_time({foo="bar"}[5m])) of ({foo="bar"}[5m])`,
exp: &MultiVariantExpr{
logRange: &LogRangeExpr{
Left: &MatchersExpr{
Mts: []*labels.Matcher{
{
Name: "foo",
Value: "bar",
Type: labels.MatchEqual,
},
},
},
Interval: 5 * time.Minute,
Offset: 0,
Unwrap: nil,
},
variants: []SampleExpr{
&RangeAggregationExpr{
Left: &LogRangeExpr{
Left: &MatchersExpr{
Mts: []*labels.Matcher{
{
Name: "foo",
Value: "bar",
Type: labels.MatchEqual,
},
},
},
Interval: 5 * time.Minute,
Offset: 0,
Unwrap: nil,
},
Operation: OpRangeTypeCount,
Params: new(float64),
Grouping: &Grouping{},
err: nil,
},
},
},
err: nil,
},
{
in: `variants(count_over_time({foo="bar"}[5m]), rate({foo="bar"}[5m])) of ({foo="bar"}[5m])`,
exp: &MultiVariantExpr{
logRange: &LogRangeExpr{
Left: &MatchersExpr{
Mts: []*labels.Matcher{
{
Name: "foo",
Value: "bar",
Type: labels.MatchEqual,
},
},
},
Interval: 5 * time.Minute,
Offset: 0,
Unwrap: nil,
},
variants: []SampleExpr{
&RangeAggregationExpr{
Left: &LogRangeExpr{
Left: &MatchersExpr{
Mts: []*labels.Matcher{
{
Name: "foo",
Value: "bar",
Type: labels.MatchEqual,
},
},
},
Interval: 5 * time.Minute,
Offset: 0,
Unwrap: nil,
},
Operation: OpRangeTypeCount,
Params: new(float64),
Grouping: &Grouping{},
err: nil,
},
&RangeAggregationExpr{
Left: &LogRangeExpr{
Left: &MatchersExpr{
Mts: []*labels.Matcher{
{
Name: "foo",
Value: "bar",
Type: labels.MatchEqual,
},
},
},
Interval: 5 * time.Minute,
Offset: 0,
Unwrap: nil,
},
Operation: OpRangeTypeRate,
Params: new(float64),
Grouping: &Grouping{},
err: nil,
},
},
},
err: nil,
},
}
func TestParse(t *testing.T) {

@ -77,6 +77,8 @@ const (
VectorAgg = "vector_agg"
VectorMatchingField = "vector_matching"
Without = "without"
Variants = "variants"
Of = "of"
)
func DecodeJSON(raw string) (Expr, error) {
@ -98,6 +100,8 @@ func DecodeJSON(raw string) (Expr, error) {
return decodeLabelReplace(iter)
case LogSelector:
return decodeLogSelector(iter)
case Variants:
return decodeVariants(iter)
default:
return nil, fmt.Errorf("unknown expression type: %s", key)
}
@ -304,6 +308,33 @@ func (v *JSONSerializer) VisitPipeline(e *PipelineExpr) {
v.Flush()
}
func (v *JSONSerializer) VisitVariants(e *MultiVariantExpr) {
v.WriteObjectStart()
v.WriteObjectField(Variants)
v.WriteObjectStart()
v.WriteObjectField(LogSelector)
// Serialize log range as string.
v.VisitLogRange(e.LogRange())
v.WriteMore()
v.WriteObjectField(Variants)
v.WriteArrayStart()
for i, variant := range e.Variants() {
if i > 0 {
v.WriteMore()
}
variant.Accept(v)
}
v.WriteArrayEnd()
v.WriteObjectEnd()
v.WriteObjectEnd()
v.Flush()
}
// Below are StageExpr visitors that we are skipping since a pipeline is
// serialized as a string.
func (*JSONSerializer) VisitDecolorize(*DecolorizeExpr) {}
@ -937,3 +968,30 @@ func decodeMatchers(iter *jsoniter.Iterator) (LogSelectorExpr, error) {
func decodePipeline(iter *jsoniter.Iterator) (LogSelectorExpr, error) {
return decodeLogSelector(iter)
}
func decodeVariants(iter *jsoniter.Iterator) (VariantsExpr, error) {
var e MultiVariantExpr
for f := iter.ReadObject(); f != ""; f = iter.ReadObject() {
switch f {
case Variants:
for iter.ReadArray() {
expr, err := decodeSample(iter)
if err != nil {
return nil, err
}
e.AddVariant(expr)
}
case LogSelector:
logRange, err := decodeLogRange(iter)
if err != nil {
return nil, err
}
e.SetLogSelector(logRange)
}
}
return &e, nil
}

@ -59,6 +59,15 @@ func TestJSONSerializationRoundTrip(t *testing.T) {
"empty label filter string": {
query: `rate({app="foo"} |= "bar" | json | unwrap latency | path!="" [5m])`,
},
"multiple variants": {
query: `variants(bytes_over_time({foo="bar"}[5m]), count_over_time({foo="bar"}[5m])) of ({foo="bar"}[5m])`,
},
"multiple variants with aggregation": {
query: `variants(sum by (app) (bytes_over_time({foo="bar"}[5m])), count_over_time({foo="bar"}[5m])) of ({foo="bar"}[5m])`,
},
"multiple variants with filters": {
query: `variants(bytes_over_time({foo="bar"}[5m]), count_over_time({foo="bar"}[5m])) of ({foo="bar"} | logfmt[5m])`,
},
}
for name, test := range tests {

@ -20,9 +20,11 @@ import (
expr Expr
logExpr LogSelectorExpr
metricExpr SampleExpr
variantsExpr VariantsExpr
matcher *labels.Matcher
matchers []*labels.Matcher
metricExprs []SampleExpr
stage StageExpr
stages MultiStageExpr
filterer log.LabelFilterer
@ -48,6 +50,7 @@ import (
%type <expr> expr
%type <logExpr> logExpr
%type <metricExpr> metricExpr rangeAggregationExpr vectorAggregationExpr binOpExpr labelReplaceExpr vectorExpr
%type <variantsExpr> variantsExpr
%type <stage> pipelineStage logfmtParser labelParser jsonExpressionParser logfmtExpressionParser lineFormatExpr decolorizeExpr labelFormatExpr dropLabelsExpr keepLabelsExpr
%type <stages> pipelineExpr
%type <lineFilterExpr> lineFilter lineFilters orFilter
@ -70,6 +73,7 @@ import (
%type <labelExtractionExpressionList> labelExtractionExpressionList
%type <unwrapExpr> unwrapExpr
%type <offsetExpr> offsetExpr
%type <metricExprs> metricExprs
%token <bytes> BYTES
%token <str> IDENTIFIER STRING NUMBER FUNCTION_FLAG
@ -80,7 +84,7 @@ import (
BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
FIRST_OVER_TIME LAST_OVER_TIME ABSENT_OVER_TIME VECTOR LABEL_REPLACE UNPACK OFFSET PATTERN IP ON IGNORING GROUP_LEFT GROUP_RIGHT
DECOLORIZE DROP KEEP
DECOLORIZE DROP KEEP VARIANTS OF
// Operators are listed with increasing precedence.
%left <binOp> OR
@ -98,6 +102,7 @@ root:
expr:
logExpr { $$ = $1 }
| metricExpr { $$ = $1 }
| variantsExpr { $$ = $1 }
;
logExpr:
@ -116,6 +121,10 @@ metricExpr:
| OPEN_PARENTHESIS metricExpr CLOSE_PARENTHESIS { $$ = $2 }
;
variantsExpr:
VARIANTS OPEN_PARENTHESIS metricExprs CLOSE_PARENTHESIS OF OPEN_PARENTHESIS logRangeExpr CLOSE_PARENTHESIS { $$ = newVariantsExpr($3, $7) }
;
logRangeExpr:
selector RANGE { $$ = newLogRange(newMatcherExpr($1), $2, nil, nil ) }
| selector RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($1), $2, nil, $3 ) }
@ -512,4 +521,9 @@ grouping:
| BY OPEN_PARENTHESIS CLOSE_PARENTHESIS { $$ = &Grouping{ Without: false , Groups: nil } }
| WITHOUT OPEN_PARENTHESIS CLOSE_PARENTHESIS { $$ = &Grouping{ Without: true , Groups: nil } }
;
metricExprs:
metricExpr { $$ = []SampleExpr{$1} }
| metricExprs COMMA metricExpr { $$ = append($1, $3) }
;
%%

File diff suppressed because it is too large Load Diff

@ -12,7 +12,14 @@ import (
// AssertExpressions function removes FastRegexMatchers from all Regexp matchers to allow simple objects comparison.
// See removeFastRegexMatcherFromExpr function for the details.
func AssertExpressions(t *testing.T, expected, actual Expr) {
require.Equal(t, removeFastRegexMatcherFromExpr(expected), removeFastRegexMatcherFromExpr(actual))
require.EqualExportedValues(
t,
removeFastRegexMatcherFromExpr(expected),
removeFastRegexMatcherFromExpr(actual),
"%s\n!=\n%s",
expected,
actual,
)
}
// AssertMatchers function removes FastRegexMatchers from all Regexp matchers to allow simple objects comparison.
@ -53,6 +60,11 @@ func removeFastRegexMatcherFromExpr(expr Expr) Expr {
cleaned = append(cleaned, removeFastRegexMatcherFromLabelFilterer(filter))
}
typed.Unwrap.PostFilters = cleaned
case *MultiVariantExpr:
typed.logRange.Left = removeFastRegexMatcherFromExpr(typed.logRange.Left).(LogSelectorExpr)
for i, variant := range typed.variants {
typed.variants[i] = removeFastRegexMatcherFromExpr(variant).(SampleExpr)
}
default:
return
}

@ -8,6 +8,7 @@ type RootVisitor interface {
SampleExprVisitor
LogSelectorExprVisitor
StageExprVisitor
VariantsExprVisitor
VisitLogRange(*LogRangeExpr)
}
@ -42,6 +43,10 @@ type StageExprVisitor interface {
VisitLogfmtParser(*LogfmtParserExpr)
}
type VariantsExprVisitor interface {
VisitVariants(*MultiVariantExpr)
}
var _ RootVisitor = &DepthFirstTraversal{}
type DepthFirstTraversal struct {
@ -65,6 +70,7 @@ type DepthFirstTraversal struct {
VisitRangeAggregationFn func(v RootVisitor, e *RangeAggregationExpr)
VisitVectorFn func(v RootVisitor, e *VectorExpr)
VisitVectorAggregationFn func(v RootVisitor, e *VectorAggregationExpr)
VisitVariantsFn func(v RootVisitor, e *MultiVariantExpr)
}
// VisitBinOp implements RootVisitor.
@ -287,3 +293,19 @@ func (v *DepthFirstTraversal) VisitVectorAggregation(e *VectorAggregationExpr) {
e.Left.Accept(v)
}
}
func (v *DepthFirstTraversal) VisitVariants(e *MultiVariantExpr) {
if e == nil {
return
}
if v.VisitVariantsFn != nil {
v.VisitVariantsFn(v, e)
} else {
e.LogRange().Accept(v)
variants := e.Variants()
for i := range variants {
variants[i].Accept(v)
}
}
}

@ -46,3 +46,27 @@ func TestDepthFirstTraversalVisitor(t *testing.T) {
expr.Accept(visitor)
require.Equal(t, expected, visited)
}
func TestDepthFirstTraversalVisitor_Variants(t *testing.T) {
visited := [][2]string{}
visitor := &DepthFirstTraversal{
VisitVariantsFn: func(_ RootVisitor, e *MultiVariantExpr) {
visited = append(visited, [2]string{fmt.Sprintf("%T", e), e.String()})
},
}
// Only expressions that have a Visit function defined are added to the list
expected := [][2]string{
{
"*syntax.MultiVariantExpr",
`variants(count_over_time({env="prod"}[1m])) of ({env="prod"}[1m])`,
},
}
query := `variants(count_over_time({env="prod"}[1m])) of ({env="prod"}[1m])`
expr, err := ParseExpr(query)
require.NoError(t, err)
expr.Accept(visitor)
require.Equal(t, expected, visited)
}

@ -24,6 +24,21 @@ func Test_Walkable(t *testing.T) {
expr: `(sum by(cluster)(rate({job="foo"} |= "bar" | logfmt | bazz="buzz"[5m])) / sum by(cluster)(rate({job="foo"} |= "bar" | logfmt | bazz="buzz"[5m])))`,
want: 17,
},
{
desc: "single variant query",
expr: `variants(count_over_time({job="foo"}[5m])) of ({job="foo"}[5m])`,
want: 6,
},
{
desc: "single range aggregation variant query",
expr: `variants(sum by (job) (count_over_time({job="foo"}[5m]))) of ({job="foo"}[5m])`,
want: 7,
},
{
desc: "multiple variants query",
expr: `variants(count_over_time({job="foo"}[5m]), bytes_over_time({job="foo"}[5m])) of ({job="foo"}[5m])`,
want: 9,
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {

@ -16,10 +16,15 @@ var (
ErrIntervalLimit = errors.New("[interval] value exceeds limit")
ErrBlocked = errors.New("query blocked by policy")
ErrParseMatchers = errors.New("only label matchers are supported")
ErrUnsupportedSyntaxForInstantQuery = errors.New("log queries are not supported as an instant query type, please change your query to a range query type")
ErrorLabel = "__error__"
PreserveErrorLabel = "__preserve_error__"
ErrorDetailsLabel = "__error_details__"
ErrUnsupportedSyntaxForInstantQuery = errors.New(
"log queries are not supported as an instant query type, please change your query to a range query type",
)
ErrVariantsDisabled = errors.New(
"multi variant queries are disabled for this instance",
)
ErrorLabel = "__error__"
PreserveErrorLabel = "__preserve_error__"
ErrorDetailsLabel = "__error_details__"
)
// ParseError is what is returned when we failed to parse.

@ -24,3 +24,22 @@ func TestMarshalTo(t *testing.T) {
require.JSONEq(t, buf.String(), string(data))
}
func TestMarshalTo_Variant(t *testing.T) {
plan := QueryPlan{
AST: syntax.MustParseExpr(`variants(
count_over_time({app="loki"} [1m]),
bytes_over_time({app="loki"} [1m])
) of ({app="loki"}[1m])`),
}
data := make([]byte, plan.Size())
_, err := plan.MarshalTo(data)
require.NoError(t, err)
var buf bytes.Buffer
err = syntax.EncodeJSON(plan.AST, &buf)
require.NoError(t, err)
require.JSONEq(t, buf.String(), string(data))
}

@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/v3/pkg/logql"
logqllog "github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
base "github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
@ -257,6 +258,27 @@ func NewMiddleware(
if err != nil {
return nil, nil, err
}
variantsTripperware, err := NewVariantsTripperware(
cfg,
engineOpts,
log,
limits,
schema,
codec,
iqo,
resultsCache,
cacheGenNumLoader,
retentionEnabled,
PrometheusExtractor{},
metrics,
indexStatsTripperware,
metricsNamespace,
)
if err != nil {
return nil, nil, err
}
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
var (
metricRT = metricsTripperware.Wrap(next)
@ -269,9 +291,25 @@ func NewMiddleware(
seriesVolumeRT = seriesVolumeTripperware.Wrap(next)
detectedFieldsRT = detectedFieldsTripperware.Wrap(next)
detectedLabelsRT = detectedLabelsTripperware.Wrap(next)
variantsRT = variantsTripperware.Wrap(next)
)
return newRoundTripper(log, next, limitedRT, logFilterRT, metricRT, seriesRT, labelsRT, instantRT, statsRT, seriesVolumeRT, detectedFieldsRT, detectedLabelsRT, limits)
return newRoundTripper(
log,
next,
limitedRT,
logFilterRT,
metricRT,
seriesRT,
labelsRT,
instantRT,
statsRT,
seriesVolumeRT,
detectedFieldsRT,
detectedLabelsRT,
variantsRT,
limits,
)
}), StopperWrapper{resultsCache, statsCache, volumeCache}, nil
}
@ -325,13 +363,17 @@ func NewDetectedLabelsCardinalityFilter(rt queryrangebase.Handler) queryrangebas
type roundTripper struct {
logger log.Logger
next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels base.Handler
next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels, variants base.Handler
limits Limits
}
// newRoundTripper creates a new queryrange roundtripper
func newRoundTripper(logger log.Logger, next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels base.Handler, limits Limits) roundTripper {
func newRoundTripper(
logger log.Logger,
next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels, variants base.Handler,
limits Limits,
) roundTripper {
return roundTripper{
logger: logger,
limited: limited,
@ -345,6 +387,7 @@ func newRoundTripper(logger log.Logger, next, limited, log, metric, series, labe
seriesVolume: seriesVolume,
detectedFields: detectedFields,
detectedLabels: detectedLabels,
variants: variants,
next: next,
}
}
@ -401,7 +444,31 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
return r.limited.Do(ctx, req)
}
return r.log.Do(ctx, req)
case syntax.VariantsExpr:
if err := validateMaxEntriesLimits(ctx, op.Limit, r.limits); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
matchers := e.Matchers()
if err := validateMatchers(ctx, r.limits, matchers); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
for _, v := range e.Variants() {
groups, err := v.MatcherGroups()
if err != nil {
level.Warn(logger).Log("msg", "unexpected matcher groups error in roundtripper", "err", err)
}
for _, g := range groups {
if err := validateMatchers(ctx, r.limits, g.Matchers); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
}
}
return r.variants.Do(ctx, req)
default:
return r.next.Do(ctx, req)
}
@ -1231,3 +1298,34 @@ func NewDetectedFieldsTripperware(
return NewDetectedFieldsHandler(limitedHandler, logHandler, limits)
}), nil
}
// NewVariantsTripperware creates a new frontend tripperware responsible for handling queries with multiple variants for a single
// selector.
func NewVariantsTripperware(
_ Config,
_ logql.EngineOpts,
_ log.Logger,
_ Limits,
_ config.SchemaConfig,
_ base.Merger,
_ util.IngesterQueryOptions,
_ cache.Cache,
_ base.CacheGenNumberLoader,
_ bool,
_ base.Extractor,
_ *Metrics,
_ base.Middleware,
_ string,
) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
return base.HandlerFunc(
func(ctx context.Context, r base.Request) (base.Response, error) {
if _, ok := r.(*LokiRequest); !ok {
return next.Do(ctx, r)
}
return nil, logqlmodel.ErrVariantsDisabled
},
)
}), nil
}

@ -1005,6 +1005,7 @@ func TestPostQueries(t *testing.T) {
handler,
handler,
handler,
handler,
fakeLimits{},
).Do(ctx, lreq)
require.NoError(t, err)

@ -100,6 +100,8 @@ func ClientHTTPStatusAndError(err error) (int, error) {
return http.StatusBadRequest, err
case errors.As(err, &userErr):
return http.StatusBadRequest, err
case errors.Is(err, logqlmodel.ErrVariantsDisabled):
return http.StatusBadRequest, err
default:
if grpcErr, ok := httpgrpc.HTTPResponseFromError(err); ok {
return int(grpcErr.Code), errors.New(string(grpcErr.Body))

Loading…
Cancel
Save