Wire up LogQL filters in the ingester.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
pull/559/head
Tom Wilkie 6 years ago committed by Tom Wilkie
parent 5e3630815b
commit bfb34e4070
  1. 29
      pkg/ingester/instance.go
  2. 92
      pkg/logql/ast.go
  3. 99
      pkg/logql/expr.go
  4. 27
      pkg/logql/expr.y
  5. 77
      pkg/logql/parser.go
  6. 20
      pkg/logql/parser_test.go
  7. 5
      pkg/promtail/promtail_test.go
  8. 17
      pkg/querier/store.go

@ -9,7 +9,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ingester/index"
@ -17,6 +16,7 @@ import (
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/util"
)
@ -98,28 +98,27 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
}
func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
matchers, err := promql.ParseMetricSelector(req.Query)
expr, err := logql.ParseExpr(req.Query)
if err != nil {
return err
}
iterators, err := i.lookupStreams(req, matchers)
if err != nil {
return err
}
iterator := iter.NewHeapIterator(iterators, req.Direction)
defer helpers.LogError("closing iterator", iterator.Close)
if req.Regex != "" {
var err error
iterator, err = iter.NewRegexpFilter(req.Regex, iterator)
querier := logql.QuerierFunc(func(matchers []*labels.Matcher) (iter.EntryIterator, error) {
iters, err := i.lookupStreams(req, matchers)
if err != nil {
return err
return nil, err
}
return iter.NewHeapIterator(iters, req.Direction), nil
})
iter, err := expr.Eval(querier)
if err != nil {
return err
}
defer helpers.LogError("closing iterator", iter.Close)
return sendBatches(iterator, queryServer, req.Limit)
return sendBatches(iter, queryServer, req.Limit)
}
func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {

@ -0,0 +1,92 @@
package logql
import (
"fmt"
"regexp"
"strings"
"github.com/grafana/loki/pkg/iter"
"github.com/prometheus/prometheus/pkg/labels"
)
// QuerierFunc implements Querier.
type QuerierFunc func([]*labels.Matcher) (iter.EntryIterator, error)
// Query implements Querier.
func (q QuerierFunc) Query(ms []*labels.Matcher) (iter.EntryIterator, error) {
return q(ms)
}
// Querier allows a LogQL expression to fetch an EntryIterator for a
// set of matchers.
type Querier interface {
Query([]*labels.Matcher) (iter.EntryIterator, error)
}
// Expr is a LogQL expression.
type Expr interface {
Eval(Querier) (iter.EntryIterator, error)
}
type matchersExpr struct {
matchers []*labels.Matcher
}
func (e *matchersExpr) Eval(q Querier) (iter.EntryIterator, error) {
return q.Query(e.matchers)
}
type filterExpr struct {
left Expr
ty labels.MatchType
match string
}
func (e *filterExpr) Eval(q Querier) (iter.EntryIterator, error) {
var f func(string) bool
switch e.ty {
case labels.MatchRegexp:
re, err := regexp.Compile(e.match)
if err != nil {
return nil, err
}
f = re.MatchString
case labels.MatchNotRegexp:
re, err := regexp.Compile(e.match)
if err != nil {
return nil, err
}
f = func(line string) bool {
return !re.MatchString(line)
}
case labels.MatchEqual:
f = func(line string) bool {
return strings.Contains(line, e.match)
}
case labels.MatchNotEqual:
f = func(line string) bool {
return !strings.Contains(line, e.match)
}
default:
return nil, fmt.Errorf("unknow matcher: %v", e.match)
}
left, err := e.left.Eval(q)
if err != nil {
return nil, err
}
return iter.NewFilter(f, left), nil
}
func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
m, err := labels.NewMatcher(t, n, v)
if err != nil {
panic(err)
}
return m
}

@ -13,13 +13,12 @@ import (
//line pkg/logql/expr.y:9
type exprSymType struct {
yys int
Expr Expr
Matchers []*labels.Matcher
Matcher *labels.Matcher
str string
int int64
Identifier string
yys int
Expr Expr
Matchers []*labels.Matcher
Matcher *labels.Matcher
str string
int int64
}
const IDENTIFIER = 57346
@ -62,7 +61,7 @@ const exprEofCode = 1
const exprErrCode = 2
const exprInitialStackSize = 16
//line pkg/logql/expr.y:58
//line pkg/logql/expr.y:51
//line yacctab:1
var exprExca = [...]int{
@ -73,45 +72,45 @@ var exprExca = [...]int{
const exprPrivate = 57344
const exprLast = 29
const exprLast = 26
var exprAct = [...]int{
8, 10, 17, 18, 7, 3, 6, 19, 20, 21,
22, 4, 5, 28, 23, 27, 26, 25, 16, 15,
24, 14, 13, 29, 12, 11, 9, 2, 1,
8, 10, 16, 17, 7, 3, 6, 18, 19, 20,
21, 4, 5, 26, 25, 24, 23, 15, 14, 22,
13, 12, 11, 9, 2, 1,
}
var exprPact = [...]int{
-7, -1000, -5, 20, 17, 16, 14, 13, -1000, -11,
-1000, -1, -1000, -1000, -1000, -1000, -1000, -1000, 20, 12,
11, 10, 8, 19, -1000, -1000, -1000, -1000, -1000, -1000,
-7, -1000, -5, 18, 16, 15, 13, 12, -1000, -11,
-1000, -1, -1000, -1000, -1000, -1000, -1000, 18, 11, 10,
9, 8, -1000, -1000, -1000, -1000, -1000,
}
var exprPgo = [...]int{
0, 28, 27, 26, 1, 25,
0, 25, 24, 23, 1,
}
var exprR1 = [...]int{
0, 1, 2, 2, 2, 2, 2, 2, 2, 3,
3, 4, 4, 4, 4, 5, 5,
3, 4, 4, 4, 4,
}
var exprR2 = [...]int{
0, 1, 3, 3, 3, 3, 3, 2, 2, 1,
3, 3, 3, 3, 3, 1, 3,
3, 3, 3, 3, 3,
}
var exprChk = [...]int{
-1000, -1, -2, 12, 16, 17, 11, 9, 5, -3,
-4, -5, 4, 5, 5, 5, 5, 13, 14, 8,
9, 10, 11, 15, -4, 5, 5, 5, 5, 4,
-4, 4, 5, 5, 5, 5, 13, 14, 8, 9,
10, 11, -4, 5, 5, 5, 5,
}
var exprDef = [...]int{
0, -2, 1, 0, 7, 0, 0, 0, 8, 0,
9, 0, 15, 3, 4, 5, 6, 2, 0, 0,
0, 0, 0, 0, 10, 11, 12, 13, 14, 16,
9, 0, 3, 4, 5, 6, 2, 0, 0, 0,
0, 0, 10, 11, 12, 13, 14,
}
var exprTok1 = [...]int{
@ -465,99 +464,87 @@ exprdefault:
case 1:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:30
//line pkg/logql/expr.y:28
{
exprlex.(*lexer).expr = exprDollar[1].Expr
}
case 2:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:33
//line pkg/logql/expr.y:31
{
exprVAL.Expr = &matchersExpr{matchers: exprDollar[2].Matchers}
}
case 3:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:34
//line pkg/logql/expr.y:32
{
exprVAL.Expr = &matchExpr{exprDollar[1].Expr, labels.MatchRegexp, exprDollar[3].str}
exprVAL.Expr = &filterExpr{exprDollar[1].Expr, labels.MatchRegexp, exprDollar[3].str}
}
case 4:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:35
//line pkg/logql/expr.y:33
{
exprVAL.Expr = &matchExpr{exprDollar[1].Expr, labels.MatchEqual, exprDollar[3].str}
exprVAL.Expr = &filterExpr{exprDollar[1].Expr, labels.MatchEqual, exprDollar[3].str}
}
case 5:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:36
//line pkg/logql/expr.y:34
{
exprVAL.Expr = &matchExpr{exprDollar[1].Expr, labels.MatchNotRegexp, exprDollar[3].str}
exprVAL.Expr = &filterExpr{exprDollar[1].Expr, labels.MatchNotRegexp, exprDollar[3].str}
}
case 6:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:37
//line pkg/logql/expr.y:35
{
exprVAL.Expr = &matchExpr{exprDollar[1].Expr, labels.MatchNotEqual, exprDollar[3].str}
exprVAL.Expr = &filterExpr{exprDollar[1].Expr, labels.MatchNotEqual, exprDollar[3].str}
}
case 7:
exprDollar = exprS[exprpt-2 : exprpt+1]
//line pkg/logql/expr.y:38
//line pkg/logql/expr.y:36
{
exprlex.(*lexer).Error("unexpected end of query, expected string")
}
case 8:
exprDollar = exprS[exprpt-2 : exprpt+1]
//line pkg/logql/expr.y:39
//line pkg/logql/expr.y:37
{
exprlex.(*lexer).Error("unexpected string, expected pipe")
}
case 9:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:43
//line pkg/logql/expr.y:41
{
exprVAL.Matchers = []*labels.Matcher{exprDollar[1].Matcher}
}
case 10:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:44
//line pkg/logql/expr.y:42
{
exprVAL.Matchers = append(exprDollar[1].Matchers, exprDollar[3].Matcher)
}
case 11:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:48
//line pkg/logql/expr.y:46
{
exprVAL.Matcher = mustNewMatcher(labels.MatchEqual, exprDollar[1].Identifier, exprDollar[3].str)
exprVAL.Matcher = mustNewMatcher(labels.MatchEqual, exprDollar[1].str, exprDollar[3].str)
}
case 12:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:49
//line pkg/logql/expr.y:47
{
exprVAL.Matcher = mustNewMatcher(labels.MatchNotEqual, exprDollar[1].Identifier, exprDollar[3].str)
exprVAL.Matcher = mustNewMatcher(labels.MatchNotEqual, exprDollar[1].str, exprDollar[3].str)
}
case 13:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:50
//line pkg/logql/expr.y:48
{
exprVAL.Matcher = mustNewMatcher(labels.MatchRegexp, exprDollar[1].Identifier, exprDollar[3].str)
exprVAL.Matcher = mustNewMatcher(labels.MatchRegexp, exprDollar[1].str, exprDollar[3].str)
}
case 14:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:51
{
exprVAL.Matcher = mustNewMatcher(labels.MatchNotRegexp, exprDollar[1].Identifier, exprDollar[3].str)
}
case 15:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:55
{
exprVAL.Identifier = exprDollar[1].str
}
case 16:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:56
//line pkg/logql/expr.y:49
{
exprVAL.Identifier = exprDollar[1].Identifier + "." + exprDollar[3].str
exprVAL.Matcher = mustNewMatcher(labels.MatchNotRegexp, exprDollar[1].str, exprDollar[3].str)
}
}
goto exprstack /* stack new state and value */

@ -12,7 +12,6 @@ import (
Matcher *labels.Matcher
str string
int int64
Identifier string
}
%start root
@ -20,7 +19,6 @@ import (
%type <Expr> expr
%type <Matchers> matchers
%type <Matcher> matcher
%type <Identifier> identifier
%token <str> IDENTIFIER STRING
%token <val> MATCHERS LABELS EQ NEQ RE NRE OPEN_BRACE CLOSE_BRACE COMMA DOT PIPE_MATCH PIPE_EXACT
@ -31,12 +29,12 @@ root: expr { exprlex.(*lexer).expr = $1 };
expr:
OPEN_BRACE matchers CLOSE_BRACE { $$ = &matchersExpr{ matchers: $2 } }
| expr PIPE_MATCH STRING { $$ = &matchExpr{ $1, labels.MatchRegexp, $3 } }
| expr PIPE_EXACT STRING { $$ = &matchExpr{ $1, labels.MatchEqual, $3 } }
| expr NRE STRING { $$ = &matchExpr{ $1, labels.MatchNotRegexp, $3 } }
| expr NEQ STRING { $$ = &matchExpr{ $1, labels.MatchNotEqual, $3 } }
| expr PIPE_MATCH { exprlex.(*lexer).Error("unexpected end of query, expected string") }
| expr STRING { exprlex.(*lexer).Error("unexpected string, expected pipe") }
| expr PIPE_MATCH STRING { $$ = &filterExpr{ $1, labels.MatchRegexp, $3 } }
| expr PIPE_EXACT STRING { $$ = &filterExpr{ $1, labels.MatchEqual, $3 } }
| expr NRE STRING { $$ = &filterExpr{ $1, labels.MatchNotRegexp, $3 } }
| expr NEQ STRING { $$ = &filterExpr{ $1, labels.MatchNotEqual, $3 } }
| expr PIPE_MATCH { exprlex.(*lexer).Error("unexpected end of query, expected string") }
| expr STRING { exprlex.(*lexer).Error("unexpected string, expected pipe") }
;
matchers:
@ -45,14 +43,9 @@ matchers:
;
matcher:
identifier EQ STRING { $$ = mustNewMatcher(labels.MatchEqual, $1, $3) }
| identifier NEQ STRING { $$ = mustNewMatcher(labels.MatchNotEqual, $1, $3) }
| identifier RE STRING { $$ = mustNewMatcher(labels.MatchRegexp, $1, $3) }
| identifier NRE STRING { $$ = mustNewMatcher(labels.MatchNotRegexp, $1, $3) }
;
identifier:
IDENTIFIER { $$ = $1 }
| identifier DOT IDENTIFIER { $$ = $1 + "." + $3 }
IDENTIFIER EQ STRING { $$ = mustNewMatcher(labels.MatchEqual, $1, $3) }
| IDENTIFIER NEQ STRING { $$ = mustNewMatcher(labels.MatchNotEqual, $1, $3) }
| IDENTIFIER RE STRING { $$ = mustNewMatcher(labels.MatchRegexp, $1, $3) }
| IDENTIFIER NRE STRING { $$ = mustNewMatcher(labels.MatchNotRegexp, $1, $3) }
;
%%

@ -2,13 +2,9 @@ package logql
import (
"fmt"
"regexp"
"strconv"
"strings"
"text/scanner"
"github.com/grafana/loki/pkg/iter"
"github.com/prometheus/prometheus/pkg/labels"
)
// ParseExpr parses a string and returns an Expr.
@ -89,78 +85,7 @@ func (l *lexer) Error(msg string) {
}
}
// Expr is a LogQL expression.
type Expr interface {
Eval(Querier) (iter.EntryIterator, error)
}
type Querier interface {
Query([]*labels.Matcher) (iter.EntryIterator, error)
}
type matchersExpr struct {
matchers []*labels.Matcher
}
func (e *matchersExpr) Eval(q Querier) (iter.EntryIterator, error) {
return q.Query(e.matchers)
}
type matchExpr struct {
left Expr
ty labels.MatchType
match string
}
func (e *matchExpr) Eval(q Querier) (iter.EntryIterator, error) {
var f func(string) bool
switch e.ty {
case labels.MatchRegexp:
re, err := regexp.Compile(e.match)
if err != nil {
return nil, err
}
f = re.MatchString
case labels.MatchNotRegexp:
re, err := regexp.Compile(e.match)
if err != nil {
return nil, err
}
f = func(line string) bool {
return !re.MatchString(line)
}
case labels.MatchEqual:
f = func(line string) bool {
return strings.Contains(line, e.match)
}
case labels.MatchNotEqual:
f = func(line string) bool {
return !strings.Contains(line, e.match)
}
default:
return nil, fmt.Errorf("unknow matcher: %v", e.match)
}
left, err := e.left.Eval(q)
if err != nil {
return nil, err
}
return iter.NewFilter(f, left), nil
}
func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
m, err := labels.NewMatcher(t, n, v)
if err != nil {
panic(err)
}
return m
}
// ParseError is what is returned when we failed to parse.
type ParseError struct {
msg string
line, col int

@ -54,10 +54,12 @@ func TestParse(t *testing.T) {
in: `{foo="bar"}`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
},
{
in: `{http.url=~"^/admin"}`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "http.url", "^/admin")}},
},
/*
{
in: `{http.url=~"^/admin"}`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "http.url", "^/admin")}},
},
*/
{
in: `{ foo = "bar" }`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
@ -83,7 +85,7 @@ func TestParse(t *testing.T) {
},
{
in: `{foo="bar"} |= "baz"`,
exp: &matchExpr{
exp: &filterExpr{
left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
ty: labels.MatchEqual,
match: "baz",
@ -91,10 +93,10 @@ func TestParse(t *testing.T) {
},
{
in: `{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap"`,
exp: &matchExpr{
left: &matchExpr{
left: &matchExpr{
left: &matchExpr{
exp: &filterExpr{
left: &filterExpr{
left: &filterExpr{
left: &filterExpr{
left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
ty: labels.MatchEqual,
match: "baz",

@ -12,6 +12,8 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/promql"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/go-kit/kit/log"
@ -25,7 +27,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/parser"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/config"
"github.com/grafana/loki/pkg/promtail/scrape"
@ -372,7 +373,7 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
h.recMtx.Lock()
for _, s := range req.Streams {
labels, err := parser.Labels(s.Labels)
labels, err := promql.ParseMetric(s.Labels)
if err != nil {
h.t.Error("Failed to parse incoming labels", err)
return

@ -15,24 +15,13 @@ import (
"github.com/grafana/loki/pkg/logql"
)
type querier struct {
q *Querier
req *logproto.QueryRequest
}
type QuerierFunc func([]*labels.Matcher) (iter.EntryIterator, error)
func (q QuerierFunc) Query(ms []*labels.Matcher) (iter.EntryIterator, error) {
return q(ms)
}
func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) {
query, err := logql.ParseExpr(req.Query)
expr, err := logql.ParseExpr(req.Query)
if err != nil {
return nil, err
}
querier := QuerierFunc(func(matchers []*labels.Matcher) (iter.EntryIterator, error) {
querier := logql.QuerierFunc(func(matchers []*labels.Matcher) (iter.EntryIterator, error) {
nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs")
if err != nil {
return nil, err
@ -69,7 +58,7 @@ func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (it
return iter.NewHeapIterator(iters, req.Direction), nil
})
return query.Eval(querier)
return expr.Eval(querier)
}
func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.Chunk {

Loading…
Cancel
Save