|
|
|
@ -4,10 +4,12 @@ import ( |
|
|
|
|
"context" |
|
|
|
|
"fmt" |
|
|
|
|
"net" |
|
|
|
|
"sync" |
|
|
|
|
"testing" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/go-kit/log" |
|
|
|
|
"github.com/grafana/dskit/backoff" |
|
|
|
|
"github.com/grafana/dskit/flagext" |
|
|
|
|
"github.com/grafana/dskit/services" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
@ -26,6 +28,7 @@ import ( |
|
|
|
|
|
|
|
|
|
func Test_BuilderLoop(t *testing.T) { |
|
|
|
|
logger := log.NewNopLogger() |
|
|
|
|
//logger := log.NewLogfmtLogger(os.Stdout)
|
|
|
|
|
|
|
|
|
|
schemaCfg := config.SchemaConfig{ |
|
|
|
|
Configs: []config.PeriodConfig{ |
|
|
|
@ -69,9 +72,17 @@ func Test_BuilderLoop(t *testing.T) { |
|
|
|
|
server, err := newFakePlannerServer(tasks) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Start the server so the builder can connect and receive tasks.
|
|
|
|
|
server.Start() |
|
|
|
|
|
|
|
|
|
limits := fakeLimits{} |
|
|
|
|
cfg := Config{ |
|
|
|
|
PlannerAddress: server.Addr(), |
|
|
|
|
BackoffConfig: backoff.Config{ |
|
|
|
|
MinBackoff: 1 * time.Second, |
|
|
|
|
MaxBackoff: 10 * time.Second, |
|
|
|
|
MaxRetries: 5, |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
flagext.DefaultValues(&cfg.GrpcConfig) |
|
|
|
|
|
|
|
|
@ -87,10 +98,28 @@ func Test_BuilderLoop(t *testing.T) { |
|
|
|
|
err = services.StartAndAwaitRunning(context.Background(), builder) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Wait for at least one task to be processed.
|
|
|
|
|
require.Eventually(t, func() bool { |
|
|
|
|
return int(server.completedTasks.Load()) == len(tasks) |
|
|
|
|
return server.CompletedTasks() > 0 |
|
|
|
|
}, 5*time.Second, 100*time.Millisecond) |
|
|
|
|
|
|
|
|
|
// Right after stop it so connection is broken, and builder will retry.
|
|
|
|
|
server.Stop() |
|
|
|
|
|
|
|
|
|
// While the server is stopped, the builder should keep retrying to connect but no tasks should be processed.
|
|
|
|
|
// Note this is just a way to sleep while making sure no tasks are processed.
|
|
|
|
|
tasksProcessedSoFar := server.CompletedTasks() |
|
|
|
|
require.Never(t, func() bool { |
|
|
|
|
return server.CompletedTasks() > tasksProcessedSoFar |
|
|
|
|
}, 5*time.Second, 500*time.Millisecond) |
|
|
|
|
|
|
|
|
|
// Now we start the server so the builder can connect and receive tasks.
|
|
|
|
|
server.Start() |
|
|
|
|
|
|
|
|
|
require.Eventually(t, func() bool { |
|
|
|
|
return server.CompletedTasks() >= len(tasks) |
|
|
|
|
}, 30*time.Second, 500*time.Millisecond) |
|
|
|
|
|
|
|
|
|
err = services.StopAndAwaitTerminated(context.Background(), builder) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
@ -102,41 +131,62 @@ type fakePlannerServer struct { |
|
|
|
|
completedTasks atomic.Int64 |
|
|
|
|
shutdownCalled bool |
|
|
|
|
|
|
|
|
|
addr string |
|
|
|
|
listenAddr string |
|
|
|
|
grpcServer *grpc.Server |
|
|
|
|
wg sync.WaitGroup |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newFakePlannerServer(tasks []*protos.ProtoTask) (*fakePlannerServer, error) { |
|
|
|
|
lis, err := net.Listen("tcp", "localhost:0") |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
server := &fakePlannerServer{ |
|
|
|
|
tasks: tasks, |
|
|
|
|
addr: lis.Addr().String(), |
|
|
|
|
grpcServer: grpc.NewServer(), |
|
|
|
|
tasks: tasks, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
protos.RegisterPlannerForBuilderServer(server.grpcServer, server) |
|
|
|
|
go func() { |
|
|
|
|
if err := server.grpcServer.Serve(lis); err != nil { |
|
|
|
|
panic(err) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
return server, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (f *fakePlannerServer) Addr() string { |
|
|
|
|
return f.addr |
|
|
|
|
if f.listenAddr == "" { |
|
|
|
|
panic("server not started") |
|
|
|
|
} |
|
|
|
|
return f.listenAddr |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (f *fakePlannerServer) Stop() { |
|
|
|
|
f.grpcServer.Stop() |
|
|
|
|
if f.grpcServer != nil { |
|
|
|
|
f.grpcServer.Stop() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
f.wg.Wait() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (f *fakePlannerServer) Start() { |
|
|
|
|
f.Stop() |
|
|
|
|
|
|
|
|
|
lisAddr := "localhost:0" |
|
|
|
|
if f.listenAddr != "" { |
|
|
|
|
// Reuse the same address if the server was stopped and started again.
|
|
|
|
|
lisAddr = f.listenAddr |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
lis, err := net.Listen("tcp", lisAddr) |
|
|
|
|
if err != nil { |
|
|
|
|
panic(err) |
|
|
|
|
} |
|
|
|
|
f.listenAddr = lis.Addr().String() |
|
|
|
|
|
|
|
|
|
f.grpcServer = grpc.NewServer() |
|
|
|
|
protos.RegisterPlannerForBuilderServer(f.grpcServer, f) |
|
|
|
|
go func() { |
|
|
|
|
if err := f.grpcServer.Serve(lis); err != nil { |
|
|
|
|
panic(err) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoopServer) error { |
|
|
|
|
f.wg.Add(1) |
|
|
|
|
defer f.wg.Done() |
|
|
|
|
|
|
|
|
|
// Receive Ready
|
|
|
|
|
if _, err := srv.Recv(); err != nil { |
|
|
|
|
return fmt.Errorf("failed to receive ready: %w", err) |
|
|
|
@ -149,7 +199,8 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop |
|
|
|
|
if _, err := srv.Recv(); err != nil { |
|
|
|
|
return fmt.Errorf("failed to receive task response: %w", err) |
|
|
|
|
} |
|
|
|
|
f.completedTasks.Add(1) |
|
|
|
|
time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency.
|
|
|
|
|
f.completedTasks.Inc() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// No more tasks. Wait until shutdown.
|
|
|
|
@ -157,6 +208,10 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (f *fakePlannerServer) CompletedTasks() int { |
|
|
|
|
return int(f.completedTasks.Load()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) { |
|
|
|
|
f.shutdownCalled = true |
|
|
|
|
return &protos.NotifyBuilderShutdownResponse{}, nil |
|
|
|
|