|
|
|
@ -14,12 +14,6 @@ import ( |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func TestProcessManager_Start(t *testing.T) { |
|
|
|
|
t.Run("Plugin not found in registry", func(t *testing.T) { |
|
|
|
|
m := NewManager(fakes.NewFakePluginRegistry()) |
|
|
|
|
err := m.Start(context.Background(), "non-existing-datasource") |
|
|
|
|
require.ErrorIs(t, err, backendplugin.ErrPluginNotRegistered) |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
t.Run("Plugin state determines process start", func(t *testing.T) { |
|
|
|
|
tcs := []struct { |
|
|
|
|
name string |
|
|
|
@ -58,21 +52,16 @@ func TestProcessManager_Start(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
for _, tc := range tcs { |
|
|
|
|
t.Run(tc.name, func(t *testing.T) { |
|
|
|
|
bp := newFakeBackendPlugin(tc.managed) |
|
|
|
|
bp := fakes.NewFakeBackendPlugin(tc.managed) |
|
|
|
|
p := createPlugin(t, bp, func(plugin *plugins.Plugin) { |
|
|
|
|
plugin.Backend = tc.backend |
|
|
|
|
plugin.SignatureError = tc.signatureError |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
m := NewManager(&fakes.FakePluginRegistry{ |
|
|
|
|
Store: map[string]*plugins.Plugin{ |
|
|
|
|
p.ID: p, |
|
|
|
|
}}, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
err := m.Start(context.Background(), p.ID) |
|
|
|
|
m := &Service{} |
|
|
|
|
err := m.Start(context.Background(), p) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Equal(t, tc.expectedStartCount, bp.startCount) |
|
|
|
|
require.Equal(t, tc.expectedStartCount, bp.StartCount) |
|
|
|
|
|
|
|
|
|
if tc.expectedStartCount > 0 { |
|
|
|
|
require.True(t, !p.Exited()) |
|
|
|
@ -85,67 +74,42 @@ func TestProcessManager_Start(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestProcessManager_Stop(t *testing.T) { |
|
|
|
|
t.Run("Plugin not found in registry", func(t *testing.T) { |
|
|
|
|
m := NewManager(fakes.NewFakePluginRegistry()) |
|
|
|
|
err := m.Stop(context.Background(), "non-existing-datasource") |
|
|
|
|
require.ErrorIs(t, err, backendplugin.ErrPluginNotRegistered) |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
t.Run("Can stop a running plugin", func(t *testing.T) { |
|
|
|
|
pluginID := "test-datasource" |
|
|
|
|
|
|
|
|
|
bp := newFakeBackendPlugin(true) |
|
|
|
|
bp := fakes.NewFakeBackendPlugin(true) |
|
|
|
|
p := createPlugin(t, bp, func(plugin *plugins.Plugin) { |
|
|
|
|
plugin.ID = pluginID |
|
|
|
|
plugin.Backend = true |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
m := NewManager(&fakes.FakePluginRegistry{ |
|
|
|
|
Store: map[string]*plugins.Plugin{ |
|
|
|
|
pluginID: p, |
|
|
|
|
}}, |
|
|
|
|
) |
|
|
|
|
err := m.Stop(context.Background(), pluginID) |
|
|
|
|
m := &Service{} |
|
|
|
|
err := m.Stop(context.Background(), p) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
require.True(t, p.IsDecommissioned()) |
|
|
|
|
require.True(t, bp.decommissioned) |
|
|
|
|
require.True(t, p.Exited()) |
|
|
|
|
require.Equal(t, 1, bp.stopCount) |
|
|
|
|
require.Equal(t, 1, bp.StopCount) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestProcessManager_ManagedBackendPluginLifecycle(t *testing.T) { |
|
|
|
|
bp := newFakeBackendPlugin(true) |
|
|
|
|
bp := fakes.NewFakeBackendPlugin(true) |
|
|
|
|
p := createPlugin(t, bp, func(plugin *plugins.Plugin) { |
|
|
|
|
plugin.Backend = true |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
m := NewManager(&fakes.FakePluginRegistry{ |
|
|
|
|
Store: map[string]*plugins.Plugin{ |
|
|
|
|
p.ID: p, |
|
|
|
|
}}, |
|
|
|
|
) |
|
|
|
|
m := &Service{} |
|
|
|
|
|
|
|
|
|
err := m.Start(context.Background(), p.ID) |
|
|
|
|
err := m.Start(context.Background(), p) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Equal(t, 1, bp.startCount) |
|
|
|
|
require.Equal(t, 1, bp.StartCount) |
|
|
|
|
|
|
|
|
|
t.Run("When plugin process is killed, the process is restarted", func(t *testing.T) { |
|
|
|
|
pCtx := context.Background() |
|
|
|
|
cCtx, cancel := context.WithCancel(pCtx) |
|
|
|
|
var wgRun sync.WaitGroup |
|
|
|
|
wgRun.Add(1) |
|
|
|
|
var runErr error |
|
|
|
|
go func() { |
|
|
|
|
runErr = m.Run(cCtx) |
|
|
|
|
wgRun.Done() |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
var wgKill sync.WaitGroup |
|
|
|
|
wgKill.Add(1) |
|
|
|
|
go func() { |
|
|
|
|
bp.kill() // manually kill process
|
|
|
|
|
bp.Kill() // manually kill process
|
|
|
|
|
for { |
|
|
|
|
if !bp.Exited() { |
|
|
|
|
break |
|
|
|
@ -155,85 +119,15 @@ func TestProcessManager_ManagedBackendPluginLifecycle(t *testing.T) { |
|
|
|
|
}() |
|
|
|
|
wgKill.Wait() |
|
|
|
|
require.True(t, !p.Exited()) |
|
|
|
|
require.Equal(t, 2, bp.startCount) |
|
|
|
|
require.Equal(t, 0, bp.stopCount) |
|
|
|
|
|
|
|
|
|
t.Run("When context is cancelled the plugin is stopped", func(t *testing.T) { |
|
|
|
|
cancel() |
|
|
|
|
wgRun.Wait() |
|
|
|
|
require.ErrorIs(t, runErr, context.Canceled) |
|
|
|
|
require.True(t, p.Exited()) |
|
|
|
|
require.Equal(t, 2, bp.startCount) |
|
|
|
|
require.Equal(t, 1, bp.stopCount) |
|
|
|
|
require.Equal(t, 2, bp.StartCount) |
|
|
|
|
require.Equal(t, 0, bp.StopCount) |
|
|
|
|
|
|
|
|
|
t.Cleanup(func() { |
|
|
|
|
require.NoError(t, m.Stop(context.Background(), p)) |
|
|
|
|
}) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type fakeBackendPlugin struct { |
|
|
|
|
managed bool |
|
|
|
|
|
|
|
|
|
startCount int |
|
|
|
|
stopCount int |
|
|
|
|
decommissioned bool |
|
|
|
|
running bool |
|
|
|
|
|
|
|
|
|
mutex sync.RWMutex |
|
|
|
|
backendplugin.Plugin |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newFakeBackendPlugin(managed bool) *fakeBackendPlugin { |
|
|
|
|
return &fakeBackendPlugin{ |
|
|
|
|
managed: managed, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *fakeBackendPlugin) Start(_ context.Context) error { |
|
|
|
|
p.mutex.Lock() |
|
|
|
|
defer p.mutex.Unlock() |
|
|
|
|
p.running = true |
|
|
|
|
p.startCount++ |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *fakeBackendPlugin) Stop(_ context.Context) error { |
|
|
|
|
p.mutex.Lock() |
|
|
|
|
defer p.mutex.Unlock() |
|
|
|
|
p.running = false |
|
|
|
|
p.stopCount++ |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *fakeBackendPlugin) Decommission() error { |
|
|
|
|
p.mutex.Lock() |
|
|
|
|
defer p.mutex.Unlock() |
|
|
|
|
p.decommissioned = true |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *fakeBackendPlugin) IsDecommissioned() bool { |
|
|
|
|
p.mutex.RLock() |
|
|
|
|
defer p.mutex.RUnlock() |
|
|
|
|
return p.decommissioned |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *fakeBackendPlugin) IsManaged() bool { |
|
|
|
|
p.mutex.RLock() |
|
|
|
|
defer p.mutex.RUnlock() |
|
|
|
|
return p.managed |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *fakeBackendPlugin) Exited() bool { |
|
|
|
|
p.mutex.RLock() |
|
|
|
|
defer p.mutex.RUnlock() |
|
|
|
|
return !p.running |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *fakeBackendPlugin) kill() { |
|
|
|
|
p.mutex.Lock() |
|
|
|
|
defer p.mutex.Unlock() |
|
|
|
|
p.running = false |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func createPlugin(t *testing.T, bp backendplugin.Plugin, cbs ...func(p *plugins.Plugin)) *plugins.Plugin { |
|
|
|
|
t.Helper() |
|
|
|
|
|
|
|
|
|