mirror of https://github.com/grafana/loki
feat: Block scheduler scaffolding (#15198)
parent
be4f17eefe
commit
a10140df3d
@ -0,0 +1,159 @@ |
||||
# Block Builder Architecture |
||||
|
||||
## Overview |
||||
|
||||
The Block Builder and Block Scheduler are separate components designed to build storage formats from ingested Kafka data. The Block Scheduler coordinates job distribution to multiple Block Builder instances, implementing a pull-based architecture that decouples read and write operations, allowing for independent scaling and simpler operational management. This document describes the architecture and interaction between components. |
||||
|
||||
## Package Structure |
||||
|
||||
The Block Builder system is organized into three main packages: |
||||
|
||||
### pkg/blockbuilder/types |
||||
- Contains shared type definitions and interfaces |
||||
- Defines core data structures like `Job` and `Offsets` |
||||
- Provides interface definitions for: |
||||
- `Worker`: Interface for processing jobs and reporting status |
||||
- `Scheduler`: Interface for job scheduling and worker management |
||||
- `Transport`: Interface for communication between components |
||||
|
||||
### pkg/blockbuilder/scheduler |
||||
- Implements the job queue and scheduling logic |
||||
- Manages job distribution to block builders |
||||
- Tracks job progress and ensures exactly-once processing |
||||
- Handles job state management and offset tracking |
||||
|
||||
### pkg/blockbuilder/builder |
||||
- Implements the block builder worker functionality |
||||
- Processes assigned jobs and builds storage formats |
||||
- Manages transport layer communication |
||||
- Handles data processing and object storage interactions |
||||
|
||||
## Component Diagram |
||||
|
||||
```mermaid |
||||
graph TB |
||||
subgraph Kafka |
||||
KP[Kafka Partitions] |
||||
end |
||||
|
||||
subgraph Block Scheduler |
||||
S[Scheduler] |
||||
Q[Job Queue] |
||||
PC[Partition Controller] |
||||
|
||||
subgraph Transport Layer |
||||
T[gRPC/Transport Interface] |
||||
end |
||||
end |
||||
|
||||
subgraph Block Builders |
||||
BB1[Block Builder 1] |
||||
BB2[Block Builder 2] |
||||
BB3[Block Builder N] |
||||
end |
||||
|
||||
subgraph Storage |
||||
OS[Object Storage] |
||||
end |
||||
|
||||
KP --> PC |
||||
PC --> S |
||||
S <--> Q |
||||
S <--> T |
||||
T <--> BB1 |
||||
T <--> BB2 |
||||
T <--> BB3 |
||||
BB1 --> OS |
||||
BB2 --> OS |
||||
BB3 --> OS |
||||
``` |
||||
|
||||
## Job Processing Sequence |
||||
|
||||
```mermaid |
||||
sequenceDiagram |
||||
participant PC as Partition Controller |
||||
participant S as Block Scheduler |
||||
participant Q as Queue |
||||
participant T as Transport |
||||
participant BB as Block Builder |
||||
participant OS as Object Storage |
||||
|
||||
loop Monitor Partitions |
||||
PC->>PC: Check for new offsets |
||||
PC->>S: Create Job (partition, offset range) |
||||
S->>Q: Enqueue Job |
||||
end |
||||
|
||||
BB->>T: Request Job |
||||
T->>S: Forward Request |
||||
S->>Q: Dequeue Job |
||||
Q-->>S: Return Job (or empty) |
||||
alt Has Job |
||||
S->>T: Send Job |
||||
T->>BB: Forward Job |
||||
BB->>OS: Process & Write Data |
||||
BB->>T: Report Success |
||||
T->>S: Forward Status |
||||
S->>PC: Commit Offset |
||||
else No Job |
||||
S->>T: Send No Job Available |
||||
T->>BB: Forward Response |
||||
end |
||||
``` |
||||
|
||||
## Core Components |
||||
|
||||
### Job and Offsets |
||||
- `Job`: Represents a unit of work for processing Kafka data |
||||
- Contains a partition ID and an offset range |
||||
- Immutable data structure that can be safely passed between components |
||||
- `Offsets`: Defines a half-open range [min,max) of Kafka offsets to process |
||||
- Used to track progress and ensure exactly-once processing |
||||
|
||||
### Block Scheduler |
||||
- Central component responsible for: |
||||
- Managing the job queue |
||||
- Coordinating Block Builder assignments |
||||
- Tracking job progress |
||||
- Implements a pull-based model where Block Builders request jobs |
||||
- Decoupled from specific transport mechanisms through the Transport interface |
||||
|
||||
### Block Builder |
||||
- Processes jobs assigned by the Block Scheduler |
||||
- Responsible for: |
||||
- Building storage formats from Kafka data |
||||
- Writing completed blocks to object storage |
||||
- Reporting job status back to scheduler |
||||
- Implements the Worker interface for job processing |
||||
|
||||
### Transport Layer |
||||
- Provides communication between Block Builders and Scheduler |
||||
- Abstracts transport mechanism (currently in-memory & gRPC) |
||||
- Defines message types for: |
||||
- Job requests |
||||
- Job completion notifications |
||||
- Job synchronization |
||||
|
||||
## Design Principles |
||||
|
||||
### Decoupled I/O |
||||
- Business logic is separated from I/O operations |
||||
- Transport interface allows for different communication mechanisms |
||||
- Enables easier testing through mock implementations |
||||
|
||||
### Stateless Design |
||||
- Block Builders are stateless workers |
||||
- All state is managed by the Scheduler |
||||
- Allows for easy scaling and failover |
||||
|
||||
### Pull-Based Architecture |
||||
- Block Builders pull jobs when ready |
||||
- Natural load balancing |
||||
- Prevents overloading of workers |
||||
|
||||
|
||||
### Interface-Driven Development |
||||
- Core components defined by interfaces |
||||
- Allows for multiple implementations |
||||
- Facilitates testing and modularity |
||||
@ -0,0 +1,16 @@ |
||||
package builder |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/v3/pkg/blockbuilder/types" |
||||
) |
||||
|
||||
// TestBuilder implements Worker interface for testing
|
||||
type TestBuilder struct { |
||||
*Worker |
||||
} |
||||
|
||||
func NewTestBuilder(builderID string, transport types.Transport) *TestBuilder { |
||||
return &TestBuilder{ |
||||
Worker: NewWorker(builderID, transport), |
||||
} |
||||
} |
||||
@ -1,4 +1,4 @@ |
||||
package blockbuilder |
||||
package builder |
||||
|
||||
import ( |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
@ -1,4 +1,4 @@ |
||||
package blockbuilder |
||||
package builder |
||||
|
||||
import ( |
||||
"context" |
||||
@ -1,4 +1,4 @@ |
||||
package blockbuilder |
||||
package builder |
||||
|
||||
import ( |
||||
"context" |
||||
@ -1,4 +1,4 @@ |
||||
package blockbuilder |
||||
package builder |
||||
|
||||
import ( |
||||
"bytes" |
||||
@ -1,4 +1,4 @@ |
||||
package blockbuilder |
||||
package builder |
||||
|
||||
import ( |
||||
"context" |
||||
@ -1,4 +1,4 @@ |
||||
package blockbuilder |
||||
package builder |
||||
|
||||
import ( |
||||
"os" |
||||
@ -0,0 +1,58 @@ |
||||
package builder |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/blockbuilder/types" |
||||
) |
||||
|
||||
var ( |
||||
_ types.Transport = unimplementedTransport{} |
||||
_ types.Transport = &MemoryTransport{} |
||||
) |
||||
|
||||
// unimplementedTransport provides default implementations that panic
|
||||
type unimplementedTransport struct{} |
||||
|
||||
func (t unimplementedTransport) SendGetJobRequest(_ context.Context, _ *types.GetJobRequest) (*types.GetJobResponse, error) { |
||||
panic("unimplemented") |
||||
} |
||||
|
||||
func (t unimplementedTransport) SendCompleteJob(_ context.Context, _ *types.CompleteJobRequest) error { |
||||
panic("unimplemented") |
||||
} |
||||
|
||||
func (t unimplementedTransport) SendSyncJob(_ context.Context, _ *types.SyncJobRequest) error { |
||||
panic("unimplemented") |
||||
} |
||||
|
||||
// MemoryTransport implements Transport interface for in-memory communication
|
||||
type MemoryTransport struct { |
||||
scheduler types.Scheduler |
||||
} |
||||
|
||||
// NewMemoryTransport creates a new in-memory transport instance
|
||||
func NewMemoryTransport(scheduler types.Scheduler) *MemoryTransport { |
||||
return &MemoryTransport{ |
||||
scheduler: scheduler, |
||||
} |
||||
} |
||||
|
||||
func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *types.GetJobRequest) (*types.GetJobResponse, error) { |
||||
job, ok, err := t.scheduler.HandleGetJob(ctx, req.BuilderID) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return &types.GetJobResponse{ |
||||
Job: job, |
||||
OK: ok, |
||||
}, nil |
||||
} |
||||
|
||||
func (t *MemoryTransport) SendCompleteJob(ctx context.Context, req *types.CompleteJobRequest) error { |
||||
return t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job) |
||||
} |
||||
|
||||
func (t *MemoryTransport) SendSyncJob(ctx context.Context, req *types.SyncJobRequest) error { |
||||
return t.scheduler.HandleSyncJob(ctx, req.BuilderID, req.Job) |
||||
} |
||||
@ -1,4 +1,4 @@ |
||||
package blockbuilder |
||||
package builder |
||||
|
||||
import ( |
||||
"bytes" |
||||
@ -0,0 +1,66 @@ |
||||
package builder |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/blockbuilder/types" |
||||
) |
||||
|
||||
var ( |
||||
_ types.Worker = unimplementedWorker{} |
||||
_ types.Worker = &Worker{} |
||||
) |
||||
|
||||
// unimplementedWorker provides default implementations for the Worker interface.
|
||||
type unimplementedWorker struct{} |
||||
|
||||
func (u unimplementedWorker) GetJob(_ context.Context) (*types.Job, bool, error) { |
||||
panic("unimplemented") |
||||
} |
||||
|
||||
func (u unimplementedWorker) CompleteJob(_ context.Context, _ *types.Job) error { |
||||
panic("unimplemented") |
||||
} |
||||
|
||||
func (u unimplementedWorker) SyncJob(_ context.Context, _ *types.Job) error { |
||||
panic("unimplemented") |
||||
} |
||||
|
||||
// Worker is the implementation of the Worker interface.
|
||||
type Worker struct { |
||||
unimplementedWorker |
||||
transport types.Transport |
||||
builderID string |
||||
} |
||||
|
||||
// NewWorker creates a new Worker instance.
|
||||
func NewWorker(builderID string, transport types.Transport) *Worker { |
||||
return &Worker{ |
||||
transport: transport, |
||||
builderID: builderID, |
||||
} |
||||
} |
||||
|
||||
func (w *Worker) GetJob(ctx context.Context) (*types.Job, bool, error) { |
||||
resp, err := w.transport.SendGetJobRequest(ctx, &types.GetJobRequest{ |
||||
BuilderID: w.builderID, |
||||
}) |
||||
if err != nil { |
||||
return nil, false, err |
||||
} |
||||
return resp.Job, resp.OK, nil |
||||
} |
||||
|
||||
func (w *Worker) CompleteJob(ctx context.Context, job *types.Job) error { |
||||
return w.transport.SendCompleteJob(ctx, &types.CompleteJobRequest{ |
||||
BuilderID: w.builderID, |
||||
Job: job, |
||||
}) |
||||
} |
||||
|
||||
func (w *Worker) SyncJob(ctx context.Context, job *types.Job) error { |
||||
return w.transport.SendSyncJob(ctx, &types.SyncJobRequest{ |
||||
BuilderID: w.builderID, |
||||
Job: job, |
||||
}) |
||||
} |
||||
@ -0,0 +1,106 @@ |
||||
package scheduler |
||||
|
||||
import ( |
||||
"fmt" |
||||
"sync" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/blockbuilder/types" |
||||
) |
||||
|
||||
// jobAssignment tracks a job and its assigned builder
|
||||
type jobAssignment struct { |
||||
job *types.Job |
||||
builderID string |
||||
} |
||||
|
||||
// JobQueue manages the queue of pending jobs and tracks their state.
|
||||
type JobQueue struct { |
||||
pending map[string]*types.Job // Jobs waiting to be processed, key is job ID
|
||||
inProgress map[string]*jobAssignment // job ID -> assignment info
|
||||
completed map[string]*types.Job // Completed jobs, key is job ID
|
||||
mu sync.RWMutex |
||||
} |
||||
|
||||
// NewJobQueue creates a new job queue instance
|
||||
func NewJobQueue() *JobQueue { |
||||
return &JobQueue{ |
||||
pending: make(map[string]*types.Job), |
||||
inProgress: make(map[string]*jobAssignment), |
||||
completed: make(map[string]*types.Job), |
||||
} |
||||
} |
||||
|
||||
// Enqueue adds a new job to the pending queue
|
||||
// This is a naive implementation, intended to be refactored
|
||||
func (q *JobQueue) Enqueue(job *types.Job) error { |
||||
q.mu.Lock() |
||||
defer q.mu.Unlock() |
||||
|
||||
if _, exists := q.pending[job.ID]; exists { |
||||
return fmt.Errorf("job %s already exists in pending queue", job.ID) |
||||
} |
||||
if _, exists := q.inProgress[job.ID]; exists { |
||||
return fmt.Errorf("job %s already exists in progress", job.ID) |
||||
} |
||||
if _, exists := q.completed[job.ID]; exists { |
||||
return fmt.Errorf("job %s already completed", job.ID) |
||||
} |
||||
|
||||
q.pending[job.ID] = job |
||||
return nil |
||||
} |
||||
|
||||
// Dequeue gets the next available job and assigns it to a builder
|
||||
func (q *JobQueue) Dequeue(builderID string) (*types.Job, bool, error) { |
||||
q.mu.Lock() |
||||
defer q.mu.Unlock() |
||||
|
||||
// Simple FIFO for now
|
||||
for id, job := range q.pending { |
||||
delete(q.pending, id) |
||||
q.inProgress[id] = &jobAssignment{ |
||||
job: job, |
||||
builderID: builderID, |
||||
} |
||||
return job, true, nil |
||||
} |
||||
|
||||
return nil, false, nil |
||||
} |
||||
|
||||
// MarkComplete moves a job from in-progress to completed
|
||||
func (q *JobQueue) MarkComplete(jobID string, builderID string) error { |
||||
q.mu.Lock() |
||||
defer q.mu.Unlock() |
||||
|
||||
assignment, exists := q.inProgress[jobID] |
||||
if !exists { |
||||
return fmt.Errorf("job %s not found in progress", jobID) |
||||
} |
||||
|
||||
if assignment.builderID != builderID { |
||||
return fmt.Errorf("job %s not assigned to builder %s", jobID, builderID) |
||||
} |
||||
|
||||
delete(q.inProgress, jobID) |
||||
q.completed[jobID] = assignment.job |
||||
return nil |
||||
} |
||||
|
||||
// SyncJob updates the state of an in-progress job
|
||||
func (q *JobQueue) SyncJob(jobID string, builderID string, job *types.Job) error { |
||||
q.mu.Lock() |
||||
defer q.mu.Unlock() |
||||
|
||||
assignment, exists := q.inProgress[jobID] |
||||
if !exists { |
||||
return fmt.Errorf("job %s not found in progress", jobID) |
||||
} |
||||
|
||||
if assignment.builderID != builderID { |
||||
return fmt.Errorf("job %s not assigned to builder %s", jobID, builderID) |
||||
} |
||||
|
||||
assignment.job = job |
||||
return nil |
||||
} |
||||
@ -0,0 +1,56 @@ |
||||
package scheduler |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/blockbuilder/types" |
||||
) |
||||
|
||||
var ( |
||||
_ types.Scheduler = unimplementedScheduler{} |
||||
_ types.Scheduler = &QueueScheduler{} |
||||
) |
||||
|
||||
// unimplementedScheduler provides default implementations that panic.
|
||||
type unimplementedScheduler struct{} |
||||
|
||||
func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) { |
||||
panic("unimplemented") |
||||
} |
||||
|
||||
func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error { |
||||
panic("unimplemented") |
||||
} |
||||
|
||||
func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error { |
||||
panic("unimplemented") |
||||
} |
||||
|
||||
// QueueScheduler implements the Scheduler interface
|
||||
type QueueScheduler struct { |
||||
queue *JobQueue |
||||
} |
||||
|
||||
// NewScheduler creates a new scheduler instance
|
||||
func NewScheduler(queue *JobQueue) *QueueScheduler { |
||||
return &QueueScheduler{ |
||||
queue: queue, |
||||
} |
||||
} |
||||
|
||||
func (s *QueueScheduler) HandleGetJob(ctx context.Context, builderID string) (*types.Job, bool, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, false, ctx.Err() |
||||
default: |
||||
return s.queue.Dequeue(builderID) |
||||
} |
||||
} |
||||
|
||||
func (s *QueueScheduler) HandleCompleteJob(_ context.Context, builderID string, job *types.Job) error { |
||||
return s.queue.MarkComplete(job.ID, builderID) |
||||
} |
||||
|
||||
func (s *QueueScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error { |
||||
return s.queue.SyncJob(job.ID, builderID, job) |
||||
} |
||||
@ -0,0 +1,148 @@ |
||||
package scheduler |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/blockbuilder/builder" |
||||
"github.com/grafana/loki/v3/pkg/blockbuilder/types" |
||||
) |
||||
|
||||
type testEnv struct { |
||||
queue *JobQueue |
||||
scheduler *QueueScheduler |
||||
transport *builder.MemoryTransport |
||||
builder *builder.Worker |
||||
} |
||||
|
||||
func newTestEnv(builderID string) *testEnv { |
||||
queue := NewJobQueue() |
||||
scheduler := NewScheduler(queue) |
||||
transport := builder.NewMemoryTransport(scheduler) |
||||
builder := builder.NewWorker(builderID, builder.NewMemoryTransport(scheduler)) |
||||
|
||||
return &testEnv{ |
||||
queue: queue, |
||||
scheduler: scheduler, |
||||
transport: transport, |
||||
builder: builder, |
||||
} |
||||
} |
||||
|
||||
func TestScheduleAndProcessJob(t *testing.T) { |
||||
env := newTestEnv("test-builder-1") |
||||
ctx := context.Background() |
||||
|
||||
// Create and enqueue a test job
|
||||
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) |
||||
err := env.queue.Enqueue(job) |
||||
if err != nil { |
||||
t.Fatalf("failed to enqueue job: %v", err) |
||||
} |
||||
|
||||
// Builder gets job
|
||||
receivedJob, ok, err := env.builder.GetJob(ctx) |
||||
if err != nil { |
||||
t.Fatalf("failed to get job: %v", err) |
||||
} |
||||
if !ok { |
||||
t.Fatal("expected to receive job") |
||||
} |
||||
if receivedJob.ID != job.ID { |
||||
t.Errorf("got job ID %s, want %s", receivedJob.ID, job.ID) |
||||
} |
||||
|
||||
// Builder completes job
|
||||
err = env.builder.CompleteJob(ctx, receivedJob) |
||||
if err != nil { |
||||
t.Fatalf("failed to complete job: %v", err) |
||||
} |
||||
|
||||
// Try to get another job (should be none available)
|
||||
_, ok, err = env.builder.GetJob(ctx) |
||||
if err != nil { |
||||
t.Fatalf("failed to get second job: %v", err) |
||||
} |
||||
if ok { |
||||
t.Error("got unexpected second job") |
||||
} |
||||
} |
||||
|
||||
func TestContextCancellation(t *testing.T) { |
||||
env := newTestEnv("test-builder-1") |
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) |
||||
defer cancel() |
||||
|
||||
// Try to get job after context timeout
|
||||
time.Sleep(20 * time.Millisecond) |
||||
_, _, err := env.builder.GetJob(ctx) |
||||
if err == nil { |
||||
t.Error("expected error from cancelled context") |
||||
} |
||||
} |
||||
|
||||
func TestMultipleBuilders(t *testing.T) { |
||||
// Create first environment
|
||||
env1 := newTestEnv("test-builder-1") |
||||
// Create second builder using same scheduler
|
||||
builder2 := builder.NewWorker("test-builder-2", builder.NewMemoryTransport(env1.scheduler)) |
||||
|
||||
ctx := context.Background() |
||||
|
||||
// Create test jobs
|
||||
job1 := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) |
||||
job2 := types.NewJob(2, types.Offsets{Min: 300, Max: 400}) |
||||
|
||||
// Enqueue jobs
|
||||
err := env1.queue.Enqueue(job1) |
||||
if err != nil { |
||||
t.Fatalf("failed to enqueue job1: %v", err) |
||||
} |
||||
err = env1.queue.Enqueue(job2) |
||||
if err != nil { |
||||
t.Fatalf("failed to enqueue job2: %v", err) |
||||
} |
||||
|
||||
// Builders get jobs
|
||||
receivedJob1, ok, err := env1.builder.GetJob(ctx) |
||||
if err != nil { |
||||
t.Fatalf("builder1 failed to get job: %v", err) |
||||
} |
||||
if !ok { |
||||
t.Fatal("builder1 expected to receive job") |
||||
} |
||||
|
||||
receivedJob2, ok, err := builder2.GetJob(ctx) |
||||
if err != nil { |
||||
t.Fatalf("builder2 failed to get job: %v", err) |
||||
} |
||||
if !ok { |
||||
t.Fatal("builder2 expected to receive job") |
||||
} |
||||
|
||||
// Verify different jobs were assigned
|
||||
if receivedJob1.ID == receivedJob2.ID { |
||||
t.Error("builders received same job") |
||||
} |
||||
|
||||
// Complete jobs
|
||||
err = env1.builder.CompleteJob(ctx, receivedJob1) |
||||
if err != nil { |
||||
t.Fatalf("builder1 failed to complete job: %v", err) |
||||
} |
||||
|
||||
err = builder2.CompleteJob(ctx, receivedJob2) |
||||
if err != nil { |
||||
t.Fatalf("builder2 failed to complete job: %v", err) |
||||
} |
||||
|
||||
// Try to get more jobs (should be none available)
|
||||
_, ok, err = env1.builder.GetJob(ctx) |
||||
if err != nil { |
||||
t.Fatalf("builder1 failed to get second job: %v", err) |
||||
} |
||||
if ok { |
||||
t.Error("builder1 got unexpected second job") |
||||
} |
||||
} |
||||
@ -0,0 +1,53 @@ |
||||
package types |
||||
|
||||
import "context" |
||||
|
||||
// Worker interface defines the methods for processing jobs and reporting status.
|
||||
type Worker interface { |
||||
// GetJob requests a new job from the scheduler
|
||||
GetJob(ctx context.Context) (*Job, bool, error) |
||||
// CompleteJob marks a job as finished
|
||||
CompleteJob(ctx context.Context, job *Job) error |
||||
// SyncJob informs the scheduler about an in-progress job
|
||||
SyncJob(ctx context.Context, job *Job) error |
||||
} |
||||
|
||||
// Scheduler interface defines the methods for scheduling jobs and managing worker pools.
|
||||
type Scheduler interface { |
||||
// HandleGetJob processes a job request from a block builder
|
||||
HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error) |
||||
// HandleCompleteJob processes a job completion notification
|
||||
HandleCompleteJob(ctx context.Context, builderID string, job *Job) error |
||||
// HandleSyncJob processes a job sync request
|
||||
HandleSyncJob(ctx context.Context, builderID string, job *Job) error |
||||
} |
||||
|
||||
// Transport defines the interface for communication between block builders and scheduler
|
||||
type Transport interface { |
||||
// SendGetJobRequest sends a request to get a new job
|
||||
SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) |
||||
// SendCompleteJob sends a job completion notification
|
||||
SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error |
||||
// SendSyncJob sends a job sync request
|
||||
SendSyncJob(ctx context.Context, req *SyncJobRequest) error |
||||
} |
||||
|
||||
// Request/Response message types
|
||||
type GetJobRequest struct { |
||||
BuilderID string |
||||
} |
||||
|
||||
type GetJobResponse struct { |
||||
Job *Job |
||||
OK bool |
||||
} |
||||
|
||||
type CompleteJobRequest struct { |
||||
BuilderID string |
||||
Job *Job |
||||
} |
||||
|
||||
type SyncJobRequest struct { |
||||
BuilderID string |
||||
Job *Job |
||||
} |
||||
@ -0,0 +1,42 @@ |
||||
package types |
||||
|
||||
import "fmt" |
||||
|
||||
// Job represents a block building task.
|
||||
type Job struct { |
||||
ID string |
||||
Status JobStatus |
||||
// Partition and offset information
|
||||
Partition int |
||||
Offsets Offsets |
||||
} |
||||
|
||||
// JobStatus represents the current state of a job
|
||||
type JobStatus int |
||||
|
||||
const ( |
||||
JobStatusPending JobStatus = iota |
||||
JobStatusInProgress |
||||
JobStatusComplete |
||||
) |
||||
|
||||
// Offsets represents the range of offsets to process
|
||||
type Offsets struct { |
||||
Min int64 |
||||
Max int64 |
||||
} |
||||
|
||||
// NewJob creates a new job with the given partition and offsets
|
||||
func NewJob(partition int, offsets Offsets) *Job { |
||||
return &Job{ |
||||
ID: GenerateJobID(partition, offsets), |
||||
Status: JobStatusPending, |
||||
Partition: partition, |
||||
Offsets: offsets, |
||||
} |
||||
} |
||||
|
||||
// GenerateJobID creates a deterministic job ID from partition and offsets
|
||||
func GenerateJobID(partition int, offsets Offsets) string { |
||||
return fmt.Sprintf("job-%d-%d-%d", partition, offsets.Min, offsets.Max) |
||||
} |
||||
Loading…
Reference in new issue