Errors and wiring.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
pull/559/head
Tom Wilkie 7 years ago committed by Tom Wilkie
parent b16da1d7c3
commit 5e3630815b
  1. 21
      pkg/iter/iterator.go
  2. 82
      pkg/logql/expr.go
  3. 4
      pkg/logql/expr.y
  4. 156
      pkg/logql/parser.go
  5. 89
      pkg/logql/parser_test.go
  6. 2
      pkg/querier/querier.go
  7. 74
      pkg/querier/store.go
  8. 2
      pkg/querier/tail.go

@ -293,6 +293,27 @@ func (i *queryClientIterator) Close() error {
return i.client.CloseSend()
}
// filter wraps an EntryIterator and yields only the entries whose line
// satisfies the predicate f; all other entries are skipped in Next.
type filter struct {
	EntryIterator
	// f reports whether a log line should be kept.
	f func(string) bool
}
// NewFilter returns an EntryIterator that yields only those entries of i
// whose line satisfies the predicate f.
func NewFilter(f func(string) bool, i EntryIterator) EntryIterator {
	return &filter{EntryIterator: i, f: f}
}
// Next advances the underlying iterator until an entry passes the
// predicate, returning false once the underlying iterator is exhausted.
func (i *filter) Next() bool {
	for {
		if !i.EntryIterator.Next() {
			return false
		}
		if i.f(i.Entry().Line) {
			return true
		}
	}
}
type regexpFilter struct {
re *regexp.Regexp
EntryIterator

@ -62,7 +62,7 @@ const exprEofCode = 1
const exprErrCode = 2
const exprInitialStackSize = 16
//line pkg/logql/expr.y:56
//line pkg/logql/expr.y:58
//line yacctab:1
var exprExca = [...]int{
@ -73,45 +73,45 @@ var exprExca = [...]int{
const exprPrivate = 57344
const exprLast = 28
const exprLast = 29
var exprAct = [...]int{
7, 9, 6, 18, 19, 20, 21, 4, 5, 3,
22, 16, 17, 27, 26, 25, 24, 15, 14, 23,
13, 12, 28, 11, 10, 8, 2, 1,
8, 10, 17, 18, 7, 3, 6, 19, 20, 21,
22, 4, 5, 28, 23, 27, 26, 25, 16, 15,
24, 14, 13, 29, 12, 11, 9, 2, 1,
}
var exprPact = [...]int{
-3, -1000, -9, 19, 16, 15, 13, 12, -2, -1000,
-5, -1000, -1000, -1000, -1000, -1000, -1000, 19, 11, 10,
9, 8, 18, -1000, -1000, -1000, -1000, -1000, -1000,
-7, -1000, -5, 20, 17, 16, 14, 13, -1000, -11,
-1000, -1, -1000, -1000, -1000, -1000, -1000, -1000, 20, 12,
11, 10, 8, 19, -1000, -1000, -1000, -1000, -1000, -1000,
}
var exprPgo = [...]int{
0, 27, 26, 25, 1, 24,
0, 28, 27, 26, 1, 25,
}
var exprR1 = [...]int{
0, 1, 2, 2, 2, 2, 2, 3, 3, 4,
4, 4, 4, 5, 5,
0, 1, 2, 2, 2, 2, 2, 2, 2, 3,
3, 4, 4, 4, 4, 5, 5,
}
var exprR2 = [...]int{
0, 1, 3, 3, 3, 3, 3, 1, 3, 3,
3, 3, 3, 1, 3,
0, 1, 3, 3, 3, 3, 3, 2, 2, 1,
3, 3, 3, 3, 3, 1, 3,
}
var exprChk = [...]int{
-1000, -1, -2, 12, 16, 17, 11, 9, -3, -4,
-5, 4, 5, 5, 5, 5, 13, 14, 8, 9,
10, 11, 15, -4, 5, 5, 5, 5, 4,
-1000, -1, -2, 12, 16, 17, 11, 9, 5, -3,
-4, -5, 4, 5, 5, 5, 5, 13, 14, 8,
9, 10, 11, 15, -4, 5, 5, 5, 5, 4,
}
var exprDef = [...]int{
0, -2, 1, 0, 0, 0, 0, 0, 0, 7,
0, 13, 3, 4, 5, 6, 2, 0, 0, 0,
0, 0, 0, 8, 9, 10, 11, 12, 14,
0, -2, 1, 0, 7, 0, 0, 0, 8, 0,
9, 0, 15, 3, 4, 5, 6, 2, 0, 0,
0, 0, 0, 0, 10, 11, 12, 13, 14, 16,
}
var exprTok1 = [...]int{
@ -473,7 +473,7 @@ exprdefault:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:33
{
exprVAL.Expr = &matchersExpr{exprDollar[2].Matchers}
exprVAL.Expr = &matchersExpr{matchers: exprDollar[2].Matchers}
}
case 3:
exprDollar = exprS[exprpt-3 : exprpt+1]
@ -500,50 +500,62 @@ exprdefault:
exprVAL.Expr = &matchExpr{exprDollar[1].Expr, labels.MatchNotEqual, exprDollar[3].str}
}
case 7:
exprDollar = exprS[exprpt-2 : exprpt+1]
//line pkg/logql/expr.y:38
{
exprlex.(*lexer).Error("unexpected end of query, expected string")
}
case 8:
exprDollar = exprS[exprpt-2 : exprpt+1]
//line pkg/logql/expr.y:39
{
exprlex.(*lexer).Error("unexpected string, expected pipe")
}
case 9:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:41
//line pkg/logql/expr.y:43
{
exprVAL.Matchers = []*labels.Matcher{exprDollar[1].Matcher}
}
case 8:
case 10:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:42
//line pkg/logql/expr.y:44
{
exprVAL.Matchers = append(exprDollar[1].Matchers, exprDollar[3].Matcher)
}
case 9:
case 11:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:46
//line pkg/logql/expr.y:48
{
exprVAL.Matcher = mustNewMatcher(labels.MatchEqual, exprDollar[1].Identifier, exprDollar[3].str)
}
case 10:
case 12:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:47
//line pkg/logql/expr.y:49
{
exprVAL.Matcher = mustNewMatcher(labels.MatchNotEqual, exprDollar[1].Identifier, exprDollar[3].str)
}
case 11:
case 13:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:48
//line pkg/logql/expr.y:50
{
exprVAL.Matcher = mustNewMatcher(labels.MatchRegexp, exprDollar[1].Identifier, exprDollar[3].str)
}
case 12:
case 14:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:49
//line pkg/logql/expr.y:51
{
exprVAL.Matcher = mustNewMatcher(labels.MatchNotRegexp, exprDollar[1].Identifier, exprDollar[3].str)
}
case 13:
case 15:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:53
//line pkg/logql/expr.y:55
{
exprVAL.Identifier = exprDollar[1].str
}
case 14:
case 16:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:54
//line pkg/logql/expr.y:56
{
exprVAL.Identifier = exprDollar[1].Identifier + "." + exprDollar[3].str
}

@ -30,11 +30,13 @@ import (
root: expr { exprlex.(*lexer).expr = $1 };
expr:
OPEN_BRACE matchers CLOSE_BRACE { $$ = &matchersExpr{ $2 } }
OPEN_BRACE matchers CLOSE_BRACE { $$ = &matchersExpr{ matchers: $2 } }
| expr PIPE_MATCH STRING { $$ = &matchExpr{ $1, labels.MatchRegexp, $3 } }
| expr PIPE_EXACT STRING { $$ = &matchExpr{ $1, labels.MatchEqual, $3 } }
| expr NRE STRING { $$ = &matchExpr{ $1, labels.MatchNotRegexp, $3 } }
| expr NEQ STRING { $$ = &matchExpr{ $1, labels.MatchNotEqual, $3 } }
| expr PIPE_MATCH { exprlex.(*lexer).Error("unexpected end of query, expected string") }
| expr STRING { exprlex.(*lexer).Error("unexpected string, expected pipe") }
;
matchers:

@ -2,67 +2,31 @@ package logql
import (
"fmt"
"regexp"
"strconv"
"strings"
"text/scanner"
"github.com/grafana/loki/pkg/iter"
"github.com/prometheus/prometheus/pkg/labels"
)
// ParseExpr parses a string and returns an Expr.
func ParseExpr(input string) (Expr, error) {
l := lexer{
Scanner: scanner.Scanner{
Mode: scanner.SkipComments | scanner.ScanStrings | scanner.ScanInts,
},
}
var l lexer
l.Init(strings.NewReader(input))
//l.Scanner.Mode = scanner.SkipComments | scanner.ScanStrings | scanner.ScanInts
l.Scanner.Error = func(_ *scanner.Scanner, msg string) {
l.Error(msg)
}
e := exprParse(&l)
if e != 0 {
if e != 0 || l.err != nil {
return nil, l.err
}
return l.expr, nil
}
// Expr is a LogQL expression.
type Expr interface {
Eval()
Walk(func(Expr) error) error
}
type matchersExpr struct {
matchers []*labels.Matcher
}
func (e *matchersExpr) Eval() {}
func (e *matchersExpr) Walk(f func(Expr) error) error {
return f(e)
}
type matchExpr struct {
left Expr
ty labels.MatchType
match string
}
func (e *matchExpr) Eval() {}
func (e *matchExpr) Walk(f func(Expr) error) error {
if err := f(e); err != nil {
return err
}
return e.left.Walk(f)
}
// mustNewMatcher returns a labels.Matcher for (type, name, value),
// panicking if the arguments are invalid. Intended for call sites
// (parser productions, tests) where the inputs are known to be valid.
func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
	m, err := labels.NewMatcher(t, n, v)
	if err != nil {
		panic(err)
	}
	return m
}
var tokens = map[string]int{
",": COMMA,
".": DOT,
@ -78,22 +42,22 @@ var tokens = map[string]int{
type lexer struct {
scanner.Scanner
err error
err error
expr Expr
}
func (l *lexer) Lex(lval *exprSymType) int {
r := l.Scan()
var err error
switch r {
case scanner.EOF:
return 0
case scanner.String:
var err error
lval.str, err = strconv.Unquote(l.TokenText())
if err != nil {
l.err = err
l.Error(err.Error())
return 0
}
return STRING
@ -112,6 +76,96 @@ func (l *lexer) Lex(lval *exprSymType) int {
return IDENTIFIER
}
func (l *lexer) Error(s string) {
l.err = fmt.Errorf(s)
// Error implements the goyacc error interface. It records the first
// error as a ParseError carrying the lexer's current position; later
// calls are ignored so the earliest (usually most precise) error wins.
func (l *lexer) Error(msg string) {
	// We want to return the first error (from the lexer), and ignore subsequent ones.
	if l.err != nil {
		return
	}
	l.err = ParseError{
		msg:  msg,
		line: l.Line, // position fields come from the embedded scanner.Scanner
		col:  l.Column,
	}
}
// Expr is a LogQL expression.
type Expr interface {
	// Eval executes the expression against the given Querier and returns
	// an iterator over the resulting log entries.
	Eval(Querier) (iter.EntryIterator, error)
}
// Querier fetches log entries for the streams selected by a set of
// label matchers; it is the data-access seam an Expr evaluates against.
type Querier interface {
	Query([]*labels.Matcher) (iter.EntryIterator, error)
}
// matchersExpr is the leaf of a LogQL expression: a stream selector
// such as `{foo="bar"}`, holding its label matchers.
type matchersExpr struct {
	matchers []*labels.Matcher
}

// Eval fetches the selected streams directly from the Querier.
func (e *matchersExpr) Eval(q Querier) (iter.EntryIterator, error) {
	return q.Query(e.matchers)
}
// matchExpr is a line-filter stage (|=, !=, |~, !~) applied to the
// entries produced by the expression to its left.
type matchExpr struct {
	left  Expr             // upstream expression whose output is filtered
	ty    labels.MatchType // which of the four filter operators this is
	match string           // the pattern / substring operand
}
// Eval evaluates the left-hand expression and wraps its iterator in a
// line filter derived from the match type:
//
//	MatchRegexp / MatchNotRegexp (|~ / !~): regexp (non-)match
//	MatchEqual / MatchNotEqual   (|= / !=): substring (non-)containment
//
// Invalid regexp patterns and errors from the left-hand evaluation are
// returned to the caller.
func (e *matchExpr) Eval(q Querier) (iter.EntryIterator, error) {
	var f func(string) bool
	switch e.ty {
	case labels.MatchRegexp:
		re, err := regexp.Compile(e.match)
		if err != nil {
			return nil, err
		}
		f = re.MatchString
	case labels.MatchNotRegexp:
		re, err := regexp.Compile(e.match)
		if err != nil {
			return nil, err
		}
		f = func(line string) bool {
			return !re.MatchString(line)
		}
	case labels.MatchEqual:
		f = func(line string) bool {
			return strings.Contains(line, e.match)
		}
	case labels.MatchNotEqual:
		f = func(line string) bool {
			return !strings.Contains(line, e.match)
		}
	default:
		// All four filter operators are handled above; anything else is a
		// construction bug, so report the unrecognized match type itself
		// (the pattern string would not identify the problem).
		return nil, fmt.Errorf("unknown match type: %v", e.ty)
	}
	left, err := e.left.Eval(q)
	if err != nil {
		return nil, err
	}
	return iter.NewFilter(f, left), nil
}
// mustNewMatcher builds a labels.Matcher from the given type, name and
// value, panicking on invalid input. Intended for call sites (parser
// productions, tests) where the arguments are known to be well-formed.
func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
	matcher, err := labels.NewMatcher(t, n, v)
	if err != nil {
		panic(err)
	}
	return matcher
}
// ParseError describes a LogQL parse failure, carrying the message and
// the line/column position at which the lexer reported it.
type ParseError struct {
	msg       string
	line, col int
}

// Error implements the error interface, rendering the position followed
// by the underlying message.
func (e ParseError) Error() string {
	return fmt.Sprintf("parse error at line %d, col %d: %s", e.line, e.col, e.msg)
}

@ -46,53 +46,98 @@ func TestLex(t *testing.T) {
func TestParse(t *testing.T) {
for _, tc := range []struct {
input string
expected Expr
in string
exp Expr
err error
}{
{
`{foo="bar"}`,
&matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
in: `{foo="bar"}`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
},
{
`{http.url=~"^/admin"}`,
&matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "http.url", "^/admin")}},
in: `{http.url=~"^/admin"}`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "http.url", "^/admin")}},
},
{
`{ foo = "bar" }`,
&matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
in: `{ foo = "bar" }`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
},
{
`{ foo != "bar" }`,
&matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotEqual, "foo", "bar")}},
in: `{ foo != "bar" }`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotEqual, "foo", "bar")}},
},
{
`{ foo =~ "bar" }`,
&matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "foo", "bar")}},
in: `{ foo =~ "bar" }`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "foo", "bar")}},
},
{
`{ foo !~ "bar" }`,
&matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}},
in: `{ foo !~ "bar" }`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}},
},
{
`{ foo = "bar", bar != "baz" }`,
&matchersExpr{matchers: []*labels.Matcher{
in: `{ foo = "bar", bar != "baz" }`,
exp: &matchersExpr{matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
mustNewMatcher(labels.MatchNotEqual, "bar", "baz"),
}},
},
{
`{foo="bar"} |= "baz"`,
&matchExpr{
in: `{foo="bar"} |= "baz"`,
exp: &matchExpr{
left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
ty: labels.MatchEqual,
match: "baz",
},
},
{
in: `{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap"`,
exp: &matchExpr{
left: &matchExpr{
left: &matchExpr{
left: &matchExpr{
left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
ty: labels.MatchEqual,
match: "baz",
},
ty: labels.MatchRegexp,
match: "blip",
},
ty: labels.MatchNotEqual,
match: "flip",
},
ty: labels.MatchNotRegexp,
match: "flap",
},
},
{
in: `{foo="bar}`,
err: ParseError{
msg: "literal not terminated",
line: 1,
col: 6,
},
},
{
in: `{foo="bar"} |~`,
err: ParseError{
msg: "unexpected end of query, expected string",
line: 1,
col: 15,
},
},
{
in: `{foo="bar"} "foo"`,
err: ParseError{
msg: "unexpected string, expected pipe",
line: 1,
col: 13,
},
},
} {
t.Run(tc.input, func(t *testing.T) {
matchers, err := ParseExpr(tc.input)
require.NoError(t, err)
require.Equal(t, tc.expected, matchers)
t.Run(tc.in, func(t *testing.T) {
ast, err := ParseExpr(tc.in)
require.Equal(t, tc.err, err)
require.Equal(t, tc.exp, ast)
})
}
}

@ -102,7 +102,7 @@ func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logpr
return nil, err
}
iterators := append(chunkStoreIterators, ingesterIterators...)
iterators := append(ingesterIterators, chunkStoreIterators)
iterator := iter.NewHeapIterator(iterators, req.Direction)
defer helpers.LogError("closing iterator", iterator.Close)

@ -4,8 +4,6 @@ import (
"context"
"sort"
"github.com/prometheus/prometheus/promql"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
@ -14,42 +12,64 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)
func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) ([]iter.EntryIterator, error) {
matchers, err := promql.ParseMetricSelector(req.Query)
if err != nil {
return nil, err
}
// querier pairs a *Querier with the request it is serving.
// NOTE(review): no uses are visible in this hunk — confirm against the
// rest of the file whether this type is still needed.
type querier struct {
	q   *Querier
	req *logproto.QueryRequest
}
nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs")
if err != nil {
return nil, err
}
// QuerierFunc adapts a plain function to the logql.Querier interface
// (in the style of http.HandlerFunc).
type QuerierFunc func([]*labels.Matcher) (iter.EntryIterator, error)
matchers = append(matchers, nameLabelMatcher)
from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
chks, fetchers, err := q.store.GetChunkRefs(ctx, from, through, matchers...)
// Query implements logql.Querier by invoking the wrapped function.
func (q QuerierFunc) Query(ms []*labels.Matcher) (iter.EntryIterator, error) {
	return q(ms)
}
func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) {
query, err := logql.ParseExpr(req.Query)
if err != nil {
return nil, err
}
for i := range chks {
chks[i] = filterChunksByTime(from, through, chks[i])
}
querier := QuerierFunc(func(matchers []*labels.Matcher) (iter.EntryIterator, error) {
nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs")
if err != nil {
return nil, err
}
chksBySeries := partitionBySeriesChunks(chks, fetchers)
// Make sure the initial chunks are loaded. This is not one chunk
// per series, but rather a chunk per non-overlapping iterator.
if err := loadFirstChunks(ctx, chksBySeries); err != nil {
return nil, err
}
matchers = append(matchers, nameLabelMatcher)
from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
chks, fetchers, err := q.store.GetChunkRefs(ctx, from, through, matchers...)
if err != nil {
return nil, err
}
for i := range chks {
chks[i] = filterChunksByTime(from, through, chks[i])
}
chksBySeries := partitionBySeriesChunks(chks, fetchers)
// Make sure the initial chunks are loaded. This is not one chunk
// per series, but rather a chunk per non-overlapping iterator.
if err := loadFirstChunks(ctx, chksBySeries); err != nil {
return nil, err
}
// Now that we have the first chunk for each series loaded,
// we can proceed to filter the series that don't match.
chksBySeries = filterSeriesByMatchers(chksBySeries, matchers)
iters, err := buildIterators(ctx, req, chksBySeries)
if err != nil {
return nil, err
}
// Now that we have the first chunk for each series loaded,
// we can proceed to filter the series that don't match.
chksBySeries = filterSeriesByMatchers(chksBySeries, matchers)
return iter.NewHeapIterator(iters, req.Direction), nil
})
return buildIterators(ctx, req, chksBySeries)
return query.Eval(querier)
}
func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.Chunk {

@ -87,6 +87,6 @@ func (t *tailIterator) query() (iter.EntryIterator, error) {
return nil, err
}
iterators := append(chunkStoreIterators, ingesterIterators...)
iterators := append(ingesterIterators, chunkStoreIterators)
return iter.NewHeapIterator(iterators, t.queryRequest.Direction), nil
}

Loading…
Cancel
Save