mirror of https://github.com/grafana/loki
chore(engine): Integrate metastore into physical query planning (#17016)
This PR integrates the data object lookup using the metastore into the query planning. To achieve that, I extended the `Metastore` interface with an additional function that not only returns the data object paths, but also the stream IDs of streams that match the given label matchers. ```go type Metastore interface { ... // StreamsIDs returns object store paths and stream IDs for all matching objects for the given matchers between [start,end] StreamIDs(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, [][]int64, error) ... } ``` Signed-off-by: Christian Haudum <christian.haudum@gmail.com>pull/16566/merge
parent
58aa00a5f4
commit
ba61aa88de
@ -1,25 +1,143 @@ |
||||
package physical |
||||
|
||||
import "github.com/grafana/loki/v3/pkg/dataobj/metastore" |
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/prometheus/prometheus/model/labels" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj/metastore" |
||||
"github.com/grafana/loki/v3/pkg/engine/internal/types" |
||||
) |
||||
|
||||
var ( |
||||
binOpToMatchTypeMapping = map[types.BinaryOp]labels.MatchType{ |
||||
types.BinaryOpMatchStr: labels.MatchEqual, |
||||
types.BinaryOpNotMatchStr: labels.MatchNotEqual, |
||||
types.BinaryOpMatchRe: labels.MatchRegexp, |
||||
types.BinaryOpNotMatchRe: labels.MatchNotRegexp, |
||||
} |
||||
) |
||||
|
||||
// Catalog is an interface that provides methods for interacting with
|
||||
// storage metadata. In traditional database systems there are system tables
|
||||
// providing this information (e.g. pg_catalog, ...) whereas in Loki there
|
||||
// is the Metastore.
|
||||
type Catalog interface { |
||||
ResolveDataObj(Expression) ([]DataObjLocation, [][]int64) |
||||
ResolveDataObj(Expression) ([]DataObjLocation, [][]int64, error) |
||||
} |
||||
|
||||
// Context is the default implementation of [Catalog].
|
||||
type Context struct { |
||||
metastore metastore.Metastore |
||||
ctx context.Context |
||||
metastore metastore.Metastore |
||||
from, through time.Time |
||||
} |
||||
|
||||
// NewContext creates a new instance of [Context] for query planning.
|
||||
func NewContext(ctx context.Context, ms metastore.Metastore, from, through time.Time) *Context { |
||||
return &Context{ |
||||
ctx: ctx, |
||||
metastore: ms, |
||||
from: from, |
||||
through: through, |
||||
} |
||||
} |
||||
|
||||
// ResolveDataObj resolves DataObj locations and streams IDs based on a given
|
||||
// [Expression]. The expression is required to be a (tree of) [BinaryExpression]
|
||||
// with a [ColumnExpression] on the left and a [LiteralExpression] on the right.
|
||||
func (c *Context) ResolveDataObj(_ Expression) ([]DataObjLocation, [][]int64) { |
||||
panic("not implemented") |
||||
func (c *Context) ResolveDataObj(selector Expression) ([]DataObjLocation, [][]int64, error) { |
||||
matchers, err := expressionToMatchers(selector) |
||||
if err != nil { |
||||
return nil, nil, fmt.Errorf("failed to convert selector expression into matchers: %w", err) |
||||
} |
||||
|
||||
paths, streamIDs, err := c.metastore.StreamIDs(c.ctx, c.from, c.through, matchers...) |
||||
if err != nil { |
||||
return nil, nil, fmt.Errorf("failed to resolve data object locations: %w", err) |
||||
} |
||||
|
||||
locations := make([]DataObjLocation, 0, len(paths)) |
||||
for _, loc := range paths { |
||||
locations = append(locations, DataObjLocation(loc)) |
||||
} |
||||
return locations, streamIDs, err |
||||
} |
||||
|
||||
func expressionToMatchers(selector Expression) ([]*labels.Matcher, error) { |
||||
if selector == nil { |
||||
return nil, nil |
||||
} |
||||
|
||||
switch expr := selector.(type) { |
||||
case *BinaryExpr: |
||||
switch expr.Op { |
||||
case types.BinaryOpAnd: |
||||
lhs, err := expressionToMatchers(expr.Left) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
rhs, err := expressionToMatchers(expr.Right) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return append(lhs, rhs...), nil |
||||
case types.BinaryOpMatchStr, types.BinaryOpNotMatchStr, types.BinaryOpMatchRe, types.BinaryOpNotMatchRe: |
||||
op, err := convertBinaryOp(expr.Op) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
name, err := convertColumnRef(expr.Left) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
value, err := convertLiteral(expr.Right) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
lhs, err := labels.NewMatcher(op, name, value) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return []*labels.Matcher{lhs}, nil |
||||
default: |
||||
return nil, fmt.Errorf("invalid binary expression in stream selector expression: %v", expr.Op.String()) |
||||
} |
||||
default: |
||||
return nil, fmt.Errorf("invalid expression type in stream selector expression: %T", expr) |
||||
} |
||||
} |
||||
|
||||
func convertLiteral(expr Expression) (string, error) { |
||||
l, ok := expr.(*LiteralExpr) |
||||
if !ok { |
||||
return "", fmt.Errorf("expected literal expression, got %T", expr) |
||||
} |
||||
if l.ValueType() != types.ValueTypeStr { |
||||
return "", fmt.Errorf("literal type is not a string, got %v", l.ValueType()) |
||||
} |
||||
return l.Value.Str(), nil |
||||
} |
||||
|
||||
func convertColumnRef(expr Expression) (string, error) { |
||||
ref, ok := expr.(*ColumnExpr) |
||||
if !ok { |
||||
return "", fmt.Errorf("expected column expression, got %T", expr) |
||||
} |
||||
if ref.Ref.Type != types.ColumnTypeLabel { |
||||
return "", fmt.Errorf("column type is not a label, got %v", ref.Ref.Type) |
||||
} |
||||
return ref.Ref.Column, nil |
||||
} |
||||
|
||||
func convertBinaryOp(t types.BinaryOp) (labels.MatchType, error) { |
||||
ty, ok := binOpToMatchTypeMapping[t] |
||||
if !ok { |
||||
return -1, fmt.Errorf("invalid binary operator for matcher: %v", t) |
||||
} |
||||
return ty, nil |
||||
} |
||||
|
||||
var _ Catalog = (*Context)(nil) |
||||
|
@ -0,0 +1,162 @@ |
||||
package physical |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/prometheus/prometheus/model/labels" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/internal/types" |
||||
) |
||||
|
||||
func TestContext_ConvertLiteral(t *testing.T) { |
||||
tests := []struct { |
||||
expr Expression |
||||
want string |
||||
wantErr bool |
||||
}{ |
||||
{ |
||||
expr: NewLiteral("foo"), |
||||
want: "foo", |
||||
}, |
||||
{ |
||||
expr: NewLiteral(false), |
||||
wantErr: true, |
||||
}, |
||||
{ |
||||
expr: NewLiteral(int64(123)), |
||||
wantErr: true, |
||||
}, |
||||
{ |
||||
expr: NewLiteral(uint64(123456789)), |
||||
wantErr: true, |
||||
}, |
||||
{ |
||||
expr: newColumnExpr("foo", types.ColumnTypeLabel), |
||||
wantErr: true, |
||||
}, |
||||
{ |
||||
expr: &BinaryExpr{ |
||||
Left: newColumnExpr("foo", types.ColumnTypeLabel), |
||||
Right: NewLiteral("foo"), |
||||
Op: types.BinaryOpEq, |
||||
}, |
||||
wantErr: true, |
||||
}, |
||||
} |
||||
for _, tt := range tests { |
||||
t.Run(tt.expr.String(), func(t *testing.T) { |
||||
got, err := convertLiteral(tt.expr) |
||||
if tt.wantErr { |
||||
require.Error(t, err) |
||||
t.Log(err) |
||||
} else { |
||||
require.NoError(t, err) |
||||
require.Equal(t, tt.want, got) |
||||
} |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestContext_ConvertColumnRef(t *testing.T) { |
||||
tests := []struct { |
||||
expr Expression |
||||
want string |
||||
wantErr bool |
||||
}{ |
||||
{ |
||||
expr: newColumnExpr("foo", types.ColumnTypeLabel), |
||||
want: "foo", |
||||
}, |
||||
{ |
||||
expr: newColumnExpr("foo", types.ColumnTypeAmbiguous), |
||||
wantErr: true, |
||||
}, |
||||
{ |
||||
expr: newColumnExpr("foo", types.ColumnTypeBuiltin), |
||||
wantErr: true, |
||||
}, |
||||
{ |
||||
expr: NewLiteral(false), |
||||
wantErr: true, |
||||
}, |
||||
{ |
||||
expr: &BinaryExpr{ |
||||
Left: newColumnExpr("foo", types.ColumnTypeLabel), |
||||
Right: NewLiteral("foo"), |
||||
Op: types.BinaryOpEq, |
||||
}, |
||||
wantErr: true, |
||||
}, |
||||
} |
||||
for _, tt := range tests { |
||||
t.Run(tt.expr.String(), func(t *testing.T) { |
||||
got, err := convertColumnRef(tt.expr) |
||||
if tt.wantErr { |
||||
require.Error(t, err) |
||||
t.Log(err) |
||||
} else { |
||||
require.NoError(t, err) |
||||
require.Equal(t, tt.want, got) |
||||
} |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestContext_ExpressionToMatchers(t *testing.T) { |
||||
tests := []struct { |
||||
expr Expression |
||||
want []*labels.Matcher |
||||
wantErr bool |
||||
}{ |
||||
{ |
||||
expr: newColumnExpr("foo", types.ColumnTypeLabel), |
||||
wantErr: true, |
||||
}, |
||||
{ |
||||
expr: NewLiteral("foo"), |
||||
wantErr: true, |
||||
}, |
||||
{ |
||||
expr: &BinaryExpr{ |
||||
Left: newColumnExpr("foo", types.ColumnTypeLabel), |
||||
Right: NewLiteral("bar"), |
||||
Op: types.BinaryOpMatchStr, |
||||
}, |
||||
want: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), |
||||
}, |
||||
}, |
||||
{ |
||||
expr: &BinaryExpr{ |
||||
Left: &BinaryExpr{ |
||||
Left: newColumnExpr("foo", types.ColumnTypeLabel), |
||||
Right: NewLiteral("bar"), |
||||
Op: types.BinaryOpMatchStr, |
||||
}, |
||||
Right: &BinaryExpr{ |
||||
Left: newColumnExpr("bar", types.ColumnTypeLabel), |
||||
Right: NewLiteral("baz"), |
||||
Op: types.BinaryOpNotMatchStr, |
||||
}, |
||||
Op: types.BinaryOpAnd, |
||||
}, |
||||
want: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), |
||||
labels.MustNewMatcher(labels.MatchNotEqual, "bar", "baz"), |
||||
}, |
||||
}, |
||||
} |
||||
for _, tt := range tests { |
||||
t.Run(tt.expr.String(), func(t *testing.T) { |
||||
got, err := expressionToMatchers(tt.expr) |
||||
if tt.wantErr { |
||||
require.Error(t, err) |
||||
t.Log(err) |
||||
} else { |
||||
require.NoError(t, err) |
||||
require.ElementsMatch(t, tt.want, got) |
||||
} |
||||
}) |
||||
} |
||||
} |
Loading…
Reference in new issue