From 5e3630815b0151e1806949e7c9162a1bbdc2aa1f Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 18 Apr 2019 00:31:19 +0800 Subject: [PATCH] Errors and wiring. Signed-off-by: Tom Wilkie --- pkg/iter/iterator.go | 21 ++++++ pkg/logql/expr.go | 82 +++++++++++--------- pkg/logql/expr.y | 4 +- pkg/logql/parser.go | 156 ++++++++++++++++++++++++++------------- pkg/logql/parser_test.go | 89 ++++++++++++++++------ pkg/querier/querier.go | 2 +- pkg/querier/store.go | 74 ++++++++++++------- pkg/querier/tail.go | 2 +- 8 files changed, 292 insertions(+), 138 deletions(-) diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 94ed63422b..4d47ddd2a0 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -293,6 +293,27 @@ func (i *queryClientIterator) Close() error { return i.client.CloseSend() } +type filter struct { + EntryIterator + f func(string) bool +} + +func NewFilter(f func(string) bool, i EntryIterator) EntryIterator { + return &filter{ + f: f, + EntryIterator: i, + } +} + +func (i *filter) Next() bool { + for i.EntryIterator.Next() { + if i.f(i.Entry().Line) { + return true + } + } + return false +} + type regexpFilter struct { re *regexp.Regexp EntryIterator diff --git a/pkg/logql/expr.go b/pkg/logql/expr.go index e130b9dd70..bb610038d4 100644 --- a/pkg/logql/expr.go +++ b/pkg/logql/expr.go @@ -62,7 +62,7 @@ const exprEofCode = 1 const exprErrCode = 2 const exprInitialStackSize = 16 -//line pkg/logql/expr.y:56 +//line pkg/logql/expr.y:58 //line yacctab:1 var exprExca = [...]int{ @@ -73,45 +73,45 @@ var exprExca = [...]int{ const exprPrivate = 57344 -const exprLast = 28 +const exprLast = 29 var exprAct = [...]int{ - 7, 9, 6, 18, 19, 20, 21, 4, 5, 3, - 22, 16, 17, 27, 26, 25, 24, 15, 14, 23, - 13, 12, 28, 11, 10, 8, 2, 1, + 8, 10, 17, 18, 7, 3, 6, 19, 20, 21, + 22, 4, 5, 28, 23, 27, 26, 25, 16, 15, + 24, 14, 13, 29, 12, 11, 9, 2, 1, } var exprPact = [...]int{ - -3, -1000, -9, 19, 16, 15, 13, 12, -2, -1000, - -5, -1000, -1000, -1000, -1000, -1000, -1000, 19, 11, 10, - 9, 8, 18, -1000, -1000, -1000, -1000, -1000, -1000, + -7, -1000, -5, 20, 17, 16, 14, 13, -1000, -11, + -1000, -1, -1000, -1000, -1000, -1000, -1000, -1000, 20, 12, + 11, 10, 8, 19, -1000, -1000, -1000, -1000, -1000, -1000, } var exprPgo = [...]int{ - 0, 27, 26, 25, 1, 24, + 0, 28, 27, 26, 1, 25, } var exprR1 = [...]int{ - 0, 1, 2, 2, 2, 2, 2, 3, 3, 4, - 4, 4, 4, 5, 5, + 0, 1, 2, 2, 2, 2, 2, 2, 2, 3, + 3, 4, 4, 4, 4, 5, 5, } var exprR2 = [...]int{ - 0, 1, 3, 3, 3, 3, 3, 1, 3, 3, - 3, 3, 3, 1, 3, + 0, 1, 3, 3, 3, 3, 3, 2, 2, 1, + 3, 3, 3, 3, 3, 1, 3, } var exprChk = [...]int{ - -1000, -1, -2, 12, 16, 17, 11, 9, -3, -4, - -5, 4, 5, 5, 5, 5, 13, 14, 8, 9, - 10, 11, 15, -4, 5, 5, 5, 5, 4, + -1000, -1, -2, 12, 16, 17, 11, 9, 5, -3, + -4, -5, 4, 5, 5, 5, 5, 13, 14, 8, + 9, 10, 11, 15, -4, 5, 5, 5, 5, 4, } var exprDef = [...]int{ - 0, -2, 1, 0, 0, 0, 0, 0, 0, 7, - 0, 13, 3, 4, 5, 6, 2, 0, 0, 0, - 0, 0, 0, 8, 9, 10, 11, 12, 14, + 0, -2, 1, 0, 7, 0, 0, 0, 8, 0, + 9, 0, 15, 3, 4, 5, 6, 2, 0, 0, + 0, 0, 0, 0, 10, 11, 12, 13, 14, 16, } var exprTok1 = [...]int{ @@ -473,7 +473,7 @@ exprdefault: exprDollar = exprS[exprpt-3 : exprpt+1] //line pkg/logql/expr.y:33 { - exprVAL.Expr = &matchersExpr{exprDollar[2].Matchers} + exprVAL.Expr = &matchersExpr{matchers: exprDollar[2].Matchers} } case 3: exprDollar = exprS[exprpt-3 : exprpt+1] @@ -500,50 +500,62 @@ exprdefault: exprVAL.Expr = &matchExpr{exprDollar[1].Expr, labels.MatchNotEqual, exprDollar[3].str} } case 7: + exprDollar = exprS[exprpt-2 : exprpt+1] +//line pkg/logql/expr.y:38 + { + exprlex.(*lexer).Error("unexpected end of query, expected string") + } + case 8: + exprDollar = exprS[exprpt-2 : exprpt+1] +//line pkg/logql/expr.y:39 + { + exprlex.(*lexer).Error("unexpected string, expected pipe") + } + case 9: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:41 +//line pkg/logql/expr.y:43 { exprVAL.Matchers = []*labels.Matcher{exprDollar[1].Matcher} } - case 8: + case 10: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:42 +//line pkg/logql/expr.y:44 { exprVAL.Matchers = append(exprDollar[1].Matchers, exprDollar[3].Matcher) } - case 9: + case 11: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:46 +//line pkg/logql/expr.y:48 { exprVAL.Matcher = mustNewMatcher(labels.MatchEqual, exprDollar[1].Identifier, exprDollar[3].str) } - case 10: + case 12: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:47 +//line pkg/logql/expr.y:49 { exprVAL.Matcher = mustNewMatcher(labels.MatchNotEqual, exprDollar[1].Identifier, exprDollar[3].str) } - case 11: + case 13: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:48 +//line pkg/logql/expr.y:50 { exprVAL.Matcher = mustNewMatcher(labels.MatchRegexp, exprDollar[1].Identifier, exprDollar[3].str) } - case 12: + case 14: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:49 +//line pkg/logql/expr.y:51 { exprVAL.Matcher = mustNewMatcher(labels.MatchNotRegexp, exprDollar[1].Identifier, exprDollar[3].str) } - case 13: + case 15: exprDollar = exprS[exprpt-1 : exprpt+1] -//line pkg/logql/expr.y:53 +//line pkg/logql/expr.y:55 { exprVAL.Identifier = exprDollar[1].str } - case 14: + case 16: exprDollar = exprS[exprpt-3 : exprpt+1] -//line pkg/logql/expr.y:54 +//line pkg/logql/expr.y:56 { exprVAL.Identifier = exprDollar[1].Identifier + "." + exprDollar[3].str } diff --git a/pkg/logql/expr.y b/pkg/logql/expr.y index a72206bcbf..7ffa4e5336 100644 --- a/pkg/logql/expr.y +++ b/pkg/logql/expr.y @@ -30,11 +30,13 @@ import ( root: expr { exprlex.(*lexer).expr = $1 }; expr: - OPEN_BRACE matchers CLOSE_BRACE { $$ = &matchersExpr{ $2 } } + OPEN_BRACE matchers CLOSE_BRACE { $$ = &matchersExpr{ matchers: $2 } } | expr PIPE_MATCH STRING { $$ = &matchExpr{ $1, labels.MatchRegexp, $3 } } | expr PIPE_EXACT STRING { $$ = &matchExpr{ $1, labels.MatchEqual, $3 } } | expr NRE STRING { $$ = &matchExpr{ $1, labels.MatchNotRegexp, $3 } } | expr NEQ STRING { $$ = &matchExpr{ $1, labels.MatchNotEqual, $3 } } + | expr PIPE_MATCH { exprlex.(*lexer).Error("unexpected end of query, expected string") } + | expr STRING { exprlex.(*lexer).Error("unexpected string, expected pipe") } ; matchers: diff --git a/pkg/logql/parser.go b/pkg/logql/parser.go index 0a390d4574..9c75f98c59 100644 --- a/pkg/logql/parser.go +++ b/pkg/logql/parser.go @@ -2,67 +2,31 @@ package logql import ( "fmt" + "regexp" "strconv" "strings" "text/scanner" + "github.com/grafana/loki/pkg/iter" "github.com/prometheus/prometheus/pkg/labels" ) // ParseExpr parses a string and returns an Expr. func ParseExpr(input string) (Expr, error) { - l := lexer{ - Scanner: scanner.Scanner{ - Mode: scanner.SkipComments | scanner.ScanStrings | scanner.ScanInts, - }, - } + var l lexer l.Init(strings.NewReader(input)) + //l.Scanner.Mode = scanner.SkipComments | scanner.ScanStrings | scanner.ScanInts + l.Scanner.Error = func(_ *scanner.Scanner, msg string) { + l.Error(msg) + } + e := exprParse(&l) - if e != 0 { + if e != 0 || l.err != nil { return nil, l.err } return l.expr, nil } -// Expr is a LogQL expression. -type Expr interface { - Eval() - Walk(func(Expr) error) error -} - -type matchersExpr struct { - matchers []*labels.Matcher -} - -func (e *matchersExpr) Eval() {} - -func (e *matchersExpr) Walk(f func(Expr) error) error { - return f(e) -} - -type matchExpr struct { - left Expr - ty labels.MatchType - match string -} - -func (e *matchExpr) Eval() {} - -func (e *matchExpr) Walk(f func(Expr) error) error { - if err := f(e); err != nil { - return err - } - return e.left.Walk(f) -} - -func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher { - m, err := labels.NewMatcher(t, n, v) - if err != nil { - panic(err) - } - return m -} - var tokens = map[string]int{ ",": COMMA, ".": DOT, @@ -78,22 +42,22 @@ var tokens = map[string]int{ type lexer struct { scanner.Scanner - err error - + err error expr Expr } func (l *lexer) Lex(lval *exprSymType) int { r := l.Scan() - var err error + switch r { case scanner.EOF: return 0 case scanner.String: + var err error lval.str, err = strconv.Unquote(l.TokenText()) if err != nil { - l.err = err + l.Error(err.Error()) return 0 } return STRING @@ -112,6 +76,96 @@ func (l *lexer) Lex(lval *exprSymType) int { return IDENTIFIER } -func (l *lexer) Error(s string) { - l.err = fmt.Errorf(s) +func (l *lexer) Error(msg string) { + // We want to return the first error (from the lexer), and ignore subsequent ones. + if l.err != nil { + return + } + + l.err = ParseError{ + msg: msg, + line: l.Line, + col: l.Column, + } +} + +// Expr is a LogQL expression. +type Expr interface { + Eval(Querier) (iter.EntryIterator, error) +} + +type Querier interface { + Query([]*labels.Matcher) (iter.EntryIterator, error) +} + +type matchersExpr struct { + matchers []*labels.Matcher +} + +func (e *matchersExpr) Eval(q Querier) (iter.EntryIterator, error) { + return q.Query(e.matchers) +} + +type matchExpr struct { + left Expr + ty labels.MatchType + match string +} + +func (e *matchExpr) Eval(q Querier) (iter.EntryIterator, error) { + var f func(string) bool + switch e.ty { + case labels.MatchRegexp: + re, err := regexp.Compile(e.match) + if err != nil { + return nil, err + } + f = re.MatchString + + case labels.MatchNotRegexp: + re, err := regexp.Compile(e.match) + if err != nil { + return nil, err + } + f = func(line string) bool { + return !re.MatchString(line) + } + + case labels.MatchEqual: + f = func(line string) bool { + return strings.Contains(line, e.match) + } + + case labels.MatchNotEqual: + f = func(line string) bool { + return !strings.Contains(line, e.match) + } + + default: + return nil, fmt.Errorf("unknow matcher: %v", e.match) + } + + left, err := e.left.Eval(q) + if err != nil { + return nil, err + } + + return iter.NewFilter(f, left), nil +} + +func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher { + m, err := labels.NewMatcher(t, n, v) + if err != nil { + panic(err) + } + return m +} + +type ParseError struct { + msg string + line, col int +} + +func (p ParseError) Error() string { + return fmt.Sprintf("parse error at line %d, col %d: %s", p.line, p.col, p.msg) } diff --git a/pkg/logql/parser_test.go b/pkg/logql/parser_test.go index ffba818fdf..301fdd7a28 100644 --- a/pkg/logql/parser_test.go +++ b/pkg/logql/parser_test.go @@ -46,53 +46,98 @@ func TestLex(t *testing.T) { func TestParse(t *testing.T) { for _, tc := range []struct { - input string - expected Expr + in string + exp Expr + err error }{ { - `{foo="bar"}`, - &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + in: `{foo="bar"}`, + exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, }, { - `{http.url=~"^/admin"}`, - &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "http.url", "^/admin")}}, + in: `{http.url=~"^/admin"}`, + exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "http.url", "^/admin")}}, }, { - `{ foo = "bar" }`, - &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + in: `{ foo = "bar" }`, + exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, }, { - `{ foo != "bar" }`, - &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotEqual, "foo", "bar")}}, + in: `{ foo != "bar" }`, + exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotEqual, "foo", "bar")}}, }, { - `{ foo =~ "bar" }`, - &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "foo", "bar")}}, + in: `{ foo =~ "bar" }`, + exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "foo", "bar")}}, }, { - `{ foo !~ "bar" }`, - &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + in: `{ foo !~ "bar" }`, + exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, }, { - `{ foo = "bar", bar != "baz" }`, - &matchersExpr{matchers: []*labels.Matcher{ + in: `{ foo = "bar", bar != "baz" }`, + exp: &matchersExpr{matchers: []*labels.Matcher{ mustNewMatcher(labels.MatchEqual, "foo", "bar"), mustNewMatcher(labels.MatchNotEqual, "bar", "baz"), }}, }, { - `{foo="bar"} |= "baz"`, - &matchExpr{ + in: `{foo="bar"} |= "baz"`, + exp: &matchExpr{ left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, ty: labels.MatchEqual, match: "baz", }, }, + { + in: `{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap"`, + exp: &matchExpr{ + left: &matchExpr{ + left: &matchExpr{ + left: &matchExpr{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + ty: labels.MatchEqual, + match: "baz", + }, + ty: labels.MatchRegexp, + match: "blip", + }, + ty: labels.MatchNotEqual, + match: "flip", + }, + ty: labels.MatchNotRegexp, + match: "flap", + }, + }, + { + in: `{foo="bar}`, + err: ParseError{ + msg: "literal not terminated", + line: 1, + col: 6, + }, + }, + { + in: `{foo="bar"} |~`, + err: ParseError{ + msg: "unexpected end of query, expected string", + line: 1, + col: 15, + }, + }, + { + in: `{foo="bar"} "foo"`, + err: ParseError{ + msg: "unexpected string, expected pipe", + line: 1, + col: 13, + }, + }, } { - t.Run(tc.input, func(t *testing.T) { - matchers, err := ParseExpr(tc.input) - require.NoError(t, err) - require.Equal(t, tc.expected, matchers) + t.Run(tc.in, func(t *testing.T) { + ast, err := ParseExpr(tc.in) + require.Equal(t, tc.err, err) + require.Equal(t, tc.exp, ast) }) } } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 669000db63..99a15e2594 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -102,7 +102,7 @@ func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logpr return nil, err } - iterators := append(chunkStoreIterators, ingesterIterators...) + iterators := append(ingesterIterators, chunkStoreIterators) iterator := iter.NewHeapIterator(iterators, req.Direction) defer helpers.LogError("closing iterator", iterator.Close) diff --git a/pkg/querier/store.go b/pkg/querier/store.go index 8dc0b5d859..9f9fbe474f 100644 --- a/pkg/querier/store.go +++ b/pkg/querier/store.go @@ -4,8 +4,6 @@ import ( "context" "sort" - "github.com/prometheus/prometheus/promql" - "github.com/cortexproject/cortex/pkg/chunk" "github.com/opentracing/opentracing-go" "github.com/prometheus/common/model" @@ -14,42 +12,64 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" ) -func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) ([]iter.EntryIterator, error) { - matchers, err := promql.ParseMetricSelector(req.Query) - if err != nil { - return nil, err - } +type querier struct { + q *Querier + req *logproto.QueryRequest +} - nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs") - if err != nil { - return nil, err - } +type QuerierFunc func([]*labels.Matcher) (iter.EntryIterator, error) - matchers = append(matchers, nameLabelMatcher) - from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) - chks, fetchers, err := q.store.GetChunkRefs(ctx, from, through, matchers...) +func (q QuerierFunc) Query(ms []*labels.Matcher) (iter.EntryIterator, error) { + return q(ms) +} + +func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) { + query, err := logql.ParseExpr(req.Query) if err != nil { return nil, err } - for i := range chks { - chks[i] = filterChunksByTime(from, through, chks[i]) - } + querier := QuerierFunc(func(matchers []*labels.Matcher) (iter.EntryIterator, error) { + nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs") + if err != nil { + return nil, err + } - chksBySeries := partitionBySeriesChunks(chks, fetchers) - // Make sure the initial chunks are loaded. This is not one chunk - // per series, but rather a chunk per non-overlapping iterator. - if err := loadFirstChunks(ctx, chksBySeries); err != nil { - return nil, err - } + matchers = append(matchers, nameLabelMatcher) + from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) + chks, fetchers, err := q.store.GetChunkRefs(ctx, from, through, matchers...) + if err != nil { + return nil, err + } + + for i := range chks { + chks[i] = filterChunksByTime(from, through, chks[i]) + } + + chksBySeries := partitionBySeriesChunks(chks, fetchers) + + // Make sure the initial chunks are loaded. This is not one chunk + // per series, but rather a chunk per non-overlapping iterator. + if err := loadFirstChunks(ctx, chksBySeries); err != nil { + return nil, err + } + + // Now that we have the first chunk for each series loaded, + // we can proceed to filter the series that don't match. + chksBySeries = filterSeriesByMatchers(chksBySeries, matchers) + + iters, err := buildIterators(ctx, req, chksBySeries) + if err != nil { + return nil, err + } - // Now that we have the first chunk for each series loaded, - // we can proceed to filter the series that don't match. - chksBySeries = filterSeriesByMatchers(chksBySeries, matchers) + return iter.NewHeapIterator(iters, req.Direction), nil + }) - return buildIterators(ctx, req, chksBySeries) + return query.Eval(querier) } func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.Chunk { diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 56f2291aac..163024e36d 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -87,6 +87,6 @@ func (t *tailIterator) query() (iter.EntryIterator, error) { return nil, err } - iterators := append(chunkStoreIterators, ingesterIterators...) + iterators := append(ingesterIterators, chunkStoreIterators) return iter.NewHeapIterator(iterators, t.queryRequest.Direction), nil }