diff --git a/pkg/blockbuilder/architecture.md b/pkg/blockbuilder/architecture.md index 633d7340cc..c9abff4056 100644 --- a/pkg/blockbuilder/architecture.md +++ b/pkg/blockbuilder/architecture.md @@ -10,22 +10,27 @@ The Block Builder system is organized into three main packages: ### pkg/blockbuilder/types - Contains shared type definitions and interfaces -- Defines core data structures like `Job` and `Offsets` +- Defines core data structures like `Job` and `JobStatus` - Provides interface definitions for: - - `Worker`: Interface for processing jobs and reporting status - - `Scheduler`: Interface for job scheduling and worker management - - `Transport`: Interface for communication between components + - `BuilderTransport`: Interface for builder-to-scheduler communication + - `SchedulerHandler`: Interface for scheduler business logic + +The transport layer is split into client and server components: +- Client side uses `BuilderTransport` to abstract gRPC details +- Server side uses `SchedulerHandler` for pure business logic +- A gRPC adapter connects the two sides ### pkg/blockbuilder/scheduler - Implements the job queue and scheduling logic - Manages job distribution to block builders - Tracks job progress and ensures exactly-once processing -- Handles job state management and offset tracking +- Implements `SchedulerHandler` interface for business logic +- Uses gRPC adapter to expose services to builders ### pkg/blockbuilder/builder -- Implements the block builder worker functionality +- Implements the block builder functionality +- Uses `BuilderTransport` to communicate with scheduler - Processes assigned jobs and builds storage formats -- Manages transport layer communication - Handles data processing and object storage interactions ## Component Diagram @@ -42,7 +47,8 @@ graph TB PC[Partition Controller] subgraph Transport Layer - T[gRPC/Transport Interface] + GA[gRPC Adapter] + SH[Scheduler Handler] end end @@ -50,6 +56,10 @@ graph TB BB1[Block Builder 1] BB2[Block Builder 2] BB3[Block Builder N] + + subgraph Builder Transport + BT[Builder Transport] + end end subgraph Storage @@ -59,10 +69,12 @@ graph TB KP --> PC PC --> S S <--> Q - S <--> T - T <--> BB1 - T <--> BB2 - T <--> BB3 + S --> SH + SH <--> GA + GA <--> BT + BT <--> BB1 + BT <--> BB2 + BT <--> BB3 BB1 --> OS BB2 --> OS BB3 --> OS @@ -73,9 +85,10 @@ graph TB ```mermaid sequenceDiagram participant PC as Partition Controller - participant S as Block Scheduler - participant Q as Queue - participant T as Transport + participant S as Scheduler + participant SH as SchedulerHandler + participant GA as gRPC Adapter + participant BT as Builder Transport participant BB as Block Builder participant OS as Object Storage @@ -85,57 +98,53 @@ sequenceDiagram S->>Q: Enqueue Job end - BB->>T: Request Job - T->>S: Forward Request - S->>Q: Dequeue Job - Q-->>S: Return Job (or empty) + BB->>BT: Request Job + BT->>GA: gRPC GetJob Request + GA->>SH: HandleGetJob + SH->>S: Get Job from Queue + S-->>SH: Return Job (or empty) + alt Has Job - S->>T: Send Job - T->>BB: Forward Job + SH-->>GA: Return Job + GA-->>BT: gRPC Response + BT-->>BB: Return Job BB->>OS: Process & Write Data - BB->>T: Report Success - T->>S: Forward Status + BB->>BT: Report Success + BT->>GA: gRPC CompleteJob + GA->>SH: HandleCompleteJob + SH->>S: Mark Complete S->>PC: Commit Offset else No Job - S->>T: Send No Job Available - T->>BB: Forward Response + SH-->>GA: Return No Job + GA-->>BT: gRPC Response + BT-->>BB: Return No Job end ``` -## Core Components - -### Job and Offsets -- `Job`: Represents a unit of work for processing Kafka data - - Contains a partition ID and an offset range - - Immutable data structure that can be safely passed between components -- `Offsets`: Defines a half-open range [min,max) of Kafka offsets to process - - Used to track progress and ensure exactly-once processing - -### Block Scheduler -- Central component responsible for: - - Managing the job queue - - Coordinating Block Builder assignments - - Tracking job progress -- Implements a pull-based model where Block Builders request jobs -- Decoupled from specific transport mechanisms through the Transport interface - -### Block Builder -- Processes jobs assigned by the Block Scheduler -- Responsible for: - - Building storage formats from Kafka data - - Writing completed blocks to object storage - - Reporting job status back to scheduler -- Implements the Worker interface for job processing - -### Transport Layer -- Provides communication between Block Builders and Scheduler -- Abstracts transport mechanism (currently in-memory & gRPC) -- Defines message types for: - - Job requests - - Job completion notifications - - Job synchronization - -## Design Principles +## Interface Design + +The system uses a layered interface approach: + +1. **Builder Side**: + - Simple API for job processing + - `BuilderTransport`: Handles communication details + - Builders work with domain types, unaware of gRPC + +2. **Transport Layer**: + - gRPC service definitions in proto files + - Adapter pattern to convert between proto and domain types + - Clear separation between transport and business logic + +3. **Scheduler Side**: + - `SchedulerHandler`: Pure business logic interface + - No knowledge of transport details + - Clean separation of concerns + +This design allows for: +- Easy testing of each layer independently +- Flexibility to change transport mechanism +- Clear separation between business logic and communication +- Type-safe conversions between proto and domain types ### Decoupled I/O - Business logic is separated from I/O operations @@ -150,10 +159,4 @@ sequenceDiagram ### Pull-Based Architecture - Block Builders pull jobs when ready - Natural load balancing -- Prevents overloading of workers - - -### Interface-Driven Development -- Core components defined by interfaces -- Allows for multiple implementations -- Facilitates testing and modularity +- Prevents overloading of workers \ No newline at end of file diff --git a/pkg/blockbuilder/builder/worker.go b/pkg/blockbuilder/builder/worker.go deleted file mode 100644 index 4b5f36d649..0000000000 --- a/pkg/blockbuilder/builder/worker.go +++ /dev/null @@ -1,67 +0,0 @@ -package builder - -import ( - "context" - - "github.com/grafana/loki/v3/pkg/blockbuilder/types" -) - -var ( - _ types.Worker = unimplementedWorker{} - _ types.Worker = &Worker{} -) - -// unimplementedWorker provides default implementations for the Worker interface. -type unimplementedWorker struct{} - -func (u unimplementedWorker) GetJob(_ context.Context) (*types.Job, bool, error) { - panic("unimplemented") -} - -func (u unimplementedWorker) CompleteJob(_ context.Context, _ *types.Job, _ bool) error { - panic("unimplemented") -} - -func (u unimplementedWorker) SyncJob(_ context.Context, _ *types.Job) error { - panic("unimplemented") -} - -// Worker is the implementation of the Worker interface. -type Worker struct { - unimplementedWorker - transport types.Transport - builderID string -} - -// NewWorker creates a new Worker instance. -func NewWorker(builderID string, transport types.Transport) *Worker { - return &Worker{ - transport: transport, - builderID: builderID, - } -} - -func (w *Worker) GetJob(ctx context.Context) (*types.Job, bool, error) { - resp, err := w.transport.SendGetJobRequest(ctx, &types.GetJobRequest{ - BuilderID: w.builderID, - }) - if err != nil { - return nil, false, err - } - return resp.Job, resp.OK, nil -} - -func (w *Worker) CompleteJob(ctx context.Context, job *types.Job, success bool) error { - return w.transport.SendCompleteJob(ctx, &types.CompleteJobRequest{ - BuilderID: w.builderID, - Job: job, - Success: success, - }) -} - -func (w *Worker) SyncJob(ctx context.Context, job *types.Job) error { - return w.transport.SendSyncJob(ctx, &types.SyncJobRequest{ - BuilderID: w.builderID, - Job: job, - }) -} diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index f1575eb456..4eb1eaedde 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -16,12 +16,11 @@ import ( "github.com/twmb/franz-go/pkg/kadm" "github.com/grafana/loki/v3/pkg/blockbuilder/types" - "github.com/grafana/loki/v3/pkg/blockbuilder/types/proto" "github.com/grafana/loki/v3/pkg/kafka/partition" ) var ( - _ types.Scheduler = &BlockScheduler{} + _ types.SchedulerHandler = &BlockScheduler{} ) type Config struct { @@ -175,7 +174,7 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*t } } -func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job) error { +func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, _ bool) error { // TODO: handle commits s.queue.MarkComplete(job.ID) return nil @@ -185,43 +184,3 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job s.queue.SyncJob(job.ID, builderID, job) return nil } - -func (s *BlockScheduler) CompleteJob(_ context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) { - s.queue.MarkComplete(req.Job.Id) - return &proto.CompleteJobResponse{}, nil -} - -func (s *BlockScheduler) SyncJob(_ context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) { - s.queue.SyncJob(req.Job.Id, req.BuilderId, &types.Job{ - ID: req.Job.Id, - Partition: req.Job.Partition, - Offsets: types.Offsets{ - Min: req.Job.Offsets.Min, - Max: req.Job.Offsets.Max, - }, - }) - - return &proto.SyncJobResponse{}, nil -} - -func (s *BlockScheduler) GetJob(_ context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) { - var resp proto.GetJobResponse - job, ok, err := s.queue.Dequeue(req.BuilderId) - if err != nil { - return &resp, err - } - - if ok { - resp.Ok = true - resp.Job = &proto.Job{ - Id: job.ID, - Partition: job.Partition, - Offsets: &proto.Offsets{ - Min: job.Offsets.Min, - Max: job.Offsets.Max, - }, - } - } - - return &resp, nil -} diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index 201b56eeb9..bc72d985f3 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -8,7 +8,6 @@ import ( "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/loki/v3/pkg/blockbuilder/builder" "github.com/grafana/loki/v3/pkg/blockbuilder/types" ) @@ -16,14 +15,14 @@ type testEnv struct { queue *JobQueue scheduler *BlockScheduler transport *types.MemoryTransport - builder *builder.Worker + builder *Worker } func newTestEnv(builderID string) *testEnv { queue := NewJobQueue() scheduler := NewScheduler(Config{}, queue, nil, log.NewNopLogger(), prometheus.NewRegistry()) transport := types.NewMemoryTransport(scheduler) - builder := builder.NewWorker(builderID, transport) + builder := NewWorker(builderID, transport) return &testEnv{ queue: queue, @@ -89,7 +88,7 @@ func TestMultipleBuilders(t *testing.T) { // Create first environment env1 := newTestEnv("test-builder-1") // Create second builder using same scheduler - builder2 := builder.NewWorker("test-builder-2", env1.transport) + builder2 := NewWorker("test-builder-2", env1.transport) ctx := context.Background() @@ -250,3 +249,47 @@ func TestConfig_Validate(t *testing.T) { }) } } + +// Worker handles communication with the scheduler service. +type Worker struct { + transport types.BuilderTransport + builderID string +} + +// NewWorker creates a new Worker instance. +func NewWorker(builderID string, transport types.BuilderTransport) *Worker { + return &Worker{ + transport: transport, + builderID: builderID, + } +} + +// GetJob requests a new job from the scheduler. +func (w *Worker) GetJob(ctx context.Context) (*types.Job, bool, error) { + resp, err := w.transport.SendGetJobRequest(ctx, &types.GetJobRequest{ + BuilderID: w.builderID, + }) + if err != nil { + return nil, false, err + } + return resp.Job, resp.OK, nil +} + +// CompleteJob marks a job as finished. +func (w *Worker) CompleteJob(ctx context.Context, job *types.Job, success bool) error { + err := w.transport.SendCompleteJob(ctx, &types.CompleteJobRequest{ + BuilderID: w.builderID, + Job: job, + Success: success, + }) + return err +} + +// SyncJob informs the scheduler about an in-progress job. +func (w *Worker) SyncJob(ctx context.Context, job *types.Job) error { + err := w.transport.SendSyncJob(ctx, &types.SyncJobRequest{ + BuilderID: w.builderID, + Job: job, + }) + return err +} diff --git a/pkg/blockbuilder/types/grpc_transport.go b/pkg/blockbuilder/types/grpc_transport.go index a53f06282e..3b90ba9f20 100644 --- a/pkg/blockbuilder/types/grpc_transport.go +++ b/pkg/blockbuilder/types/grpc_transport.go @@ -16,7 +16,7 @@ import ( "github.com/grafana/loki/v3/pkg/util/constants" ) -var _ Transport = &GRPCTransport{} +var _ BuilderTransport = &GRPCTransport{} type grpcTransportMetrics struct { requestLatency *prometheus.HistogramVec @@ -38,7 +38,7 @@ func newGRPCTransportMetrics(registerer prometheus.Registerer) *grpcTransportMet type GRPCTransport struct { grpc_health_v1.HealthClient io.Closer - proto.BlockBuilderServiceClient + proto.SchedulerServiceClient } // NewGRPCTransportFromAddress creates a new gRPC transport instance from an address and dial options @@ -58,9 +58,9 @@ func NewGRPCTransportFromAddress( } return &GRPCTransport{ - Closer: conn, - HealthClient: grpc_health_v1.NewHealthClient(conn), - BlockBuilderServiceClient: proto.NewBlockBuilderServiceClient(conn), + Closer: conn, + HealthClient: grpc_health_v1.NewHealthClient(conn), + SchedulerServiceClient: proto.NewSchedulerServiceClient(conn), }, nil } diff --git a/pkg/blockbuilder/types/interfaces.go b/pkg/blockbuilder/types/interfaces.go index f5746dea5f..2144e83878 100644 --- a/pkg/blockbuilder/types/interfaces.go +++ b/pkg/blockbuilder/types/interfaces.go @@ -2,35 +2,6 @@ package types import "context" -// Worker interface defines the methods for processing jobs and reporting status. -type Worker interface { - // GetJob requests a new job from the scheduler - GetJob(ctx context.Context) (*Job, bool, error) - // CompleteJob marks a job as finished - CompleteJob(ctx context.Context, job *Job, success bool) error - // SyncJob informs the scheduler about an in-progress job - SyncJob(ctx context.Context, job *Job) error -} - -// Scheduler interface defines the methods for scheduling jobs and managing worker pools. -type Scheduler interface { - // HandleGetJob processes a job request from a block builder - HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error) - // HandleCompleteJob processes a job completion notification - HandleCompleteJob(ctx context.Context, builderID string, job *Job) error - // HandleSyncJob processes a job sync request - HandleSyncJob(ctx context.Context, builderID string, job *Job) error -} - -// Transport defines the interface for communication between block builders and scheduler -type Transport interface { - BuilderTransport - SchedulerTransport -} - -// SchedulerTransport is for calls originating from the scheduler -type SchedulerTransport interface{} - // BuilderTransport is for calls originating from the builder type BuilderTransport interface { // SendGetJobRequest sends a request to get a new job @@ -41,6 +12,16 @@ type BuilderTransport interface { SendSyncJob(ctx context.Context, req *SyncJobRequest) error } +// SchedulerHandler defines the business logic for handling builder requests +type SchedulerHandler interface { + // HandleGetJob processes a request for a new job + HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error) + // HandleCompleteJob processes a job completion notification + HandleCompleteJob(ctx context.Context, builderID string, job *Job, success bool) error + // HandleSyncJob processes a job sync request + HandleSyncJob(ctx context.Context, builderID string, job *Job) error +} + // Request/Response message types type GetJobRequest struct { BuilderID string diff --git a/pkg/blockbuilder/types/proto/blockbuilder.pb.go b/pkg/blockbuilder/types/proto/blockbuilder.pb.go index 071812c2fa..331b210a34 100644 --- a/pkg/blockbuilder/types/proto/blockbuilder.pb.go +++ b/pkg/blockbuilder/types/proto/blockbuilder.pb.go @@ -436,36 +436,36 @@ func init() { } var fileDescriptor_04968622516f7b79 = []byte{ - // 453 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x93, 0xc1, 0x8e, 0x93, 0x40, - 0x18, 0xc7, 0x19, 0x88, 0x5b, 0xfb, 0x35, 0x56, 0x9d, 0x8d, 0xb1, 0xa9, 0x3a, 0x59, 0xc7, 0x44, - 0xd7, 0x83, 0x90, 0x54, 0x7d, 0x81, 0x7a, 0x30, 0xae, 0x07, 0x23, 0xeb, 0x69, 0x2f, 0x0a, 0x74, - 0x5a, 0xa7, 0x50, 0x06, 0x99, 0xc1, 0x74, 0x6f, 0x3e, 0x82, 0x8f, 0xe0, 0xd1, 0x47, 0xf1, 0xd8, - 0xe3, 0x1e, 0x2d, 0xbd, 0x78, 0xdc, 0x47, 0x30, 0x0c, 0x50, 0x25, 0x4b, 0xea, 0x5e, 0x3c, 0x01, - 0x7f, 0x7e, 0x7c, 0xff, 0x3f, 0xdf, 0xf7, 0x0d, 0x38, 0x49, 0x38, 0x73, 0xfc, 0x48, 0x04, 0xa1, - 0x9f, 0xf1, 0x68, 0xc2, 0x52, 0x47, 0x9d, 0x26, 0x4c, 0x3a, 0x49, 0x2a, 0x94, 0x68, 0xbc, 0xb0, - 0xb5, 0x84, 0x71, 0x43, 0xd3, 0x30, 0xb5, 0xe1, 0xda, 0x4b, 0xa6, 0x8e, 0x84, 0xef, 0xb2, 0x4f, - 0x19, 0x93, 0x0a, 0xdf, 0x03, 0xa8, 0x88, 0xf7, 0x7c, 0x32, 0x40, 0x07, 0xe8, 0xb0, 0xeb, 0x76, - 0x2b, 0xe5, 0xd5, 0x84, 0xbe, 0x86, 0x7e, 0xcd, 0xcb, 0x44, 0xc4, 0x92, 0xe1, 0xc7, 0x60, 0xcd, - 0x85, 0xaf, 0xc9, 0xde, 0xe8, 0xb6, 0x7d, 0xd1, 0xc3, 0x2e, 0xe8, 0x82, 0xc1, 0x7d, 0x30, 0x45, - 0x38, 0x30, 0x0f, 0xd0, 0xe1, 0x55, 0xd7, 0x14, 0x21, 0x5d, 0x02, 0x7e, 0x21, 0x16, 0x49, 0xc4, - 0x14, 0xbb, 0x74, 0x82, 0xda, 0xcf, 0xbc, 0x84, 0xdf, 0x00, 0x3a, 0x32, 0x0b, 0x02, 0x26, 0xe5, - 0xc0, 0xd2, 0xa6, 0xf5, 0x23, 0xbd, 0x05, 0xfb, 0x0d, 0xe7, 0xf2, 0x5f, 0xe8, 0x09, 0xf4, 0x8f, - 0x4f, 0xe3, 0xe0, 0x7f, 0x84, 0xa1, 0x37, 0xe1, 0xfa, 0xb6, 0x76, 0x65, 0xf7, 0x04, 0x3a, 0x6f, - 0xa6, 0x53, 0xc9, 0x94, 0xc4, 0x37, 0xc0, 0x5a, 0xf0, 0x58, 0x1b, 0x58, 0x6e, 0x71, 0xab, 0x15, - 0x6f, 0xa9, 0x4b, 0x17, 0x8a, 0xb7, 0xa4, 0x73, 0xb0, 0x8e, 0xca, 0x2e, 0x6e, 0xa3, 0x98, 0x7c, - 0x82, 0xef, 0x42, 0x37, 0xf1, 0x52, 0xc5, 0x15, 0x17, 0xb1, 0xc6, 0xaf, 0xb8, 0x7f, 0x04, 0xfc, + // 455 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x93, 0x3f, 0x8e, 0xd3, 0x40, + 0x14, 0xc6, 0x3d, 0xb6, 0xd8, 0x90, 0x17, 0x11, 0x96, 0x41, 0x88, 0x28, 0xc0, 0x68, 0x19, 0x24, + 0x58, 0x0a, 0x6c, 0x29, 0xc0, 0x05, 0xa0, 0x40, 0x2c, 0x05, 0xc2, 0xa1, 0xda, 0x06, 0xfc, 0x67, + 0x92, 0x9d, 0xd8, 0xf1, 0x18, 0xcf, 0x18, 0x65, 0x3b, 0x8e, 0xc0, 0x05, 0xe8, 0x39, 0x0a, 0x65, + 0xca, 0x2d, 0x89, 0xd3, 0x50, 0xee, 0x11, 0x50, 0xc6, 0x76, 0xc0, 0x5a, 0x2b, 0x6c, 0x43, 0x65, + 0xfb, 0xf3, 0xcf, 0xef, 0xfb, 0xfc, 0xde, 0x1b, 0x70, 0xd2, 0x68, 0xea, 0xf8, 0xb1, 0x08, 0x22, + 0x3f, 0xe7, 0x71, 0xc8, 0x32, 0x47, 0x9d, 0xa6, 0x4c, 0x3a, 0x69, 0x26, 0x94, 0x68, 0xbc, 0xb0, + 0xb5, 0x84, 0x71, 0x43, 0xd3, 0x30, 0xb5, 0xe1, 0xda, 0x2b, 0xa6, 0x8e, 0x84, 0xef, 0xb2, 0x4f, + 0x39, 0x93, 0x0a, 0xdf, 0x03, 0xa8, 0x88, 0x0f, 0x3c, 0x1c, 0xa0, 0x03, 0x74, 0xd8, 0x75, 0xbb, + 0x95, 0xf2, 0x3a, 0xa4, 0x6f, 0xa0, 0x5f, 0xf3, 0x32, 0x15, 0x89, 0x64, 0xf8, 0x31, 0x58, 0x33, + 0xe1, 0x6b, 0xb2, 0x37, 0xba, 0x6d, 0x5f, 0xf4, 0xb0, 0x37, 0xf4, 0x86, 0xc1, 0x7d, 0x30, 0x45, + 0x34, 0x30, 0x0f, 0xd0, 0xe1, 0x55, 0xd7, 0x14, 0x11, 0x5d, 0x00, 0x7e, 0x29, 0xe6, 0x69, 0xcc, + 0x14, 0xbb, 0x74, 0x82, 0xda, 0xcf, 0xbc, 0x84, 0xdf, 0x00, 0x3a, 0x32, 0x0f, 0x02, 0x26, 0xe5, + 0xc0, 0xd2, 0xa6, 0xf5, 0x23, 0xbd, 0x05, 0x37, 0x1b, 0xce, 0xe5, 0xbf, 0xd0, 0x63, 0xe8, 0x8f, + 0x4f, 0x93, 0xe0, 0x7f, 0x84, 0xa1, 0x37, 0xe0, 0xfa, 0xb6, 0x76, 0x65, 0xf7, 0x04, 0x3a, 0x6f, + 0x27, 0x13, 0xc9, 0x94, 0xc4, 0xfb, 0x60, 0xcd, 0x79, 0xa2, 0x0d, 0x2c, 0x77, 0x73, 0xab, 0x15, + 0x6f, 0xa1, 0x4b, 0x6f, 0x14, 0x6f, 0x41, 0x67, 0x60, 0x1d, 0x95, 0x5d, 0xdc, 0x46, 0x31, 0x79, + 0x88, 0xef, 0x42, 0x37, 0xf5, 0x32, 0xc5, 0x15, 0x17, 0x89, 0xc6, 0xaf, 0xb8, 0x7f, 0x04, 0xfc, 0x1c, 0x3a, 0xa2, 0xf4, 0xd0, 0x3d, 0xe8, 0x8d, 0xee, 0xb4, 0xa5, 0xac, 0x62, 0xb8, 0x35, 0x3b, - 0xfa, 0x66, 0xc2, 0xfe, 0xb8, 0xe0, 0xc6, 0x25, 0x77, 0xcc, 0xd2, 0xcf, 0x3c, 0x60, 0xf8, 0x2d, - 0xec, 0x95, 0xf3, 0xc7, 0xf7, 0xdb, 0xea, 0x34, 0x76, 0x69, 0x48, 0x77, 0x21, 0x55, 0x0f, 0x0c, - 0xfc, 0x01, 0x7a, 0x7f, 0xcd, 0x02, 0x3f, 0x6c, 0xfb, 0xe8, 0xe2, 0x9a, 0x0c, 0x1f, 0xfd, 0x93, - 0xdb, 0x3a, 0xbc, 0x83, 0x4e, 0xd5, 0x7a, 0xdc, 0x1a, 0xa9, 0x39, 0xf3, 0xe1, 0x83, 0x9d, 0x4c, - 0x5d, 0x75, 0x3c, 0x5f, 0xad, 0x89, 0x71, 0xb6, 0x26, 0xc6, 0xf9, 0x9a, 0xa0, 0x2f, 0x39, 0x41, - 0xdf, 0x73, 0x82, 0x7e, 0xe4, 0x04, 0xad, 0x72, 0x82, 0x7e, 0xe6, 0x04, 0xfd, 0xca, 0x89, 0x71, - 0x9e, 0x13, 0xf4, 0x75, 0x43, 0x8c, 0xd5, 0x86, 0x18, 0x67, 0x1b, 0x62, 0x9c, 0x3c, 0x9b, 0x71, - 0xf5, 0x31, 0xf3, 0xed, 0x40, 0x2c, 0x9c, 0x59, 0xea, 0x4d, 0xbd, 0xd8, 0x73, 0x22, 0x11, 0xf2, - 0x9d, 0xa7, 0xd9, 0xdf, 0xd3, 0x97, 0xa7, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xbf, 0xda, 0xc9, - 0x32, 0xf4, 0x03, 0x00, 0x00, + 0xfa, 0x66, 0xc2, 0xfe, 0x38, 0x38, 0x61, 0x61, 0x1e, 0xb3, 0x6c, 0xcc, 0xb2, 0xcf, 0x3c, 0x60, + 0xf8, 0x1d, 0xec, 0x95, 0xc3, 0xc7, 0xf7, 0xdb, 0x8a, 0x34, 0x16, 0x69, 0x48, 0x77, 0x21, 0x55, + 0x03, 0x0c, 0xfc, 0x11, 0x7a, 0x7f, 0x0d, 0x02, 0x3f, 0x6c, 0xfb, 0xe8, 0xe2, 0x8e, 0x0c, 0x1f, + 0xfd, 0x93, 0xdb, 0x3a, 0xbc, 0x87, 0x4e, 0xd5, 0x77, 0xdc, 0x1a, 0xa9, 0x39, 0xf0, 0xe1, 0x83, + 0x9d, 0x4c, 0x5d, 0xf5, 0xc5, 0x6c, 0xb9, 0x22, 0xc6, 0xd9, 0x8a, 0x18, 0xe7, 0x2b, 0x82, 0xbe, + 0x14, 0x04, 0x7d, 0x2f, 0x08, 0xfa, 0x51, 0x10, 0xb4, 0x2c, 0x08, 0xfa, 0x59, 0x10, 0xf4, 0xab, + 0x20, 0xc6, 0x79, 0x41, 0xd0, 0xd7, 0x35, 0x31, 0x96, 0x6b, 0x62, 0x9c, 0xad, 0x89, 0x71, 0xfc, + 0x6c, 0xca, 0xd5, 0x49, 0xee, 0xdb, 0x81, 0x98, 0x3b, 0xd3, 0xcc, 0x9b, 0x78, 0x89, 0xe7, 0xc4, + 0x22, 0xe2, 0x3b, 0x8f, 0xb2, 0xbf, 0xa7, 0x2f, 0x4f, 0x7f, 0x07, 0x00, 0x00, 0xff, 0xff, 0x68, + 0x46, 0x93, 0x30, 0xf1, 0x03, 0x00, 0x00, } func (this *GetJobRequest) Equal(that interface{}) bool { @@ -785,10 +785,10 @@ var _ grpc.ClientConn // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion4 -// BlockBuilderServiceClient is the client API for BlockBuilderService service. +// SchedulerServiceClient is the client API for SchedulerService service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type BlockBuilderServiceClient interface { +type SchedulerServiceClient interface { // GetJob requests a new job from the scheduler GetJob(ctx context.Context, in *GetJobRequest, opts ...grpc.CallOption) (*GetJobResponse, error) // CompleteJob notifies the scheduler that a job has been completed @@ -797,43 +797,43 @@ type BlockBuilderServiceClient interface { SyncJob(ctx context.Context, in *SyncJobRequest, opts ...grpc.CallOption) (*SyncJobResponse, error) } -type blockBuilderServiceClient struct { +type schedulerServiceClient struct { cc *grpc.ClientConn } -func NewBlockBuilderServiceClient(cc *grpc.ClientConn) BlockBuilderServiceClient { - return &blockBuilderServiceClient{cc} +func NewSchedulerServiceClient(cc *grpc.ClientConn) SchedulerServiceClient { + return &schedulerServiceClient{cc} } -func (c *blockBuilderServiceClient) GetJob(ctx context.Context, in *GetJobRequest, opts ...grpc.CallOption) (*GetJobResponse, error) { +func (c *schedulerServiceClient) GetJob(ctx context.Context, in *GetJobRequest, opts ...grpc.CallOption) (*GetJobResponse, error) { out := new(GetJobResponse) - err := c.cc.Invoke(ctx, "/blockbuilder.types.BlockBuilderService/GetJob", in, out, opts...) + err := c.cc.Invoke(ctx, "/blockbuilder.types.SchedulerService/GetJob", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *blockBuilderServiceClient) CompleteJob(ctx context.Context, in *CompleteJobRequest, opts ...grpc.CallOption) (*CompleteJobResponse, error) { +func (c *schedulerServiceClient) CompleteJob(ctx context.Context, in *CompleteJobRequest, opts ...grpc.CallOption) (*CompleteJobResponse, error) { out := new(CompleteJobResponse) - err := c.cc.Invoke(ctx, "/blockbuilder.types.BlockBuilderService/CompleteJob", in, out, opts...) + err := c.cc.Invoke(ctx, "/blockbuilder.types.SchedulerService/CompleteJob", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *blockBuilderServiceClient) SyncJob(ctx context.Context, in *SyncJobRequest, opts ...grpc.CallOption) (*SyncJobResponse, error) { +func (c *schedulerServiceClient) SyncJob(ctx context.Context, in *SyncJobRequest, opts ...grpc.CallOption) (*SyncJobResponse, error) { out := new(SyncJobResponse) - err := c.cc.Invoke(ctx, "/blockbuilder.types.BlockBuilderService/SyncJob", in, out, opts...) + err := c.cc.Invoke(ctx, "/blockbuilder.types.SchedulerService/SyncJob", in, out, opts...) if err != nil { return nil, err } return out, nil } -// BlockBuilderServiceServer is the server API for BlockBuilderService service. -type BlockBuilderServiceServer interface { +// SchedulerServiceServer is the server API for SchedulerService service. +type SchedulerServiceServer interface { // GetJob requests a new job from the scheduler GetJob(context.Context, *GetJobRequest) (*GetJobResponse, error) // CompleteJob notifies the scheduler that a job has been completed @@ -842,93 +842,93 @@ type BlockBuilderServiceServer interface { SyncJob(context.Context, *SyncJobRequest) (*SyncJobResponse, error) } -// UnimplementedBlockBuilderServiceServer can be embedded to have forward compatible implementations. -type UnimplementedBlockBuilderServiceServer struct { +// UnimplementedSchedulerServiceServer can be embedded to have forward compatible implementations. +type UnimplementedSchedulerServiceServer struct { } -func (*UnimplementedBlockBuilderServiceServer) GetJob(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) { +func (*UnimplementedSchedulerServiceServer) GetJob(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetJob not implemented") } -func (*UnimplementedBlockBuilderServiceServer) CompleteJob(ctx context.Context, req *CompleteJobRequest) (*CompleteJobResponse, error) { +func (*UnimplementedSchedulerServiceServer) CompleteJob(ctx context.Context, req *CompleteJobRequest) (*CompleteJobResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method CompleteJob not implemented") } -func (*UnimplementedBlockBuilderServiceServer) SyncJob(ctx context.Context, req *SyncJobRequest) (*SyncJobResponse, error) { +func (*UnimplementedSchedulerServiceServer) SyncJob(ctx context.Context, req *SyncJobRequest) (*SyncJobResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SyncJob not implemented") } -func RegisterBlockBuilderServiceServer(s *grpc.Server, srv BlockBuilderServiceServer) { - s.RegisterService(&_BlockBuilderService_serviceDesc, srv) +func RegisterSchedulerServiceServer(s *grpc.Server, srv SchedulerServiceServer) { + s.RegisterService(&_SchedulerService_serviceDesc, srv) } -func _BlockBuilderService_GetJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _SchedulerService_GetJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetJobRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(BlockBuilderServiceServer).GetJob(ctx, in) + return srv.(SchedulerServiceServer).GetJob(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/blockbuilder.types.BlockBuilderService/GetJob", + FullMethod: "/blockbuilder.types.SchedulerService/GetJob", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BlockBuilderServiceServer).GetJob(ctx, req.(*GetJobRequest)) + return srv.(SchedulerServiceServer).GetJob(ctx, req.(*GetJobRequest)) } return interceptor(ctx, in, info, handler) } -func _BlockBuilderService_CompleteJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _SchedulerService_CompleteJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(CompleteJobRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(BlockBuilderServiceServer).CompleteJob(ctx, in) + return srv.(SchedulerServiceServer).CompleteJob(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/blockbuilder.types.BlockBuilderService/CompleteJob", + FullMethod: "/blockbuilder.types.SchedulerService/CompleteJob", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BlockBuilderServiceServer).CompleteJob(ctx, req.(*CompleteJobRequest)) + return srv.(SchedulerServiceServer).CompleteJob(ctx, req.(*CompleteJobRequest)) } return interceptor(ctx, in, info, handler) } -func _BlockBuilderService_SyncJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _SchedulerService_SyncJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(SyncJobRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(BlockBuilderServiceServer).SyncJob(ctx, in) + return srv.(SchedulerServiceServer).SyncJob(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/blockbuilder.types.BlockBuilderService/SyncJob", + FullMethod: "/blockbuilder.types.SchedulerService/SyncJob", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BlockBuilderServiceServer).SyncJob(ctx, req.(*SyncJobRequest)) + return srv.(SchedulerServiceServer).SyncJob(ctx, req.(*SyncJobRequest)) } return interceptor(ctx, in, info, handler) } -var _BlockBuilderService_serviceDesc = grpc.ServiceDesc{ - ServiceName: "blockbuilder.types.BlockBuilderService", - HandlerType: (*BlockBuilderServiceServer)(nil), +var _SchedulerService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "blockbuilder.types.SchedulerService", + HandlerType: (*SchedulerServiceServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "GetJob", - Handler: _BlockBuilderService_GetJob_Handler, + Handler: _SchedulerService_GetJob_Handler, }, { MethodName: "CompleteJob", - Handler: _BlockBuilderService_CompleteJob_Handler, + Handler: _SchedulerService_CompleteJob_Handler, }, { MethodName: "SyncJob", - Handler: _BlockBuilderService_SyncJob_Handler, + Handler: _SchedulerService_SyncJob_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/pkg/blockbuilder/types/proto/blockbuilder.proto b/pkg/blockbuilder/types/proto/blockbuilder.proto index a29b03dc61..607cfc1666 100644 --- a/pkg/blockbuilder/types/proto/blockbuilder.proto +++ b/pkg/blockbuilder/types/proto/blockbuilder.proto @@ -4,8 +4,8 @@ package blockbuilder.types; option go_package = "github.com/grafana/loki/pkg/blockbuilder/types/proto"; -// BlockBuilderService defines the gRPC service for block builder communication -service BlockBuilderService { +// SchedulerService defines the gRPC service for calls originating from the blockbuilder (to the scheduler) +service SchedulerService { // GetJob requests a new job from the scheduler rpc GetJob(GetJobRequest) returns (GetJobResponse) {} // CompleteJob notifies the scheduler that a job has been completed diff --git a/pkg/blockbuilder/types/scheduler_server.go b/pkg/blockbuilder/types/scheduler_server.go new file mode 100644 index 0000000000..c275690385 --- /dev/null +++ b/pkg/blockbuilder/types/scheduler_server.go @@ -0,0 +1,54 @@ +package types + +import ( + "context" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types/proto" +) + +// schedulerServer implements proto.SchedulerServiceServer by delegating to a SchedulerHandler +type schedulerServer struct { + handler SchedulerHandler +} + +// NewSchedulerServer creates a new gRPC server that delegates to the provided handler +func NewSchedulerServer(handler SchedulerHandler) proto.SchedulerServiceServer { + return &schedulerServer{handler: handler} +} + +// GetJob implements proto.SchedulerServiceServer +func (s *schedulerServer) GetJob(ctx context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) { + job, ok, err := s.handler.HandleGetJob(ctx, req.BuilderId) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + if !ok { + return &proto.GetJobResponse{ + Job: nil, + Ok: false, + }, nil + } + return &proto.GetJobResponse{ + Job: jobToProto(job), + Ok: true, + }, nil +} + +// CompleteJob implements proto.SchedulerServiceServer +func (s *schedulerServer) CompleteJob(ctx context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) { + if err := s.handler.HandleCompleteJob(ctx, req.BuilderId, protoToJob(req.Job), req.Success); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &proto.CompleteJobResponse{}, nil +} + +// SyncJob implements proto.SchedulerServiceServer +func (s *schedulerServer) SyncJob(ctx context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) { + if err := s.handler.HandleSyncJob(ctx, req.BuilderId, protoToJob(req.Job)); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &proto.SyncJobResponse{}, nil +} diff --git a/pkg/blockbuilder/types/transport.go b/pkg/blockbuilder/types/transport.go index 5659ffb48a..6f7dc41e39 100644 --- a/pkg/blockbuilder/types/transport.go +++ b/pkg/blockbuilder/types/transport.go @@ -5,8 +5,8 @@ import ( ) var ( - _ Transport = unimplementedTransport{} - _ Transport = &MemoryTransport{} + _ BuilderTransport = unimplementedTransport{} + _ BuilderTransport = &MemoryTransport{} ) // unimplementedTransport provides default implementations that panic @@ -26,11 +26,11 @@ func (t unimplementedTransport) SendSyncJob(_ context.Context, _ *SyncJobRequest // MemoryTransport implements Transport interface for in-memory communication type MemoryTransport struct { - scheduler Scheduler + scheduler SchedulerHandler } // NewMemoryTransport creates a new in-memory transport instance -func NewMemoryTransport(scheduler Scheduler) *MemoryTransport { +func NewMemoryTransport(scheduler SchedulerHandler) *MemoryTransport { return &MemoryTransport{ scheduler: scheduler, } @@ -48,7 +48,7 @@ func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequ } func (t *MemoryTransport) SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error { - return t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job) + return t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job, req.Success) } func (t *MemoryTransport) SendSyncJob(ctx context.Context, req *SyncJobRequest) error { diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 0deb548dea..395a891e62 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/loki/v3/pkg/analytics" blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder" blockscheduler "github.com/grafana/loki/v3/pkg/blockbuilder/scheduler" + blocktypes "github.com/grafana/loki/v3/pkg/blockbuilder/types" blockprotos "github.com/grafana/loki/v3/pkg/blockbuilder/types/proto" "github.com/grafana/loki/v3/pkg/bloombuild/builder" "github.com/grafana/loki/v3/pkg/bloombuild/planner" @@ -1862,8 +1863,19 @@ func (t *Loki) initBlockScheduler() (services.Service, error) { if err != nil { return nil, fmt.Errorf("creating kafka offset manager: %w", err) } - s := blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetManager, logger, prometheus.DefaultRegisterer) - blockprotos.RegisterBlockBuilderServiceServer(t.Server.GRPC, s) + + s := blockscheduler.NewScheduler( + t.Cfg.BlockScheduler, + blockscheduler.NewJobQueue(), + offsetManager, + logger, + prometheus.DefaultRegisterer, + ) + + blockprotos.RegisterSchedulerServiceServer( + t.Server.GRPC, + blocktypes.NewSchedulerServer(s), + ) return s, nil }