Fix flaky test (#5307)

- Ensure the registry is stopped before removing directories
- Fix possible panic when stopping registry
- Fix race conditions in registry state
pull/5868/head
Travis Patterson 3 years ago committed by GitHub
parent 4e421206f0
commit ab5d490f90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 49
      pkg/ruler/registry_test.go
  2. 12
      pkg/ruler/storage/instance/instance.go
  3. 6
      pkg/ruler/storage/instance/manager.go

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net/url"
"os"
"strings"
"testing"
"time"
@ -85,7 +84,9 @@ func newFakeLimits() fakeLimits {
}
}
func setupRegistry(t *testing.T, dir string) *walRegistry {
func setupRegistry(t *testing.T) *walRegistry {
// TempDir adds RemoveAll to c.Cleanup
walDir := t.TempDir()
u, _ := url.Parse("http://remote-write")
cfg := Config{
@ -118,7 +119,7 @@ func setupRegistry(t *testing.T, dir string) *walRegistry {
ConfigRefreshPeriod: 5 * time.Second,
},
WAL: instance.Config{
Dir: dir,
Dir: walDir,
},
}
@ -128,12 +129,13 @@ func setupRegistry(t *testing.T, dir string) *walRegistry {
reg := newWALRegistry(log.NewNopLogger(), nil, cfg, overrides)
require.NoError(t, err)
//stops the registry before the directory is cleaned up
t.Cleanup(reg.stop)
return reg.(*walRegistry)
}
func TestTenantRemoteWriteConfigWithOverride(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
tenantCfg, err := reg.getTenantConfig(enabledRWTenant)
require.NoError(t, err)
@ -145,8 +147,7 @@ func TestTenantRemoteWriteConfigWithOverride(t *testing.T) {
}
func TestTenantRemoteWriteConfigWithoutOverride(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
// this tenant has no overrides, so will get defaults
tenantCfg, err := reg.getTenantConfig("unknown")
@ -159,8 +160,7 @@ func TestTenantRemoteWriteConfigWithoutOverride(t *testing.T) {
}
func TestTenantRemoteWriteConfigDisabled(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
tenantCfg, err := reg.getTenantConfig(disabledRWTenant)
require.NoError(t, err)
@ -170,9 +170,7 @@ func TestTenantRemoteWriteConfigDisabled(t *testing.T) {
}
func TestTenantRemoteWriteHTTPConfigMaintained(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
defer os.RemoveAll(walDir)
reg := setupRegistry(t)
tenantCfg, err := reg.getTenantConfig(enabledRWTenant)
require.NoError(t, err)
@ -183,8 +181,7 @@ func TestTenantRemoteWriteHTTPConfigMaintained(t *testing.T) {
}
func TestTenantRemoteWriteHeaderOverride(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
tenantCfg, err := reg.getTenantConfig(additionalHeadersRWTenant)
require.NoError(t, err)
@ -205,8 +202,7 @@ func TestTenantRemoteWriteHeaderOverride(t *testing.T) {
}
func TestTenantRemoteWriteHeadersReset(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
tenantCfg, err := reg.getTenantConfig(noHeadersRWTenant)
require.NoError(t, err)
@ -219,8 +215,7 @@ func TestTenantRemoteWriteHeadersReset(t *testing.T) {
}
func TestTenantRemoteWriteHeadersNoOverride(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
tenantCfg, err := reg.getTenantConfig(enabledRWTenant)
require.NoError(t, err)
@ -233,8 +228,7 @@ func TestTenantRemoteWriteHeadersNoOverride(t *testing.T) {
}
func TestRelabelConfigOverrides(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
tenantCfg, err := reg.getTenantConfig(customRelabelsTenant)
require.NoError(t, err)
@ -244,8 +238,7 @@ func TestRelabelConfigOverrides(t *testing.T) {
}
func TestRelabelConfigOverridesNilWriteRelabels(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
tenantCfg, err := reg.getTenantConfig(nilRelabelsTenant)
require.NoError(t, err)
@ -255,8 +248,7 @@ func TestRelabelConfigOverridesNilWriteRelabels(t *testing.T) {
}
func TestRelabelConfigOverridesEmptySliceWriteRelabels(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
tenantCfg, err := reg.getTenantConfig(emptySliceRelabelsTenant)
require.NoError(t, err)
@ -266,8 +258,7 @@ func TestRelabelConfigOverridesEmptySliceWriteRelabels(t *testing.T) {
}
func TestRelabelConfigOverridesWithErrors(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
_, err := reg.getTenantConfig(badRelabelsTenant)
@ -300,8 +291,7 @@ func TestWALRegistryCreation(t *testing.T) {
}
func TestStorageSetup(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
// once the registry is setup and we configure the tenant storage, we should be able
// to acquire an appender for the WAL storage
@ -316,8 +306,7 @@ func TestStorageSetup(t *testing.T) {
}
func TestStorageSetupWithRemoteWriteDisabled(t *testing.T) {
walDir := t.TempDir()
reg := setupRegistry(t, walDir)
reg := setupRegistry(t)
// once the registry is setup and we configure the tenant storage, we should be able
// to acquire an appender for the WAL storage

@ -207,6 +207,9 @@ func newInstance(cfg Config, reg prometheus.Registerer, logger log.Logger, newWa
}
func (i *Instance) Storage() storage.Storage {
i.mut.Lock()
defer i.mut.Unlock()
return i.storage
}
@ -284,12 +287,12 @@ func (n noopScrapeManager) Get() (*scrape.Manager, error) {
// components cannot be reused after they are stopped so we need to recreate them
// each run.
func (i *Instance) initialize(_ context.Context, reg prometheus.Registerer, cfg *Config) error {
// explicitly set this in case this function is called multiple times
i.initialized = false
i.mut.Lock()
defer i.mut.Unlock()
// explicitly set this in case this function is called multiple times
i.initialized = false
var err error
i.wal, err = i.newWal(reg)
@ -373,6 +376,9 @@ func (i *Instance) Update(c Config) (err error) {
// Ready indicates if the instance is ready for processing.
func (i *Instance) Ready() bool {
i.mut.Lock()
defer i.mut.Unlock()
return i.initialized
}

@ -101,8 +101,10 @@ type managedProcess struct {
}
func (p managedProcess) Stop() {
if err := p.inst.Stop(); err != nil {
level.Error(util_log.Logger).Log("msg", "error while stopping instance", "user", p.inst.Tenant(), "err", err)
if p.inst.Ready() { // Only stop initialized instances to avoid panic
if err := p.inst.Stop(); err != nil {
level.Error(util_log.Logger).Log("msg", "error while stopping instance", "user", p.inst.Tenant(), "err", err)
}
}
p.cancel()

Loading…
Cancel
Save