package executor import ( "testing" "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" ) func TestExecutor(t *testing.T) { t.Run("pipeline fails if plan is nil", func(t *testing.T) { ctx := t.Context() pipeline := Run(ctx, Config{}, nil, log.NewNopLogger()) _, err := pipeline.Read(ctx) require.ErrorContains(t, err, "failed to execute pipeline: plan is nil") }) t.Run("pipeline fails if plan has no root node", func(t *testing.T) { ctx := t.Context() pipeline := Run(ctx, Config{}, &physical.Plan{}, log.NewNopLogger()) _, err := pipeline.Read(ctx) require.ErrorContains(t, err, "failed to execute pipeline: plan has no root node") }) } func TestExecutor_Limit(t *testing.T) { t.Run("no inputs result in empty pipeline", func(t *testing.T) { ctx := t.Context() c := &Context{} pipeline := c.executeLimit(ctx, &physical.Limit{}, nil, nil) _, err := pipeline.Read(ctx) require.ErrorContains(t, err, EOF.Error()) }) t.Run("multiple inputs result in error", func(t *testing.T) { ctx := t.Context() c := &Context{} pipeline := c.executeLimit(ctx, &physical.Limit{}, []Pipeline{emptyPipeline(), emptyPipeline()}, nil) _, err := pipeline.Read(ctx) require.ErrorContains(t, err, "limit expects exactly one input, got 2") }) } func TestExecutor_Filter(t *testing.T) { t.Run("no inputs result in empty pipeline", func(t *testing.T) { ctx := t.Context() c := &Context{} pipeline := c.executeFilter(ctx, &physical.Filter{}, nil, nil) _, err := pipeline.Read(ctx) require.ErrorContains(t, err, EOF.Error()) }) t.Run("multiple inputs result in error", func(t *testing.T) { ctx := t.Context() c := &Context{} pipeline := c.executeFilter(ctx, &physical.Filter{}, []Pipeline{emptyPipeline(), emptyPipeline()}, nil) _, err := pipeline.Read(ctx) require.ErrorContains(t, err, "filter expects exactly one input, got 2") }) } func TestExecutor_Projection(t *testing.T) { t.Run("no inputs result in empty pipeline", func(t *testing.T) { ctx := t.Context() c := &Context{} pipeline := c.executeProjection(ctx, &physical.Projection{}, nil, nil) _, err := pipeline.Read(ctx) require.ErrorContains(t, err, EOF.Error()) }) t.Run("missing column expression results in error", func(t *testing.T) { ctx := t.Context() cols := []physical.Expression{} c := &Context{} pipeline := c.executeProjection(ctx, &physical.Projection{Expressions: cols}, []Pipeline{emptyPipeline()}, nil) _, err := pipeline.Read(ctx) require.ErrorContains(t, err, "projection expects at least one expression, got 0") }) t.Run("multiple inputs result in error", func(t *testing.T) { ctx := t.Context() c := &Context{} pipeline := c.executeProjection(ctx, &physical.Projection{}, []Pipeline{emptyPipeline(), emptyPipeline()}, nil) _, err := pipeline.Read(ctx) require.ErrorContains(t, err, "projection expects exactly one input, got 2") }) }