package worker_test

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"testing"
	"testing/synctest"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/go-kit/log"
	"github.com/grafana/dskit/services"
	"github.com/grafana/dskit/user"
	"github.com/oklog/ulid/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
	"github.com/grafana/loki/v3/pkg/engine/internal/executor"
	"github.com/grafana/loki/v3/pkg/engine/internal/planner/logical"
	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/scheduler"
	"github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
	"github.com/grafana/loki/v3/pkg/engine/internal/util/dag"
	"github.com/grafana/loki/v3/pkg/engine/internal/util/objtest"
	"github.com/grafana/loki/v3/pkg/engine/internal/worker"
	"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql"
	"github.com/grafana/loki/v3/pkg/util/arrowtest"
)

// Test runs an end-to-end test of the worker.
func Test(t *testing.T) {
	builder := objtest.NewBuilder(t)

	logger := log.NewNopLogger()
	if testing.Verbose() {
		logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
	}

	net := newTestNetwork()
	sched := newTestScheduler(t, logger, net)
	_ = newTestWorker(t, logger, builder.Location(), net)

	ctx := user.InjectOrgID(t.Context(), objtest.Tenant)

	builder.Append(ctx, logproto.Stream{
		Labels: `{app="loki", env="dev"}`,
		Entries: []logproto.Entry{{
			Timestamp: time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC),
			Line:      "Hello, world!",
		}, {
			Timestamp: time.Date(2025, time.January, 1, 0, 0, 1, 0, time.UTC),
			Line:      "Goodbye, world!",
		}},
	})
	builder.Close()

	params, err := logql.NewLiteralParams(
		`{app="loki"}`,
		time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC),
		time.Date(2025, time.January, 2, 0, 0, 0, 0, time.UTC),
		0,
		0,
		logproto.BACKWARD,
		1000,
		[]string{"0_of_1"},
		nil,
	)
	require.NoError(t, err, "expected to be able to create literal LogQL params")

	wf := buildWorkflow(ctx, t, logger, builder.Location(), sched, params)
	pipeline, err := wf.Run(ctx)
	require.NoError(t, err)

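	// The query direction is BACKWARD, so the newest entry ("Goodbye, world!")
	// is expected first.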
	expected := arrowtest.Rows{
		{
			"timestamp_ns.builtin.timestamp": time.Date(2025, time.January, 1, 0, 0, 1, 0, time.UTC),
			"utf8.label.app":                 "loki",
			"utf8.label.env":                 "dev",
			"utf8.builtin.message":           "Goodbye, world!",
		},
		{
			"timestamp_ns.builtin.timestamp": time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC),
			"utf8.label.app":                 "loki",
			"utf8.label.env":                 "dev",
			"utf8.builtin.message":           "Hello, world!",
		},
	}

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, readTable(ctx, t, pipeline))
	require.NoError(t, err, "failed to get rows from table")
	require.Equal(t, expected, actual)
}

// TestWorkerGracefulShutdown tests that the worker gracefully shuts down by
// finishing execution of a task even after its context is canceled. The test
// creates a TopK node job that accepts a Stream, waits for the worker to start
// processing, cancels the worker's context, then sends data to the stream and
// verifies the job completes.
func TestWorkerGracefulShutdown(t *testing.T) {
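	// synctest.Test runs the body in a bubble with a fake clock; synctest.Wait
	// blocks until every goroutine in the bubble is durably blocked.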
	synctest.Test(t, func(t *testing.T) {
		logger := log.NewNopLogger()
		if testing.Verbose() {
			logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
		}

		net := newTestNetwork()
		sched := newTestScheduler(t, logger, net)

		// Create a cancelable context for the worker's run() method.
		runCtx, cancel := context.WithCancel(context.Background())
		defer cancel()

		_ = newTestWorkerWithContext(t, logger, objtest.Location{}, net, runCtx)

		ctx := user.InjectOrgID(context.Background(), objtest.Tenant)

		// Create a physical plan with a TopK node.
		topkNode := &physical.TopK{
			NodeID:    ulid.Make(),
			SortBy:    &physical.ColumnExpr{Ref: semconv.ColumnIdentTimestamp.ColumnRef()},
			Ascending: false,
			K:         10,
		}

		planGraph := &dag.Graph[physical.Node]{}
		planGraph.Add(topkNode)
		plan := physical.FromGraph(*planGraph)

		// Create a stream that will feed data to the TopK node.
		inputStream := &workflow.Stream{
			ULID:     ulid.Make(),
			TenantID: objtest.Tenant,
		}

		// Create a workflow task manually with the TopK node and stream source.
		task := &workflow.Task{
			ULID:     ulid.Make(),
			TenantID: objtest.Tenant,
			Fragment: plan,
			Sources: map[physical.Node][]*workflow.Stream{
				topkNode: {inputStream},
			},
			Sinks: make(map[physical.Node][]*workflow.Stream),
		}

		// Create a results stream for the workflow output.
		resultsStream := &workflow.Stream{
			ULID:     ulid.Make(),
			TenantID: objtest.Tenant,
		}
		task.Sinks[topkNode] = []*workflow.Stream{resultsStream}

		// Create a workflow with the task.
		manifest := &workflow.Manifest{
			Streams: []*workflow.Stream{inputStream, resultsStream},
			Tasks:   []*workflow.Task{task},
			TaskEventHandler: func(_ context.Context, _ *workflow.Task, _ workflow.TaskStatus) {
				// Empty
			},
			StreamEventHandler: func(_ context.Context, _ *workflow.Stream, _ workflow.StreamState) {
				// Empty
			},
		}
		require.NoError(t, sched.RegisterManifest(ctx, manifest))

		// Create a simple record writer to receive results.
		resultsWriter := &testRecordWriter{records: make(chan arrow.RecordBatch, 10)}
		require.NoError(t, sched.Listen(ctx, resultsWriter, resultsStream))

		// Start the task; this will assign it to the worker.
		require.NoError(t, sched.Start(ctx, task))

		// Wait for the task to be assigned to the worker and start waiting for
		// input.
		synctest.Wait()

		// Now send data to the input stream. The worker should process this data
		// even though its context will be canceled.
		schema := arrow.NewSchema([]arrow.Field{
			semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
			semconv.FieldFromIdent(semconv.ColumnIdentMessage, false),
		}, nil)

		timestampBuilder := array.NewTimestampBuilder(memory.DefaultAllocator, arrow.FixedWidthTypes.Timestamp_ns.(*arrow.TimestampType))
		messageBuilder := array.NewStringBuilder(memory.DefaultAllocator)

		// Create test data.
		for i := 0; i < 5; i++ {
			timestampBuilder.Append(arrow.Timestamp(time.Date(2025, time.January, 1, 0, 0, i, 0, time.UTC).UnixNano()))
			messageBuilder.Append(fmt.Sprintf("Message %d", i))
		}

		timestampArr := timestampBuilder.NewArray()
		messageArr := messageBuilder.NewArray()
		record := array.NewRecordBatch(schema, []arrow.Array{timestampArr, messageArr}, 5)

		// Send data for the stream to the worker. We need to connect to worker 1
		// on behalf of worker 2 and send a StreamDataMessage.
		workerConn, err := net.workerListener.DialFrom(ctx, wire.LocalWorker)
		require.NoError(t, err)
		defer workerConn.Close()

		workerPeer := &wire.Peer{
			Logger: logger,
			Conn:   workerConn,
			Handler: func(_ context.Context, _ *wire.Peer, _ wire.Message) error {
				return nil
			},
		}
		go func() { _ = workerPeer.Serve(ctx) }()

		// Cancel the worker's context to trigger graceful shutdown. This simulates
		// the worker receiving a shutdown signal in Worker.run(). The earliest we
		// can cancel is after the worker's listener has been dialed; otherwise the
		// connection will not be accepted.
		cancel()

		// Connect to the scheduler on behalf of worker 2.
		schedulerConn, err := net.schedulerListener.DialFrom(ctx, testAddr("worker2"))
		require.NoError(t, err)
		defer schedulerConn.Close()

		schedulerPeer := &wire.Peer{
			Logger: logger,
			Conn:   schedulerConn,
			Handler: func(_ context.Context, _ *wire.Peer, _ wire.Message) error {
				return nil
			},
		}
		go func() { _ = schedulerPeer.Serve(ctx) }()

		// Say hello to the scheduler on behalf of worker 2.
		err = schedulerPeer.SendMessage(ctx, wire.WorkerHelloMessage{
			Threads: 1,
		})
		require.NoError(t, err)

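		// Let the scheduler finish processing the hello before streaming data.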
		synctest.Wait()

		// Send the data message.
		err = workerPeer.SendMessage(ctx, wire.StreamDataMessage{
			StreamID: inputStream.ULID,
			Data:     record,
		})
		require.NoError(t, err)

		// Close the stream to signal EOF by sending a StreamStatusMessage.
		err = schedulerPeer.SendMessage(ctx, wire.StreamStatusMessage{
			StreamID: inputStream.ULID,
			State:    workflow.StreamStateClosed,
		})
		require.NoError(t, err)

		// Wait for results; the worker should have processed the data even though
		// its context was canceled.
		resultCtx, resultCancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer resultCancel()

		select {
		case resultRecord := <-resultsWriter.records:
			require.Greater(t, resultRecord.NumRows(), int64(0), "should have at least some rows")
		case <-resultCtx.Done():
			t.Fatal("did not receive result within timeout")
		}

		synctest.Wait()
	})
}

// testRecordWriter implements workflow.RecordWriter for testing.
type testRecordWriter struct {
	records chan arrow.RecordBatch
}

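// Write sends record to w.records, blocking until it is received or ctx is
// canceled.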
func (w *testRecordWriter) Write(ctx context.Context, record arrow.RecordBatch) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case w.records <- record:
		return nil
	}
}

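// testNetwork connects a scheduler and a worker over in-memory wire listeners.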
type testNetwork struct {
	schedulerListener *wire.Local
	workerListener    *wire.Local
	dialer            wire.Dialer
}

func newTestNetwork() *testNetwork {
	schedulerListener := &wire.Local{Address: wire.LocalScheduler}
	workerListener := &wire.Local{Address: wire.LocalWorker}

	return &testNetwork{
		schedulerListener: schedulerListener,
		workerListener:    workerListener,
		dialer:            wire.NewLocalDialer(schedulerListener, workerListener),
	}
}

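// newTestScheduler starts a scheduler listening on the test network and stops
// it at test cleanup.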
func newTestScheduler(t *testing.T, logger log.Logger, net *testNetwork) *scheduler.Scheduler {
	t.Helper()

	sched, err := scheduler.New(scheduler.Config{
		Logger:   logger,
		Listener: net.schedulerListener,
	})
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(t.Context(), sched.Service()))

	t.Cleanup(func() {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		require.NoError(t, services.StopAndAwaitTerminated(ctx, sched.Service()))
	})

	return sched
}

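// newTestWorker starts a worker whose run context is tied to the test's
// context.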
func newTestWorker(t *testing.T, logger log.Logger, loc objtest.Location, net *testNetwork) *worker.Worker {
	return newTestWorkerWithContext(t, logger, loc, net, t.Context())
}

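// newTestWorkerWithContext starts a worker that runs with runCtx, allowing
// tests to cancel it independently of the test lifetime.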
//nolint:revive
func newTestWorkerWithContext(t *testing.T, logger log.Logger, loc objtest.Location, net *testNetwork, runCtx context.Context) *worker.Worker {
	t.Helper()

	w, err := worker.New(worker.Config{
		Logger:    logger,
		Bucket:    loc.Bucket,
		BatchSize: 2048,
		Metastore: metastore.NewObjectMetastore(loc.Bucket, metastore.Config{}, logger, metastore.NewObjectMetastoreMetrics(prometheus.NewRegistry())),

		Dialer:           net.dialer,
		Listener:         net.workerListener,
		SchedulerAddress: wire.LocalScheduler,

		// Create enough threads to guarantee all tasks can be scheduled without
		// blocking.
		NumThreads: 2,
	})
	require.NoError(t, err, "expected to create worker")
	require.NoError(t, services.StartAndAwaitRunning(runCtx, w.Service()))

	t.Cleanup(func() {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		require.NoError(t, services.StopAndAwaitTerminated(ctx, w.Service()))
	})

	return w
}

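// buildWorkflow plans params into an optimized physical plan and wraps it in a
// workflow registered against the given scheduler.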
func buildWorkflow(ctx context.Context, t *testing.T, logger log.Logger, loc objtest.Location, sched *scheduler.Scheduler, params logql.Params) *workflow.Workflow {
	logicalPlan, err := logical.BuildPlan(params)
	require.NoError(t, err, "expected to create logical plan")

	ms := metastore.NewObjectMetastore(
		loc.Bucket,
		metastore.Config{IndexStoragePrefix: loc.IndexPrefix},
		logger,
		metastore.NewObjectMetastoreMetrics(prometheus.NewRegistry()),
	)
	catalog := physical.NewMetastoreCatalog(func(selector physical.Expression, predicates []physical.Expression, start time.Time, end time.Time) ([]*metastore.DataobjSectionDescriptor, error) {
		req, err := physical.CatalogRequestToMetastoreSectionsRequest(selector, predicates, start, end)
		if err != nil {
			return nil, err
		}
		resp, err := ms.Sections(ctx, req)
		return resp.Sections, err
	})
	planner := physical.NewPlanner(physical.NewContext(params.Start(), params.End()), catalog)
	plan, err := planner.Build(logicalPlan)
	require.NoError(t, err, "expected to create physical plan")

	plan, err = planner.Optimize(plan)
	require.NoError(t, err, "expected to optimize physical plan")

	if testing.Verbose() {
		fmt.Fprintln(os.Stderr, physical.PrintAsTree(plan))
	}

	opts := workflow.Options{
		MaxRunningScanTasks:  32,
		MaxRunningOtherTasks: 0, // unlimited
	}
	wf, err := workflow.New(opts, logger, objtest.Tenant, sched, plan)
	require.NoError(t, err)

	if testing.Verbose() {
		workflow.Fprint(os.Stderr, wf)
	}
	return wf
}

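// readTable drains p into a single arrow.Table, reading until EOF.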
func readTable(ctx context.Context, t *testing.T, p executor.Pipeline) arrow.Table {
	var recs []arrow.RecordBatch

	for {
		rec, err := p.Read(ctx)
		if rec != nil && rec.NumRows() > 0 {
			recs = append(recs, rec)
		}

		if errors.Is(err, executor.EOF) {
			break
		}
		require.NoError(t, err)
	}

	require.NotEmpty(t, recs, "expected pipeline to produce at least one record")
	return array.NewTableFromRecords(recs[0].Schema(), recs)
}

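// testAddr is a minimal net.Addr identifying peers on the test network.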
type testAddr string

var _ net.Addr = testAddr("")

func (addr testAddr) Network() string { return "local" }
func (addr testAddr) String() string { return string(addr) }
|