fix: keep dual writer mode in sync with kv store (#106692)

pull/107020/head
Mustafa Sencer Özcan 6 months ago committed by GitHub
parent a4232eb1b8
commit f2bd367f61
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 71
      pkg/apiserver/rest/dualwriter.go
  2. 64
      pkg/apiserver/rest/dualwriter_test.go

@ -123,17 +123,17 @@ func SetDualWritingMode(
errDualWriterSetCurrentMode := errors.New("failed to set current dual writing mode")
// Use entity name as key
m, ok, err := kvs.Get(ctx, cfg.Kind)
kvMode, ok, err := kvs.Get(ctx, cfg.Kind)
if err != nil {
return Mode0, errors.New("failed to fetch current dual writing mode")
}
currentMode, exists := toMode[m]
currentMode, exists := toMode[kvMode]
// If the mode does not exist in our mapping, we log an error.
if !exists && ok {
// Only log if "ok" because initially all instances will have mode unset for playlists.
klog.Infof("invalid dual writing mode for %s mode: %v", cfg.Kind, m)
klog.Infof("invalid dual writing mode for %s mode: %v", cfg.Kind, kvMode)
}
// If the mode does not exist in our mapping, and we also didn't find an entry for this kind, fallback.
@ -145,51 +145,44 @@ func SetDualWritingMode(
}
}
// Handle transitions to the desired mode.
switch {
case cfg.Mode == Mode2 || cfg.Mode == Mode1:
// Directly set the mode for Mode1 and Mode2.
currentMode = cfg.Mode
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(currentMode)); err != nil {
isUpgradeToReadUnifiedMode := currentMode < Mode3 && cfg.Mode >= Mode3
if !isUpgradeToReadUnifiedMode {
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
return Mode0, errDualWriterSetCurrentMode
}
case cfg.Mode >= Mode3 && currentMode < Mode3:
if cfg.SkipDataSync {
return currentMode, nil
}
return cfg.Mode, nil
}
// Transitioning to Mode3 or higher requires data synchronization.
cfgModeTmp := cfg.Mode
// Before running the sync, set the syncer config to the current mode, as we have to run the syncer
// once in the current active mode before we can upgrade.
cfg.Mode = currentMode
syncOk, err := runDataSyncer(ctx, cfg)
// Once we are done with running the syncer, we can change the mode back on the config to the desired one.
cfg.Mode = cfgModeTmp
if err != nil {
klog.Error("data syncer failed for mode:", m, "err", err)
return currentMode, nil
}
if !syncOk {
klog.Info("data syncer not ok for mode:", m)
return currentMode, nil
}
// If sync is successful, update the mode to the desired one.
// If SkipDataSync is enabled, we can set the mode directly without running the syncer.
if cfg.SkipDataSync {
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
return Mode0, errDualWriterSetCurrentMode
}
return cfg.Mode, nil
case cfg.Mode >= Mode3 && currentMode >= Mode3:
// If already in Mode3 or higher, simply update to the desired mode.
currentMode = cfg.Mode
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(currentMode)); err != nil {
return Mode0, errDualWriterSetCurrentMode
}
default:
// Handle any unexpected cases (should not normally happen).
}
// Transitioning to Mode3 or higher from Mode0, Mode1, or Mode2.
// We need to run the syncer in the current mode before we can upgrade to Mode3 or higher.
cfgModeTmp := cfg.Mode
// Before running the sync, set the syncer config to the current mode, as we have to run the syncer
// once in the current active mode before we can upgrade.
cfg.Mode = currentMode
syncOk, err := runDataSyncer(ctx, cfg)
// Once we are done with running the syncer, we can change the mode back on the config to the desired one.
cfg.Mode = cfgModeTmp
if err != nil {
klog.Error("data syncer failed for mode:", kvMode, "err", err)
return currentMode, nil
}
if !syncOk {
klog.Info("data syncer not ok for mode:", kvMode)
return currentMode, nil
}
// If sync is successful, update the mode to the desired one.
if err := kvs.Set(ctx, cfg.Kind, fmt.Sprint(cfg.Mode)); err != nil {
return Mode0, errDualWriterSetCurrentMode
}
return currentMode, nil
return cfg.Mode, nil
}
var defaultConverter = runtime.UnstructuredConverter(runtime.DefaultUnstructuredConverter)

@ -20,54 +20,62 @@ func TestSetDualWritingMode(t *testing.T) {
kvStore *fakeNamespacedKV
desiredMode DualWriterMode
expectedMode DualWriterMode
expectedKVMode string
skipDataSync bool
serverLockError error
}
tests :=
[]testCase{
{
name: "should return a mode 2 dual writer when mode 2 is set as the desired mode",
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
desiredMode: Mode2,
expectedMode: Mode2,
name: "should return a mode 2 dual writer when mode 2 is set as the desired mode",
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
desiredMode: Mode2,
expectedMode: Mode2,
expectedKVMode: "2",
},
{
name: "should return a mode 1 dual writer when mode 1 is set as the desired mode",
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
desiredMode: Mode1,
expectedMode: Mode1,
name: "should return a mode 1 dual writer when mode 1 is set as the desired mode",
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
desiredMode: Mode1,
expectedMode: Mode1,
expectedKVMode: "1",
},
{
name: "should return mode 3 as desired mode when current mode is > 3",
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "5"}, namespace: "storage.dualwriting"},
desiredMode: Mode3,
expectedMode: Mode3,
name: "should return mode 3 as desired mode when current mode is > 3",
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "5"}, namespace: "storage.dualwriting"},
desiredMode: Mode3,
expectedMode: Mode3,
expectedKVMode: "3",
},
{
name: "should return mode 3 as desired mode when current mode is 2",
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
desiredMode: Mode3,
expectedMode: Mode3,
name: "should return mode 3 as desired mode when current mode is 2",
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
desiredMode: Mode3,
expectedMode: Mode3,
expectedKVMode: "3",
},
{
name: "should default to mode 0 if there is no desired mode",
kvStore: &fakeNamespacedKV{data: map[string]string{}, namespace: "storage.dualwriting"},
desiredMode: Mode0,
expectedMode: Mode0,
name: "should default to mode 0 if there is no desired mode",
kvStore: &fakeNamespacedKV{data: map[string]string{}, namespace: "storage.dualwriting"},
desiredMode: Mode0,
expectedMode: Mode0,
expectedKVMode: "",
},
{
name: "should keep mode2 when trying to go from mode2 to mode3 and the server lock service returns an error",
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
desiredMode: Mode3,
expectedMode: Mode2,
expectedKVMode: "2",
serverLockError: fmt.Errorf("lock already exists"),
},
{
name: "should keep mode2 when trying to go from mode2 to mode3 and migration is disabled",
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
desiredMode: Mode3,
expectedMode: Mode2,
skipDataSync: true,
name: "should keep mode2 when trying to go from mode2 to mode3 and migration is disabled",
kvStore: &fakeNamespacedKV{data: map[string]string{"playlist.grafana.app/playlists": "2"}, namespace: "storage.dualwriting"},
desiredMode: Mode3,
expectedMode: Mode3,
expectedKVMode: "3",
skipDataSync: true,
},
}
@ -104,6 +112,10 @@ func TestSetDualWritingMode(t *testing.T) {
})
require.NoError(t, err)
require.Equal(t, tt.expectedMode, dwMode)
kvMode, _, err := tt.kvStore.Get(context.Background(), "playlist.grafana.app/playlists")
require.NoError(t, err)
require.Equal(t, tt.expectedKVMode, kvMode, "expected mode for playlist.grafana.app/playlists")
}
}
@ -162,7 +174,7 @@ func (f *fakeNamespacedKV) Get(ctx context.Context, key string) (string, bool, e
}
func (f *fakeNamespacedKV) Set(ctx context.Context, key, value string) error {
f.data[f.namespace+key] = value
f.data[key] = value
return nil
}

Loading…
Cancel
Save