diff --git a/pkg/logql/chainedevaluator.go b/pkg/logql/chainedevaluator.go deleted file mode 100644 index 346dd6992c..0000000000 --- a/pkg/logql/chainedevaluator.go +++ /dev/null @@ -1,53 +0,0 @@ -package logql - -import ( - "context" - - "github.com/grafana/loki/pkg/iter" - "github.com/pkg/errors" -) - -// ChainedEvaluator is an evaluator which chains multiple other evaluators, -// deferring to the first successful one. -type ChainedEvaluator struct { - evaluators []Evaluator -} - -// StepEvaluator attempts the embedded evaluators until one succeeds or they all error. -func (c *ChainedEvaluator) StepEvaluator( - ctx context.Context, - nextEvaluator Evaluator, - expr SampleExpr, - p Params, -) (stepper StepEvaluator, err error) { - for _, eval := range c.evaluators { - if stepper, err = eval.StepEvaluator(ctx, nextEvaluator, expr, p); err == nil { - return stepper, nil - } - } - return nil, err -} - -// Iterator attempts the embedded evaluators until one succeeds or they all error. -func (c *ChainedEvaluator) Iterator( - ctx context.Context, - expr LogSelectorExpr, - p Params, -) (iterator iter.EntryIterator, err error) { - for _, eval := range c.evaluators { - if iterator, err = eval.Iterator(ctx, expr, p); err == nil { - return iterator, nil - } - } - return nil, err -} - -// NewChainedEvaluator constructs a ChainedEvaluator from one or more Evaluators -func NewChainedEvaluator(evals ...Evaluator) (*ChainedEvaluator, error) { - if len(evals) == 0 { - return nil, errors.New("must supply an Evaluator") - } - return &ChainedEvaluator{ - evaluators: evals, - }, nil -} diff --git a/pkg/logql/downstreamer.go b/pkg/logql/downstreamer.go deleted file mode 100644 index 96fdef69dc..0000000000 --- a/pkg/logql/downstreamer.go +++ /dev/null @@ -1,7 +0,0 @@ -package logql - -// Downstreamer is an interface for deferring responsibility for query execution. -// It is decoupled from but consumed by a downStreamEvaluator to dispatch ASTs. -type Downstreamer interface { - // Downstream(*LokiRequest) (*LokiResponse, error) -} diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index d8ae72ea3e..5141a160d3 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -1,13 +1,9 @@ package logql import ( - "context" "fmt" "github.com/cortexproject/cortex/pkg/querier/astmapper" - "github.com/grafana/loki/pkg/iter" - "github.com/pkg/errors" - "github.com/prometheus/prometheus/promql" ) // DownstreamSampleExpr is a SampleExpr which signals downstream computation @@ -59,96 +55,3 @@ func (c ConcatLogSelectorExpr) String() string { return fmt.Sprintf("%s ++ %s", c.LogSelectorExpr.String(), c.next.String()) } - -// downstreamEvaluator is an evaluator which handles shard aware AST nodes -type downstreamEvaluator struct{ Downstreamer } - -// Evaluator returns a StepEvaluator for a given SampleExpr -func (ev *downstreamEvaluator) StepEvaluator( - ctx context.Context, - nextEv Evaluator, - expr SampleExpr, - params Params, -) (StepEvaluator, error) { - switch e := expr.(type) { - case DownstreamSampleExpr: - // downstream to a querier - return nil, errors.New("unimplemented") - - case ConcatSampleExpr: - // ensure they all impl the same (SampleExpr, LogSelectorExpr) & concat - var xs []StepEvaluator - cur := &e - - for cur != nil { - eval, err := ev.StepEvaluator(ctx, nextEv, cur.SampleExpr, params) - if err != nil { - // Close previously opened StepEvaluators - for _, x := range xs { - x.Close() - } - return nil, err - } - xs = append(xs, eval) - cur = cur.next - } - - return ConcatEvaluator(xs) - - default: - return nil, EvaluatorUnsupportedType(expr, ev) - } -} - -// Iterator returns the iter.EntryIterator for a given LogSelectorExpr -func (ev *downstreamEvaluator) Iterator( - ctx context.Context, - expr LogSelectorExpr, - params Params, -) (iter.EntryIterator, error) { - switch e := expr.(type) { - case DownstreamLogSelectorExpr: - // downstream to a querier - return nil, errors.New("unimplemented") - case ConcatLogSelectorExpr: - var iters []iter.EntryIterator - cur := &e - for cur != nil { - iterator, err := ev.Iterator(ctx, e, params) - if err != nil { - // Close previously opened StepEvaluators - for _, x := range iters { - x.Close() - } - return nil, err - } - iters = append(iters, iterator) - } - return iter.NewHeapIterator(ctx, iters, params.Direction()), nil - } - return nil, errors.New("unimplemented") -} - -// ConcatEvaluator joins multiple StepEvaluators. -// Contract: They must be of identical start, end, and step values. -func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) { - return newStepEvaluator( - func() (done bool, ts int64, vec promql.Vector) { - var cur promql.Vector - for _, eval := range evaluators { - done, ts, cur = eval.Next() - vec = append(vec, cur...) - } - return done, ts, vec - - }, - func() (lastErr error) { - for _, eval := range evaluators { - if err := eval.Close(); err != nil { - lastErr = err - } - } - return lastErr - }, - ) -}