removes sharding evaluator code

k14
Owen Diehl 6 years ago
parent 101ef57ea2
commit 55d41b9519
No known key found for this signature in database
GPG Key ID: C75D694E8203E7B9
  1. 53
      pkg/logql/chainedevaluator.go
  2. 7
      pkg/logql/downstreamer.go
  3. 97
      pkg/logql/sharding.go

@ -1,53 +0,0 @@
package logql
import (
"context"
"github.com/grafana/loki/pkg/iter"
"github.com/pkg/errors"
)
// ChainedEvaluator is an evaluator which chains multiple other evaluators,
// deferring to the first successful one.
type ChainedEvaluator struct {
evaluators []Evaluator
}
// StepEvaluator attempts the embedded evaluators until one succeeds or they all error.
func (c *ChainedEvaluator) StepEvaluator(
ctx context.Context,
nextEvaluator Evaluator,
expr SampleExpr,
p Params,
) (stepper StepEvaluator, err error) {
for _, eval := range c.evaluators {
if stepper, err = eval.StepEvaluator(ctx, nextEvaluator, expr, p); err == nil {
return stepper, nil
}
}
return nil, err
}
// Iterator attempts the embedded evaluators until one succeeds or they all error.
func (c *ChainedEvaluator) Iterator(
ctx context.Context,
expr LogSelectorExpr,
p Params,
) (iterator iter.EntryIterator, err error) {
for _, eval := range c.evaluators {
if iterator, err = eval.Iterator(ctx, expr, p); err == nil {
return iterator, nil
}
}
return nil, err
}
// NewChainedEvaluator constructs a ChainedEvaluator from one or more Evaluators
func NewChainedEvaluator(evals ...Evaluator) (*ChainedEvaluator, error) {
if len(evals) == 0 {
return nil, errors.New("must supply an Evaluator")
}
return &ChainedEvaluator{
evaluators: evals,
}, nil
}

@ -1,7 +0,0 @@
package logql
// Downstreamer is an interface for deferring responsibility for query execution.
// It is decoupled from but consumed by a downStreamEvaluator to dispatch ASTs.
type Downstreamer interface {
// Downstream(*LokiRequest) (*LokiResponse, error)
}

@ -1,13 +1,9 @@
package logql
import (
"context"
"fmt"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/iter"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/promql"
)
// DownstreamSampleExpr is a SampleExpr which signals downstream computation
@ -59,96 +55,3 @@ func (c ConcatLogSelectorExpr) String() string {
return fmt.Sprintf("%s ++ %s", c.LogSelectorExpr.String(), c.next.String())
}
// downstreamEvaluator is an evaluator which handles shard aware AST nodes
type downstreamEvaluator struct{ Downstreamer }
// Evaluator returns a StepEvaluator for a given SampleExpr
func (ev *downstreamEvaluator) StepEvaluator(
ctx context.Context,
nextEv Evaluator,
expr SampleExpr,
params Params,
) (StepEvaluator, error) {
switch e := expr.(type) {
case DownstreamSampleExpr:
// downstream to a querier
return nil, errors.New("unimplemented")
case ConcatSampleExpr:
// ensure they all impl the same (SampleExpr, LogSelectorExpr) & concat
var xs []StepEvaluator
cur := &e
for cur != nil {
eval, err := ev.StepEvaluator(ctx, nextEv, cur.SampleExpr, params)
if err != nil {
// Close previously opened StepEvaluators
for _, x := range xs {
x.Close()
}
return nil, err
}
xs = append(xs, eval)
cur = cur.next
}
return ConcatEvaluator(xs)
default:
return nil, EvaluatorUnsupportedType(expr, ev)
}
}
// Iterator returns the iter.EntryIterator for a given LogSelectorExpr
func (ev *downstreamEvaluator) Iterator(
ctx context.Context,
expr LogSelectorExpr,
params Params,
) (iter.EntryIterator, error) {
switch e := expr.(type) {
case DownstreamLogSelectorExpr:
// downstream to a querier
return nil, errors.New("unimplemented")
case ConcatLogSelectorExpr:
var iters []iter.EntryIterator
cur := &e
for cur != nil {
iterator, err := ev.Iterator(ctx, e, params)
if err != nil {
// Close previously opened StepEvaluators
for _, x := range iters {
x.Close()
}
return nil, err
}
iters = append(iters, iterator)
}
return iter.NewHeapIterator(ctx, iters, params.Direction()), nil
}
return nil, errors.New("unimplemented")
}
// ConcatEvaluator joins multiple StepEvaluators.
// Contract: They must be of identical start, end, and step values.
func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) {
return newStepEvaluator(
func() (done bool, ts int64, vec promql.Vector) {
var cur promql.Vector
for _, eval := range evaluators {
done, ts, cur = eval.Next()
vec = append(vec, cur...)
}
return done, ts, vec
},
func() (lastErr error) {
for _, eval := range evaluators {
if err := eval.Close(); err != nil {
lastErr = err
}
}
return lastErr
},
)
}

Loading…
Cancel
Save