refactor(blockbuilder): transport splitting (#15315)

pull/15340/head
Owen Diehl 7 months ago committed by GitHub
parent 71264415f7
commit 1f02adf90f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 135
      pkg/blockbuilder/architecture.md
  2. 67
      pkg/blockbuilder/builder/worker.go
  3. 45
      pkg/blockbuilder/scheduler/scheduler.go
  4. 51
      pkg/blockbuilder/scheduler/scheduler_test.go
  5. 10
      pkg/blockbuilder/types/grpc_transport.go
  6. 39
      pkg/blockbuilder/types/interfaces.go
  7. 134
      pkg/blockbuilder/types/proto/blockbuilder.pb.go
  8. 4
      pkg/blockbuilder/types/proto/blockbuilder.proto
  9. 54
      pkg/blockbuilder/types/scheduler_server.go
  10. 10
      pkg/blockbuilder/types/transport.go
  11. 16
      pkg/loki/modules.go

@ -10,22 +10,27 @@ The Block Builder system is organized into three main packages:
### pkg/blockbuilder/types
- Contains shared type definitions and interfaces
- Defines core data structures like `Job` and `Offsets`
- Defines core data structures like `Job` and `JobStatus`
- Provides interface definitions for:
- `Worker`: Interface for processing jobs and reporting status
- `Scheduler`: Interface for job scheduling and worker management
- `Transport`: Interface for communication between components
- `BuilderTransport`: Interface for builder-to-scheduler communication
- `SchedulerHandler`: Interface for scheduler business logic
The transport layer is split into client and server components:
- Client side uses `BuilderTransport` to abstract gRPC details
- Server side uses `SchedulerHandler` for pure business logic
- A gRPC adapter connects the two sides
### pkg/blockbuilder/scheduler
- Implements the job queue and scheduling logic
- Manages job distribution to block builders
- Tracks job progress and ensures exactly-once processing
- Handles job state management and offset tracking
- Implements `SchedulerHandler` interface for business logic
- Uses gRPC adapter to expose services to builders
### pkg/blockbuilder/builder
- Implements the block builder worker functionality
- Implements the block builder functionality
- Uses `BuilderTransport` to communicate with scheduler
- Processes assigned jobs and builds storage formats
- Manages transport layer communication
- Handles data processing and object storage interactions
## Component Diagram
@ -42,7 +47,8 @@ graph TB
PC[Partition Controller]
subgraph Transport Layer
T[gRPC/Transport Interface]
GA[gRPC Adapter]
SH[Scheduler Handler]
end
end
@ -50,6 +56,10 @@ graph TB
BB1[Block Builder 1]
BB2[Block Builder 2]
BB3[Block Builder N]
subgraph Builder Transport
BT[Builder Transport]
end
end
subgraph Storage
@ -59,10 +69,12 @@ graph TB
KP --> PC
PC --> S
S <--> Q
S <--> T
T <--> BB1
T <--> BB2
T <--> BB3
S --> SH
SH <--> GA
GA <--> BT
BT <--> BB1
BT <--> BB2
BT <--> BB3
BB1 --> OS
BB2 --> OS
BB3 --> OS
@ -73,9 +85,10 @@ graph TB
```mermaid
sequenceDiagram
participant PC as Partition Controller
participant S as Block Scheduler
participant Q as Queue
participant T as Transport
participant S as Scheduler
participant SH as SchedulerHandler
participant GA as gRPC Adapter
participant BT as Builder Transport
participant BB as Block Builder
participant OS as Object Storage
@ -85,57 +98,53 @@ sequenceDiagram
S->>Q: Enqueue Job
end
BB->>T: Request Job
T->>S: Forward Request
S->>Q: Dequeue Job
Q-->>S: Return Job (or empty)
BB->>BT: Request Job
BT->>GA: gRPC GetJob Request
GA->>SH: HandleGetJob
SH->>S: Get Job from Queue
S-->>SH: Return Job (or empty)
alt Has Job
S->>T: Send Job
T->>BB: Forward Job
SH-->>GA: Return Job
GA-->>BT: gRPC Response
BT-->>BB: Return Job
BB->>OS: Process & Write Data
BB->>T: Report Success
T->>S: Forward Status
BB->>BT: Report Success
BT->>GA: gRPC CompleteJob
GA->>SH: HandleCompleteJob
SH->>S: Mark Complete
S->>PC: Commit Offset
else No Job
S->>T: Send No Job Available
T->>BB: Forward Response
SH-->>GA: Return No Job
GA-->>BT: gRPC Response
BT-->>BB: Return No Job
end
```
## Core Components
### Job and Offsets
- `Job`: Represents a unit of work for processing Kafka data
- Contains a partition ID and an offset range
- Immutable data structure that can be safely passed between components
- `Offsets`: Defines a half-open range [min,max) of Kafka offsets to process
- Used to track progress and ensure exactly-once processing
### Block Scheduler
- Central component responsible for:
- Managing the job queue
- Coordinating Block Builder assignments
- Tracking job progress
- Implements a pull-based model where Block Builders request jobs
- Decoupled from specific transport mechanisms through the Transport interface
### Block Builder
- Processes jobs assigned by the Block Scheduler
- Responsible for:
- Building storage formats from Kafka data
- Writing completed blocks to object storage
- Reporting job status back to scheduler
- Implements the Worker interface for job processing
### Transport Layer
- Provides communication between Block Builders and Scheduler
- Abstracts transport mechanism (currently in-memory & gRPC)
- Defines message types for:
- Job requests
- Job completion notifications
- Job synchronization
## Design Principles
## Interface Design
The system uses a layered interface approach:
1. **Builder Side**:
- Simple API for job processing
- `BuilderTransport`: Handles communication details
- Builders work with domain types, unaware of gRPC
2. **Transport Layer**:
- gRPC service definitions in proto files
- Adapter pattern to convert between proto and domain types
- Clear separation between transport and business logic
3. **Scheduler Side**:
- `SchedulerHandler`: Pure business logic interface
- No knowledge of transport details
- Clean separation of concerns
This design allows for:
- Easy testing of each layer independently
- Flexibility to change transport mechanism
- Clear separation between business logic and communication
- Type-safe conversions between proto and domain types
### Decoupled I/O
- Business logic is separated from I/O operations
@ -150,10 +159,4 @@ sequenceDiagram
### Pull-Based Architecture
- Block Builders pull jobs when ready
- Natural load balancing
- Prevents overloading of workers
### Interface-Driven Development
- Core components defined by interfaces
- Allows for multiple implementations
- Facilitates testing and modularity
- Prevents overloading of workers

@ -1,67 +0,0 @@
package builder
import (
"context"
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)
var (
_ types.Worker = unimplementedWorker{}
_ types.Worker = &Worker{}
)
// unimplementedWorker provides default implementations for the Worker interface.
type unimplementedWorker struct{}
func (u unimplementedWorker) GetJob(_ context.Context) (*types.Job, bool, error) {
panic("unimplemented")
}
func (u unimplementedWorker) CompleteJob(_ context.Context, _ *types.Job, _ bool) error {
panic("unimplemented")
}
func (u unimplementedWorker) SyncJob(_ context.Context, _ *types.Job) error {
panic("unimplemented")
}
// Worker is the implementation of the Worker interface.
type Worker struct {
unimplementedWorker
transport types.Transport
builderID string
}
// NewWorker creates a new Worker instance.
func NewWorker(builderID string, transport types.Transport) *Worker {
return &Worker{
transport: transport,
builderID: builderID,
}
}
func (w *Worker) GetJob(ctx context.Context) (*types.Job, bool, error) {
resp, err := w.transport.SendGetJobRequest(ctx, &types.GetJobRequest{
BuilderID: w.builderID,
})
if err != nil {
return nil, false, err
}
return resp.Job, resp.OK, nil
}
func (w *Worker) CompleteJob(ctx context.Context, job *types.Job, success bool) error {
return w.transport.SendCompleteJob(ctx, &types.CompleteJobRequest{
BuilderID: w.builderID,
Job: job,
Success: success,
})
}
func (w *Worker) SyncJob(ctx context.Context, job *types.Job) error {
return w.transport.SendSyncJob(ctx, &types.SyncJobRequest{
BuilderID: w.builderID,
Job: job,
})
}

@ -16,12 +16,11 @@ import (
"github.com/twmb/franz-go/pkg/kadm"
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
"github.com/grafana/loki/v3/pkg/kafka/partition"
)
var (
_ types.Scheduler = &BlockScheduler{}
_ types.SchedulerHandler = &BlockScheduler{}
)
type Config struct {
@ -175,7 +174,7 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*t
}
}
func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job) error {
func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, _ bool) error {
// TODO: handle commits
s.queue.MarkComplete(job.ID)
return nil
@ -185,43 +184,3 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job
s.queue.SyncJob(job.ID, builderID, job)
return nil
}
func (s *BlockScheduler) CompleteJob(_ context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) {
s.queue.MarkComplete(req.Job.Id)
return &proto.CompleteJobResponse{}, nil
}
func (s *BlockScheduler) SyncJob(_ context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) {
s.queue.SyncJob(req.Job.Id, req.BuilderId, &types.Job{
ID: req.Job.Id,
Partition: req.Job.Partition,
Offsets: types.Offsets{
Min: req.Job.Offsets.Min,
Max: req.Job.Offsets.Max,
},
})
return &proto.SyncJobResponse{}, nil
}
func (s *BlockScheduler) GetJob(_ context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) {
var resp proto.GetJobResponse
job, ok, err := s.queue.Dequeue(req.BuilderId)
if err != nil {
return &resp, err
}
if ok {
resp.Ok = true
resp.Job = &proto.Job{
Id: job.ID,
Partition: job.Partition,
Offsets: &proto.Offsets{
Min: job.Offsets.Min,
Max: job.Offsets.Max,
},
}
}
return &resp, nil
}

@ -8,7 +8,6 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/v3/pkg/blockbuilder/builder"
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)
@ -16,14 +15,14 @@ type testEnv struct {
queue *JobQueue
scheduler *BlockScheduler
transport *types.MemoryTransport
builder *builder.Worker
builder *Worker
}
func newTestEnv(builderID string) *testEnv {
queue := NewJobQueue()
scheduler := NewScheduler(Config{}, queue, nil, log.NewNopLogger(), prometheus.NewRegistry())
transport := types.NewMemoryTransport(scheduler)
builder := builder.NewWorker(builderID, transport)
builder := NewWorker(builderID, transport)
return &testEnv{
queue: queue,
@ -89,7 +88,7 @@ func TestMultipleBuilders(t *testing.T) {
// Create first environment
env1 := newTestEnv("test-builder-1")
// Create second builder using same scheduler
builder2 := builder.NewWorker("test-builder-2", env1.transport)
builder2 := NewWorker("test-builder-2", env1.transport)
ctx := context.Background()
@ -250,3 +249,47 @@ func TestConfig_Validate(t *testing.T) {
})
}
}
// Worker handles communication with the scheduler service.
type Worker struct {
transport types.BuilderTransport
builderID string
}
// NewWorker creates a new Worker instance.
func NewWorker(builderID string, transport types.BuilderTransport) *Worker {
return &Worker{
transport: transport,
builderID: builderID,
}
}
// GetJob requests a new job from the scheduler.
func (w *Worker) GetJob(ctx context.Context) (*types.Job, bool, error) {
resp, err := w.transport.SendGetJobRequest(ctx, &types.GetJobRequest{
BuilderID: w.builderID,
})
if err != nil {
return nil, false, err
}
return resp.Job, resp.OK, nil
}
// CompleteJob marks a job as finished.
func (w *Worker) CompleteJob(ctx context.Context, job *types.Job, success bool) error {
err := w.transport.SendCompleteJob(ctx, &types.CompleteJobRequest{
BuilderID: w.builderID,
Job: job,
Success: success,
})
return err
}
// SyncJob informs the scheduler about an in-progress job.
func (w *Worker) SyncJob(ctx context.Context, job *types.Job) error {
err := w.transport.SendSyncJob(ctx, &types.SyncJobRequest{
BuilderID: w.builderID,
Job: job,
})
return err
}

@ -16,7 +16,7 @@ import (
"github.com/grafana/loki/v3/pkg/util/constants"
)
var _ Transport = &GRPCTransport{}
var _ BuilderTransport = &GRPCTransport{}
type grpcTransportMetrics struct {
requestLatency *prometheus.HistogramVec
@ -38,7 +38,7 @@ func newGRPCTransportMetrics(registerer prometheus.Registerer) *grpcTransportMet
type GRPCTransport struct {
grpc_health_v1.HealthClient
io.Closer
proto.BlockBuilderServiceClient
proto.SchedulerServiceClient
}
// NewGRPCTransportFromAddress creates a new gRPC transport instance from an address and dial options
@ -58,9 +58,9 @@ func NewGRPCTransportFromAddress(
}
return &GRPCTransport{
Closer: conn,
HealthClient: grpc_health_v1.NewHealthClient(conn),
BlockBuilderServiceClient: proto.NewBlockBuilderServiceClient(conn),
Closer: conn,
HealthClient: grpc_health_v1.NewHealthClient(conn),
SchedulerServiceClient: proto.NewSchedulerServiceClient(conn),
}, nil
}

@ -2,35 +2,6 @@ package types
import "context"
// Worker interface defines the methods for processing jobs and reporting status.
type Worker interface {
// GetJob requests a new job from the scheduler
GetJob(ctx context.Context) (*Job, bool, error)
// CompleteJob marks a job as finished
CompleteJob(ctx context.Context, job *Job, success bool) error
// SyncJob informs the scheduler about an in-progress job
SyncJob(ctx context.Context, job *Job) error
}
// Scheduler interface defines the methods for scheduling jobs and managing worker pools.
type Scheduler interface {
// HandleGetJob processes a job request from a block builder
HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error)
// HandleCompleteJob processes a job completion notification
HandleCompleteJob(ctx context.Context, builderID string, job *Job) error
// HandleSyncJob processes a job sync request
HandleSyncJob(ctx context.Context, builderID string, job *Job) error
}
// Transport defines the interface for communication between block builders and scheduler
type Transport interface {
BuilderTransport
SchedulerTransport
}
// SchedulerTransport is for calls originating from the scheduler
type SchedulerTransport interface{}
// BuilderTransport is for calls originating from the builder
type BuilderTransport interface {
// SendGetJobRequest sends a request to get a new job
@ -41,6 +12,16 @@ type BuilderTransport interface {
SendSyncJob(ctx context.Context, req *SyncJobRequest) error
}
// SchedulerHandler defines the business logic for handling builder requests
type SchedulerHandler interface {
// HandleGetJob processes a request for a new job
HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error)
// HandleCompleteJob processes a job completion notification
HandleCompleteJob(ctx context.Context, builderID string, job *Job, success bool) error
// HandleSyncJob processes a job sync request
HandleSyncJob(ctx context.Context, builderID string, job *Job) error
}
// Request/Response message types
type GetJobRequest struct {
BuilderID string

@ -436,36 +436,36 @@ func init() {
}
var fileDescriptor_04968622516f7b79 = []byte{
// 453 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x93, 0xc1, 0x8e, 0x93, 0x40,
0x18, 0xc7, 0x19, 0x88, 0x5b, 0xfb, 0x35, 0x56, 0x9d, 0x8d, 0xb1, 0xa9, 0x3a, 0x59, 0xc7, 0x44,
0xd7, 0x83, 0x90, 0x54, 0x7d, 0x81, 0x7a, 0x30, 0xae, 0x07, 0x23, 0xeb, 0x69, 0x2f, 0x0a, 0x74,
0x5a, 0xa7, 0x50, 0x06, 0x99, 0xc1, 0x74, 0x6f, 0x3e, 0x82, 0x8f, 0xe0, 0xd1, 0x47, 0xf1, 0xd8,
0xe3, 0x1e, 0x2d, 0xbd, 0x78, 0xdc, 0x47, 0x30, 0x0c, 0x50, 0x25, 0x4b, 0xea, 0x5e, 0x3c, 0x01,
0x7f, 0x7e, 0x7c, 0xff, 0x3f, 0xdf, 0xf7, 0x0d, 0x38, 0x49, 0x38, 0x73, 0xfc, 0x48, 0x04, 0xa1,
0x9f, 0xf1, 0x68, 0xc2, 0x52, 0x47, 0x9d, 0x26, 0x4c, 0x3a, 0x49, 0x2a, 0x94, 0x68, 0xbc, 0xb0,
0xb5, 0x84, 0x71, 0x43, 0xd3, 0x30, 0xb5, 0xe1, 0xda, 0x4b, 0xa6, 0x8e, 0x84, 0xef, 0xb2, 0x4f,
0x19, 0x93, 0x0a, 0xdf, 0x03, 0xa8, 0x88, 0xf7, 0x7c, 0x32, 0x40, 0x07, 0xe8, 0xb0, 0xeb, 0x76,
0x2b, 0xe5, 0xd5, 0x84, 0xbe, 0x86, 0x7e, 0xcd, 0xcb, 0x44, 0xc4, 0x92, 0xe1, 0xc7, 0x60, 0xcd,
0x85, 0xaf, 0xc9, 0xde, 0xe8, 0xb6, 0x7d, 0xd1, 0xc3, 0x2e, 0xe8, 0x82, 0xc1, 0x7d, 0x30, 0x45,
0x38, 0x30, 0x0f, 0xd0, 0xe1, 0x55, 0xd7, 0x14, 0x21, 0x5d, 0x02, 0x7e, 0x21, 0x16, 0x49, 0xc4,
0x14, 0xbb, 0x74, 0x82, 0xda, 0xcf, 0xbc, 0x84, 0xdf, 0x00, 0x3a, 0x32, 0x0b, 0x02, 0x26, 0xe5,
0xc0, 0xd2, 0xa6, 0xf5, 0x23, 0xbd, 0x05, 0xfb, 0x0d, 0xe7, 0xf2, 0x5f, 0xe8, 0x09, 0xf4, 0x8f,
0x4f, 0xe3, 0xe0, 0x7f, 0x84, 0xa1, 0x37, 0xe1, 0xfa, 0xb6, 0x76, 0x65, 0xf7, 0x04, 0x3a, 0x6f,
0xa6, 0x53, 0xc9, 0x94, 0xc4, 0x37, 0xc0, 0x5a, 0xf0, 0x58, 0x1b, 0x58, 0x6e, 0x71, 0xab, 0x15,
0x6f, 0xa9, 0x4b, 0x17, 0x8a, 0xb7, 0xa4, 0x73, 0xb0, 0x8e, 0xca, 0x2e, 0x6e, 0xa3, 0x98, 0x7c,
0x82, 0xef, 0x42, 0x37, 0xf1, 0x52, 0xc5, 0x15, 0x17, 0xb1, 0xc6, 0xaf, 0xb8, 0x7f, 0x04, 0xfc,
// 455 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x93, 0x3f, 0x8e, 0xd3, 0x40,
0x14, 0xc6, 0x3d, 0xb6, 0xd8, 0x90, 0x17, 0x11, 0x96, 0x41, 0x88, 0x28, 0xc0, 0x68, 0x19, 0x24,
0x58, 0x0a, 0x6c, 0x29, 0xc0, 0x05, 0xa0, 0x40, 0x2c, 0x05, 0xc2, 0xa1, 0xda, 0x06, 0xfc, 0x67,
0x92, 0x9d, 0xd8, 0xf1, 0x18, 0xcf, 0x18, 0x65, 0x3b, 0x8e, 0xc0, 0x05, 0xe8, 0x39, 0x0a, 0x65,
0xca, 0x2d, 0x89, 0xd3, 0x50, 0xee, 0x11, 0x50, 0xc6, 0x76, 0xc0, 0x5a, 0x2b, 0x6c, 0x43, 0x65,
0xfb, 0xf3, 0xcf, 0xef, 0xfb, 0xfc, 0xde, 0x1b, 0x70, 0xd2, 0x68, 0xea, 0xf8, 0xb1, 0x08, 0x22,
0x3f, 0xe7, 0x71, 0xc8, 0x32, 0x47, 0x9d, 0xa6, 0x4c, 0x3a, 0x69, 0x26, 0x94, 0x68, 0xbc, 0xb0,
0xb5, 0x84, 0x71, 0x43, 0xd3, 0x30, 0xb5, 0xe1, 0xda, 0x2b, 0xa6, 0x8e, 0x84, 0xef, 0xb2, 0x4f,
0x39, 0x93, 0x0a, 0xdf, 0x03, 0xa8, 0x88, 0x0f, 0x3c, 0x1c, 0xa0, 0x03, 0x74, 0xd8, 0x75, 0xbb,
0x95, 0xf2, 0x3a, 0xa4, 0x6f, 0xa0, 0x5f, 0xf3, 0x32, 0x15, 0x89, 0x64, 0xf8, 0x31, 0x58, 0x33,
0xe1, 0x6b, 0xb2, 0x37, 0xba, 0x6d, 0x5f, 0xf4, 0xb0, 0x37, 0xf4, 0x86, 0xc1, 0x7d, 0x30, 0x45,
0x34, 0x30, 0x0f, 0xd0, 0xe1, 0x55, 0xd7, 0x14, 0x11, 0x5d, 0x00, 0x7e, 0x29, 0xe6, 0x69, 0xcc,
0x14, 0xbb, 0x74, 0x82, 0xda, 0xcf, 0xbc, 0x84, 0xdf, 0x00, 0x3a, 0x32, 0x0f, 0x02, 0x26, 0xe5,
0xc0, 0xd2, 0xa6, 0xf5, 0x23, 0xbd, 0x05, 0x37, 0x1b, 0xce, 0xe5, 0xbf, 0xd0, 0x63, 0xe8, 0x8f,
0x4f, 0x93, 0xe0, 0x7f, 0x84, 0xa1, 0x37, 0xe0, 0xfa, 0xb6, 0x76, 0x65, 0xf7, 0x04, 0x3a, 0x6f,
0x27, 0x13, 0xc9, 0x94, 0xc4, 0xfb, 0x60, 0xcd, 0x79, 0xa2, 0x0d, 0x2c, 0x77, 0x73, 0xab, 0x15,
0x6f, 0xa1, 0x4b, 0x6f, 0x14, 0x6f, 0x41, 0x67, 0x60, 0x1d, 0x95, 0x5d, 0xdc, 0x46, 0x31, 0x79,
0x88, 0xef, 0x42, 0x37, 0xf5, 0x32, 0xc5, 0x15, 0x17, 0x89, 0xc6, 0xaf, 0xb8, 0x7f, 0x04, 0xfc,
0x1c, 0x3a, 0xa2, 0xf4, 0xd0, 0x3d, 0xe8, 0x8d, 0xee, 0xb4, 0xa5, 0xac, 0x62, 0xb8, 0x35, 0x3b,
0xfa, 0x66, 0xc2, 0xfe, 0xb8, 0xe0, 0xc6, 0x25, 0x77, 0xcc, 0xd2, 0xcf, 0x3c, 0x60, 0xf8, 0x2d,
0xec, 0x95, 0xf3, 0xc7, 0xf7, 0xdb, 0xea, 0x34, 0x76, 0x69, 0x48, 0x77, 0x21, 0x55, 0x0f, 0x0c,
0xfc, 0x01, 0x7a, 0x7f, 0xcd, 0x02, 0x3f, 0x6c, 0xfb, 0xe8, 0xe2, 0x9a, 0x0c, 0x1f, 0xfd, 0x93,
0xdb, 0x3a, 0xbc, 0x83, 0x4e, 0xd5, 0x7a, 0xdc, 0x1a, 0xa9, 0x39, 0xf3, 0xe1, 0x83, 0x9d, 0x4c,
0x5d, 0x75, 0x3c, 0x5f, 0xad, 0x89, 0x71, 0xb6, 0x26, 0xc6, 0xf9, 0x9a, 0xa0, 0x2f, 0x39, 0x41,
0xdf, 0x73, 0x82, 0x7e, 0xe4, 0x04, 0xad, 0x72, 0x82, 0x7e, 0xe6, 0x04, 0xfd, 0xca, 0x89, 0x71,
0x9e, 0x13, 0xf4, 0x75, 0x43, 0x8c, 0xd5, 0x86, 0x18, 0x67, 0x1b, 0x62, 0x9c, 0x3c, 0x9b, 0x71,
0xf5, 0x31, 0xf3, 0xed, 0x40, 0x2c, 0x9c, 0x59, 0xea, 0x4d, 0xbd, 0xd8, 0x73, 0x22, 0x11, 0xf2,
0x9d, 0xa7, 0xd9, 0xdf, 0xd3, 0x97, 0xa7, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xbf, 0xda, 0xc9,
0x32, 0xf4, 0x03, 0x00, 0x00,
0xfa, 0x66, 0xc2, 0xfe, 0x38, 0x38, 0x61, 0x61, 0x1e, 0xb3, 0x6c, 0xcc, 0xb2, 0xcf, 0x3c, 0x60,
0xf8, 0x1d, 0xec, 0x95, 0xc3, 0xc7, 0xf7, 0xdb, 0x8a, 0x34, 0x16, 0x69, 0x48, 0x77, 0x21, 0x55,
0x03, 0x0c, 0xfc, 0x11, 0x7a, 0x7f, 0x0d, 0x02, 0x3f, 0x6c, 0xfb, 0xe8, 0xe2, 0x8e, 0x0c, 0x1f,
0xfd, 0x93, 0xdb, 0x3a, 0xbc, 0x87, 0x4e, 0xd5, 0x77, 0xdc, 0x1a, 0xa9, 0x39, 0xf0, 0xe1, 0x83,
0x9d, 0x4c, 0x5d, 0xf5, 0xc5, 0x6c, 0xb9, 0x22, 0xc6, 0xd9, 0x8a, 0x18, 0xe7, 0x2b, 0x82, 0xbe,
0x14, 0x04, 0x7d, 0x2f, 0x08, 0xfa, 0x51, 0x10, 0xb4, 0x2c, 0x08, 0xfa, 0x59, 0x10, 0xf4, 0xab,
0x20, 0xc6, 0x79, 0x41, 0xd0, 0xd7, 0x35, 0x31, 0x96, 0x6b, 0x62, 0x9c, 0xad, 0x89, 0x71, 0xfc,
0x6c, 0xca, 0xd5, 0x49, 0xee, 0xdb, 0x81, 0x98, 0x3b, 0xd3, 0xcc, 0x9b, 0x78, 0x89, 0xe7, 0xc4,
0x22, 0xe2, 0x3b, 0x8f, 0xb2, 0xbf, 0xa7, 0x2f, 0x4f, 0x7f, 0x07, 0x00, 0x00, 0xff, 0xff, 0x68,
0x46, 0x93, 0x30, 0xf1, 0x03, 0x00, 0x00,
}
func (this *GetJobRequest) Equal(that interface{}) bool {
@ -785,10 +785,10 @@ var _ grpc.ClientConn
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// BlockBuilderServiceClient is the client API for BlockBuilderService service.
// SchedulerServiceClient is the client API for SchedulerService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type BlockBuilderServiceClient interface {
type SchedulerServiceClient interface {
// GetJob requests a new job from the scheduler
GetJob(ctx context.Context, in *GetJobRequest, opts ...grpc.CallOption) (*GetJobResponse, error)
// CompleteJob notifies the scheduler that a job has been completed
@ -797,43 +797,43 @@ type BlockBuilderServiceClient interface {
SyncJob(ctx context.Context, in *SyncJobRequest, opts ...grpc.CallOption) (*SyncJobResponse, error)
}
type blockBuilderServiceClient struct {
type schedulerServiceClient struct {
cc *grpc.ClientConn
}
func NewBlockBuilderServiceClient(cc *grpc.ClientConn) BlockBuilderServiceClient {
return &blockBuilderServiceClient{cc}
func NewSchedulerServiceClient(cc *grpc.ClientConn) SchedulerServiceClient {
return &schedulerServiceClient{cc}
}
func (c *blockBuilderServiceClient) GetJob(ctx context.Context, in *GetJobRequest, opts ...grpc.CallOption) (*GetJobResponse, error) {
func (c *schedulerServiceClient) GetJob(ctx context.Context, in *GetJobRequest, opts ...grpc.CallOption) (*GetJobResponse, error) {
out := new(GetJobResponse)
err := c.cc.Invoke(ctx, "/blockbuilder.types.BlockBuilderService/GetJob", in, out, opts...)
err := c.cc.Invoke(ctx, "/blockbuilder.types.SchedulerService/GetJob", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *blockBuilderServiceClient) CompleteJob(ctx context.Context, in *CompleteJobRequest, opts ...grpc.CallOption) (*CompleteJobResponse, error) {
func (c *schedulerServiceClient) CompleteJob(ctx context.Context, in *CompleteJobRequest, opts ...grpc.CallOption) (*CompleteJobResponse, error) {
out := new(CompleteJobResponse)
err := c.cc.Invoke(ctx, "/blockbuilder.types.BlockBuilderService/CompleteJob", in, out, opts...)
err := c.cc.Invoke(ctx, "/blockbuilder.types.SchedulerService/CompleteJob", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *blockBuilderServiceClient) SyncJob(ctx context.Context, in *SyncJobRequest, opts ...grpc.CallOption) (*SyncJobResponse, error) {
func (c *schedulerServiceClient) SyncJob(ctx context.Context, in *SyncJobRequest, opts ...grpc.CallOption) (*SyncJobResponse, error) {
out := new(SyncJobResponse)
err := c.cc.Invoke(ctx, "/blockbuilder.types.BlockBuilderService/SyncJob", in, out, opts...)
err := c.cc.Invoke(ctx, "/blockbuilder.types.SchedulerService/SyncJob", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// BlockBuilderServiceServer is the server API for BlockBuilderService service.
type BlockBuilderServiceServer interface {
// SchedulerServiceServer is the server API for SchedulerService service.
type SchedulerServiceServer interface {
// GetJob requests a new job from the scheduler
GetJob(context.Context, *GetJobRequest) (*GetJobResponse, error)
// CompleteJob notifies the scheduler that a job has been completed
@ -842,93 +842,93 @@ type BlockBuilderServiceServer interface {
SyncJob(context.Context, *SyncJobRequest) (*SyncJobResponse, error)
}
// UnimplementedBlockBuilderServiceServer can be embedded to have forward compatible implementations.
type UnimplementedBlockBuilderServiceServer struct {
// UnimplementedSchedulerServiceServer can be embedded to have forward compatible implementations.
type UnimplementedSchedulerServiceServer struct {
}
func (*UnimplementedBlockBuilderServiceServer) GetJob(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) {
func (*UnimplementedSchedulerServiceServer) GetJob(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetJob not implemented")
}
func (*UnimplementedBlockBuilderServiceServer) CompleteJob(ctx context.Context, req *CompleteJobRequest) (*CompleteJobResponse, error) {
func (*UnimplementedSchedulerServiceServer) CompleteJob(ctx context.Context, req *CompleteJobRequest) (*CompleteJobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method CompleteJob not implemented")
}
func (*UnimplementedBlockBuilderServiceServer) SyncJob(ctx context.Context, req *SyncJobRequest) (*SyncJobResponse, error) {
func (*UnimplementedSchedulerServiceServer) SyncJob(ctx context.Context, req *SyncJobRequest) (*SyncJobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SyncJob not implemented")
}
func RegisterBlockBuilderServiceServer(s *grpc.Server, srv BlockBuilderServiceServer) {
s.RegisterService(&_BlockBuilderService_serviceDesc, srv)
func RegisterSchedulerServiceServer(s *grpc.Server, srv SchedulerServiceServer) {
s.RegisterService(&_SchedulerService_serviceDesc, srv)
}
func _BlockBuilderService_GetJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _SchedulerService_GetJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetJobRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BlockBuilderServiceServer).GetJob(ctx, in)
return srv.(SchedulerServiceServer).GetJob(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/blockbuilder.types.BlockBuilderService/GetJob",
FullMethod: "/blockbuilder.types.SchedulerService/GetJob",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BlockBuilderServiceServer).GetJob(ctx, req.(*GetJobRequest))
return srv.(SchedulerServiceServer).GetJob(ctx, req.(*GetJobRequest))
}
return interceptor(ctx, in, info, handler)
}
func _BlockBuilderService_CompleteJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _SchedulerService_CompleteJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CompleteJobRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BlockBuilderServiceServer).CompleteJob(ctx, in)
return srv.(SchedulerServiceServer).CompleteJob(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/blockbuilder.types.BlockBuilderService/CompleteJob",
FullMethod: "/blockbuilder.types.SchedulerService/CompleteJob",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BlockBuilderServiceServer).CompleteJob(ctx, req.(*CompleteJobRequest))
return srv.(SchedulerServiceServer).CompleteJob(ctx, req.(*CompleteJobRequest))
}
return interceptor(ctx, in, info, handler)
}
func _BlockBuilderService_SyncJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
func _SchedulerService_SyncJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SyncJobRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BlockBuilderServiceServer).SyncJob(ctx, in)
return srv.(SchedulerServiceServer).SyncJob(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/blockbuilder.types.BlockBuilderService/SyncJob",
FullMethod: "/blockbuilder.types.SchedulerService/SyncJob",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BlockBuilderServiceServer).SyncJob(ctx, req.(*SyncJobRequest))
return srv.(SchedulerServiceServer).SyncJob(ctx, req.(*SyncJobRequest))
}
return interceptor(ctx, in, info, handler)
}
var _BlockBuilderService_serviceDesc = grpc.ServiceDesc{
ServiceName: "blockbuilder.types.BlockBuilderService",
HandlerType: (*BlockBuilderServiceServer)(nil),
var _SchedulerService_serviceDesc = grpc.ServiceDesc{
ServiceName: "blockbuilder.types.SchedulerService",
HandlerType: (*SchedulerServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "GetJob",
Handler: _BlockBuilderService_GetJob_Handler,
Handler: _SchedulerService_GetJob_Handler,
},
{
MethodName: "CompleteJob",
Handler: _BlockBuilderService_CompleteJob_Handler,
Handler: _SchedulerService_CompleteJob_Handler,
},
{
MethodName: "SyncJob",
Handler: _BlockBuilderService_SyncJob_Handler,
Handler: _SchedulerService_SyncJob_Handler,
},
},
Streams: []grpc.StreamDesc{},

@ -4,8 +4,8 @@ package blockbuilder.types;
option go_package = "github.com/grafana/loki/pkg/blockbuilder/types/proto";
// BlockBuilderService defines the gRPC service for block builder communication
service BlockBuilderService {
// SchedulerService defines the gRPC service for calls originating from the blockbuilder (to the scheduler)
service SchedulerService {
// GetJob requests a new job from the scheduler
rpc GetJob(GetJobRequest) returns (GetJobResponse) {}
// CompleteJob notifies the scheduler that a job has been completed

@ -0,0 +1,54 @@
package types
import (
"context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
)
// schedulerServer implements proto.SchedulerServiceServer by delegating to a SchedulerHandler
type schedulerServer struct {
handler SchedulerHandler
}
// NewSchedulerServer creates a new gRPC server that delegates to the provided handler
func NewSchedulerServer(handler SchedulerHandler) proto.SchedulerServiceServer {
return &schedulerServer{handler: handler}
}
// GetJob implements proto.SchedulerServiceServer
func (s *schedulerServer) GetJob(ctx context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) {
job, ok, err := s.handler.HandleGetJob(ctx, req.BuilderId)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !ok {
return &proto.GetJobResponse{
Job: nil,
Ok: false,
}, nil
}
return &proto.GetJobResponse{
Job: jobToProto(job),
Ok: true,
}, nil
}
// CompleteJob implements proto.SchedulerServiceServer
func (s *schedulerServer) CompleteJob(ctx context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) {
if err := s.handler.HandleCompleteJob(ctx, req.BuilderId, protoToJob(req.Job), req.Success); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &proto.CompleteJobResponse{}, nil
}
// SyncJob implements proto.SchedulerServiceServer
func (s *schedulerServer) SyncJob(ctx context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) {
if err := s.handler.HandleSyncJob(ctx, req.BuilderId, protoToJob(req.Job)); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &proto.SyncJobResponse{}, nil
}

@ -5,8 +5,8 @@ import (
)
var (
_ Transport = unimplementedTransport{}
_ Transport = &MemoryTransport{}
_ BuilderTransport = unimplementedTransport{}
_ BuilderTransport = &MemoryTransport{}
)
// unimplementedTransport provides default implementations that panic
@ -26,11 +26,11 @@ func (t unimplementedTransport) SendSyncJob(_ context.Context, _ *SyncJobRequest
// MemoryTransport implements Transport interface for in-memory communication
type MemoryTransport struct {
scheduler Scheduler
scheduler SchedulerHandler
}
// NewMemoryTransport creates a new in-memory transport instance
func NewMemoryTransport(scheduler Scheduler) *MemoryTransport {
func NewMemoryTransport(scheduler SchedulerHandler) *MemoryTransport {
return &MemoryTransport{
scheduler: scheduler,
}
@ -48,7 +48,7 @@ func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequ
}
func (t *MemoryTransport) SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error {
return t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job)
return t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job, req.Success)
}
func (t *MemoryTransport) SendSyncJob(ctx context.Context, req *SyncJobRequest) error {

@ -38,6 +38,7 @@ import (
"github.com/grafana/loki/v3/pkg/analytics"
blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder"
blockscheduler "github.com/grafana/loki/v3/pkg/blockbuilder/scheduler"
blocktypes "github.com/grafana/loki/v3/pkg/blockbuilder/types"
blockprotos "github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
"github.com/grafana/loki/v3/pkg/bloombuild/builder"
"github.com/grafana/loki/v3/pkg/bloombuild/planner"
@ -1862,8 +1863,19 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
if err != nil {
return nil, fmt.Errorf("creating kafka offset manager: %w", err)
}
s := blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetManager, logger, prometheus.DefaultRegisterer)
blockprotos.RegisterBlockBuilderServiceServer(t.Server.GRPC, s)
s := blockscheduler.NewScheduler(
t.Cfg.BlockScheduler,
blockscheduler.NewJobQueue(),
offsetManager,
logger,
prometheus.DefaultRegisterer,
)
blockprotos.RegisterSchedulerServiceServer(
t.Server.GRPC,
blocktypes.NewSchedulerServer(s),
)
return s, nil
}

Loading…
Cancel
Save