diff --git a/pkg/apiserver/rest/dualwriter.go b/pkg/apiserver/rest/dualwriter.go
index f9eccd9c60e..b4552dc7965 100644
--- a/pkg/apiserver/rest/dualwriter.go
+++ b/pkg/apiserver/rest/dualwriter.go
@@ -6,12 +6,15 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math/rand"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/registry/rest"
 	"k8s.io/klog/v2"
 )
@@ -102,7 +105,13 @@ const (
 // TODO: make this function private as there should only be one public way of setting the dual writing mode
 // NewDualWriter returns a new DualWriter.
-func NewDualWriter(mode DualWriterMode, legacy LegacyStorage, storage Storage, reg prometheus.Registerer, kind string) DualWriter {
+func NewDualWriter(
+	mode DualWriterMode,
+	legacy LegacyStorage,
+	storage Storage,
+	reg prometheus.Registerer,
+	kind string,
+) DualWriter {
 	metrics := &dualWriterMetrics{}
 	metrics.init(reg)
 	switch mode {
@@ -148,6 +157,10 @@ type NamespacedKVStore interface {
 	Set(ctx context.Context, key, value string) error
 }
 
+type ServerLockService interface {
+	LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
+}
+
 func SetDualWritingMode(
 	ctx context.Context,
 	kvs NamespacedKVStore,
@@ -156,6 +169,8 @@ func SetDualWritingMode(
 	entity string,
 	desiredMode DualWriterMode,
 	reg prometheus.Registerer,
+	serverLockService ServerLockService,
+	requestInfo *request.RequestInfo,
 ) (DualWriterMode, error) {
 	// Mode0 means no DualWriter
 	if desiredMode == Mode0 {
@@ -206,6 +221,7 @@ func SetDualWritingMode(
 			return Mode0, errDualWriterSetCurrentMode
 		}
 	}
+
 	if (desiredMode == Mode1) && (currentMode == Mode2) {
 		// This is where we go through the different gates to allow the instance to migrate from mode 2 to mode 1.
 		// There are none between mode 1 and mode 2
@@ -217,6 +233,28 @@ func SetDualWritingMode(
 		}
 	}
 
+	if (desiredMode == Mode3) && (currentMode == Mode2) {
+		// This is where we go through the different gates to allow the instance to migrate from mode 2 to mode 3.
+
+		// gate #1: ensure the data is 100% in sync
+		syncOk, err := runDataSyncer(ctx, currentMode, legacy, storage, entity, reg, serverLockService, requestInfo)
+		if err != nil {
+			klog.Info("data syncer failed for mode:", currentMode)
+			return currentMode, err
+		}
+		if !syncOk {
+			klog.Info("data syncer not ok for mode:", currentMode)
+			return currentMode, nil
+		}
+
+		err = kvs.Set(ctx, entity, fmt.Sprint(desiredMode))
+		if err != nil {
+			return currentMode, errDualWriterSetCurrentMode
+		}
+
+		return desiredMode, nil
+	}
+
 	// #TODO add support for other combinations of desired and current modes
 	return currentMode, nil
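Reviewer note: the new ServerLockService dependency is only the single LockExecuteAndRelease method, so callers that do not need cross-instance locking can pass a pass-through implementation. A minimal sketch (mirroring the fakeServerLock added to the tests further down; noopServerLock is a hypothetical name):

// noopServerLock runs fn inline without taking any distributed lock.
// Illustrative only; real deployments should pass Grafana's ServerLockService.
type noopServerLock struct{}

func (noopServerLock) LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error {
	fn(ctx)
	return nil
}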
@@ -260,3 +298,54 @@ func getName(o runtime.Object) string {
 	}
 	return accessor.GetName()
 }
+
+const dataSyncerInterval = 60 * time.Minute
+
+// StartPeriodicDataSyncer starts a background job that will execute the DataSyncer every 60 minutes
+func StartPeriodicDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage,
+	kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) {
+	klog.Info("Starting periodic data syncer for mode: ", mode)
+
+	// run in background
+	go func() {
+		r := rand.New(rand.NewSource(time.Now().UnixNano()))
+		timeWindow := 600 // 600 seconds (10 minutes)
+		jitterSeconds := r.Int63n(int64(timeWindow))
+		klog.Info("data syncer is going to start at: ", time.Now().Add(time.Second*time.Duration(jitterSeconds)))
+		time.Sleep(time.Second * time.Duration(jitterSeconds))
+
+		// run it immediately
+		syncOK, err := runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
+		klog.Info("data syncer finished, syncOK: ", syncOK, ", error: ", err)
+
+		ticker := time.NewTicker(dataSyncerInterval)
+		for {
+			select {
+			case <-ticker.C:
+				syncOK, err = runDataSyncer(ctx, mode, legacy, storage, kind, reg, serverLockService, requestInfo)
+				klog.Info("data syncer finished, syncOK: ", syncOK, ", error: ", err)
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+}
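Reviewer note: a minimal sketch of how this periodic job is expected to be wired up (the real call site is the InstallAPIs change in pkg/services/apiserver/builder/helper.go below; legacy, storage, and lock here are placeholders for already-constructed dependencies):

// Sketch only: start the hourly syncer for one resource.
func exampleStartSyncer(legacy LegacyStorage, storage Storage, lock ServerLockService) {
	reg := prometheus.NewRegistry()
	info := &request.RequestInfo{
		APIGroup:  "playlist.grafana.app",
		Resource:  "playlists",
		Namespace: "default", // assumption: the org-1 namespace
	}
	StartPeriodicDataSyncer(context.Background(), Mode2, legacy, storage, "playlist.grafana.app/v0alpha1", reg, lock, info)
}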
+
+// runDataSyncer ensures that the data in legacy storage and unified storage is in sync.
+// The sync implementation depends on the DualWriter mode
+func runDataSyncer(ctx context.Context, mode DualWriterMode, legacy LegacyStorage, storage Storage,
+	kind string, reg prometheus.Registerer, serverLockService ServerLockService, requestInfo *request.RequestInfo) (bool, error) {
+	// ensure that execution takes no longer than necessary
+	const timeout = dataSyncerInterval - time.Minute
+	ctx, cancelFn := context.WithTimeout(ctx, timeout)
+	defer cancelFn()
+
+	// implementation depends on the current DualWriter mode
+	switch mode {
+	case Mode2:
+		return mode2DataSyncer(ctx, legacy, storage, kind, reg, serverLockService, requestInfo)
+	default:
+		klog.Info("data syncer not implemented for mode:", mode)
+		return false, nil
+	}
+}
diff --git a/pkg/apiserver/rest/dualwriter_mode2.go b/pkg/apiserver/rest/dualwriter_mode2.go
index 86ab89e9cc0..7b057455445 100644
--- a/pkg/apiserver/rest/dualwriter_mode2.go
+++ b/pkg/apiserver/rest/dualwriter_mode2.go
@@ -3,17 +3,22 @@ package rest
 import (
 	"context"
 	"errors"
+	"fmt"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/registry/rest"
 	"k8s.io/klog/v2"
 
+	"github.com/grafana/authlib/claims"
+	"github.com/grafana/grafana/pkg/apimachinery/identity"
 	"github.com/grafana/grafana/pkg/apimachinery/utils"
 )
@@ -30,7 +35,9 @@ const mode2Str = "2"
 
 // NewDualWriterMode2 returns a new DualWriter in mode 2.
 // Mode 2 represents writing to LegacyStorage and Storage and reading from LegacyStorage.
 func newDualWriterMode2(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics, kind string) *DualWriterMode2 {
-	return &DualWriterMode2{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode2").WithValues("mode", mode2Str, "kind", kind), dualWriterMetrics: dwm}
+	return &DualWriterMode2{
+		Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode2").WithValues("mode", mode2Str, "kind", kind), dualWriterMetrics: dwm,
+	}
 }
 
 // Mode returns the mode of the dual writer.
@@ -394,3 +401,213 @@ func enrichLegacyObject(originalObj, returnedObj runtime.Object) error {
 	accessorReturned.SetUID(accessorOriginal.GetUID())
 	return nil
 }
+
+func getSyncRequester(orgId int64) *identity.StaticRequester {
+	return &identity.StaticRequester{
+		Type:           claims.TypeServiceAccount, // system:apiserver
+		UserID:         1,
+		OrgID:          orgId,
+		Name:           "admin",
+		Login:          "admin",
+		OrgRole:        identity.RoleAdmin,
+		IsGrafanaAdmin: true,
+		Permissions: map[int64]map[string][]string{
+			orgId: {
+				"*": {"*"}, // all resources, all scopes
+			},
+		},
+	}
+}
+
+type syncItem struct {
+	name       string
+	objStorage runtime.Object
+	objLegacy  runtime.Object
+}
+
+func getList(ctx context.Context, obj rest.Lister, listOptions *metainternalversion.ListOptions) ([]runtime.Object, error) {
+	ll, err := obj.List(ctx, listOptions)
+	if err != nil {
+		return nil, err
+	}
+
+	return meta.ExtractList(ll)
+}
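Reviewer note: getList relies on meta.ExtractList, which flattens any registered List object into a []runtime.Object. A short sketch of the same mechanics, using the example.Pod fixtures the tests below also use (imports as in the surrounding file; the function name is hypothetical):

// exampleExtractList shows what getList does under the hood.
func exampleExtractList() {
	podList := &example.PodList{Items: []example.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "foo1"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "foo2"}},
	}}
	items, _ := meta.ExtractList(podList) // []runtime.Object with 2 elements
	for _, obj := range items {
		accessor, _ := utils.MetaAccessor(obj) // same accessor the syncer uses
		fmt.Println(accessor.GetName())        // "foo1", then "foo2"
	}
}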
Aborting sync", maxRecordsSync) + log.Error(errSync, "Unified storage has more records to be synced than allowed") + return + } + + log.Info("got items from unified storage", "items", len(storageList)) + + legacyList, err := getList(ctx, legacy, &metainternalversion.ListOptions{}) + if err != nil { + log.Error(err, "unable to extract list from legacy storage") + return + } + log.Info("got items from legacy storage", "items", len(legacyList)) + + itemsByName := map[string]syncItem{} + for _, obj := range legacyList { + accessor, err := utils.MetaAccessor(obj) + if err != nil { + log.Error(err, "error retrieving accessor data for object from legacy storage") + continue + } + name := accessor.GetName() + + item, ok := itemsByName[name] + if !ok { + item = syncItem{} + } + item.name = name + item.objLegacy = obj + itemsByName[name] = item + } + + for _, obj := range storageList { + accessor, err := utils.MetaAccessor(obj) + if err != nil { + log.Error(err, "error retrieving accessor data for object from storage") + continue + } + name := accessor.GetName() + + item, ok := itemsByName[name] + if !ok { + item = syncItem{} + } + item.name = name + item.objStorage = obj + itemsByName[name] = item + } + log.Info("got list of items to be synced", "items", len(itemsByName)) + + for name, item := range itemsByName { + // upsert if: + // - existing in both legacy and storage, but objects are different, or + // - if it's missing from storage + if item.objLegacy != nil && + ((item.objStorage != nil && !Compare(item.objLegacy, item.objStorage)) || (item.objStorage == nil)) { + outOfSync++ + + accessor, err := utils.MetaAccessor(item.objLegacy) + if err != nil { + log.Error(err, "error retrieving accessor data for object from storage") + continue + } + + if item.objStorage != nil { + accessorStorage, err := utils.MetaAccessor(item.objStorage) + if err != nil { + log.Error(err, "error retrieving accessor data for object from storage") + continue + } + accessor.SetResourceVersion(accessorStorage.GetResourceVersion()) + accessor.SetUID(accessorStorage.GetUID()) + + log.Info("updating item on unified storage", "name", name) + } else { + accessor.SetResourceVersion("") + accessor.SetUID("") + + log.Info("inserting item on unified storage", "name", name) + } + + objInfo := rest.DefaultUpdatedObjectInfo(item.objLegacy, []rest.TransformFunc{}...) 
+
+			// delete if the object does not exist in legacy storage but exists in unified storage
+			if item.objLegacy == nil && item.objStorage != nil {
+				outOfSync++
+
+				ctx = request.WithRequestInfo(ctx, &request.RequestInfo{
+					APIGroup:  requestInfo.APIGroup,
+					Resource:  requestInfo.Resource,
+					Name:      name,
+					Namespace: requestInfo.Namespace,
+				})
+
+				log.Info("deleting item from unified storage", "name", name)
+
+				deletedS, _, err := storage.Delete(ctx, name, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
+				if err != nil {
+					if !apierrors.IsNotFound(err) {
+						log.WithValues("objectList", deletedS).Error(err, "could not delete from storage")
+					}
+					syncErr++
+				} else {
+					syncSuccess++
+				}
+			}
+		}
+
+		everythingSynced = outOfSync == syncSuccess
+
+		metrics.recordDataSyncerOutcome(mode2Str, kind, everythingSynced)
+		metrics.recordDataSyncerDuration(err != nil, mode2Str, kind, startSync)
+
+		log.Info("finished syncing items", "items", len(itemsByName), "updated", syncSuccess, "failed", syncErr, "outcome", everythingSynced)
+	})
+
+	if errSync != nil {
+		err = errSync
+	}
+
+	return everythingSynced, err
+}
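Reviewer note: the per-item loop above boils down to a three-way decision. A pure-function sketch of the rule (syncAction and decide are hypothetical names for illustration; Compare is this package's existing deep-equality helper):

// syncAction enumerates the three possible outcomes of the reconciliation loop.
type syncAction int

const (
	actionNone   syncAction = iota // both stores already agree
	actionUpsert                   // legacy wins: create or update in unified storage
	actionDelete                   // present only in unified storage: remove it
)

func decide(item syncItem) syncAction {
	switch {
	case item.objLegacy != nil && item.objStorage == nil:
		return actionUpsert
	case item.objLegacy != nil && !Compare(item.objLegacy, item.objStorage):
		return actionUpsert
	case item.objLegacy == nil && item.objStorage != nil:
		return actionDelete
	default:
		return actionNone
	}
}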
diff --git a/pkg/apiserver/rest/dualwriter_mode2_test.go b/pkg/apiserver/rest/dualwriter_mode2_test.go
index fd7e168b84a..809e10fe645 100644
--- a/pkg/apiserver/rest/dualwriter_mode2_test.go
+++ b/pkg/apiserver/rest/dualwriter_mode2_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
@@ -15,6 +16,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apiserver/pkg/apis/example"
+	"k8s.io/apiserver/pkg/endpoints/request"
 )
 
 var createFn = func(context.Context, runtime.Object) error { return nil }
@@ -607,3 +609,197 @@ func TestEnrichReturnedObject(t *testing.T) {
 		})
 	}
 }
+
+var legacyObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
+var legacyObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
+var legacyObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
+var legacyObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
+
+var legacyObj2WithHostname = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{Hostname: "hostname"}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
+
+var storageObj1 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo1", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
+var storageObj2 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo2", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
+var storageObj3 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo3", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
+var storageObj4 = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo4", ResourceVersion: "1", CreationTimestamp: metav1.Time{}}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: time.Now()}}}
+
+var legacyListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
+	Items: []example.Pod{
+		*legacyObj1,
+		*legacyObj2,
+		*legacyObj3,
+	}}
+
+var legacyListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
+	Items: []example.Pod{
+		*legacyObj1,
+		*legacyObj2,
+		*legacyObj3,
+		*legacyObj4,
+	}}
+
+var legacyListWith3itemsObj2IsDifferent = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
+	Items: []example.Pod{
+		*legacyObj1,
+		*legacyObj2WithHostname,
+		*legacyObj3,
+	}}
+
+var storageListWith3items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
+	Items: []example.Pod{
+		*storageObj1,
+		*storageObj2,
+		*storageObj3,
+	}}
+
+var storageListWith4items = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
+	Items: []example.Pod{
+		*storageObj1,
+		*storageObj2,
+		*storageObj3,
+		*storageObj4,
+	}}
+
+var storageListWith3itemsMissingFoo2 = &example.PodList{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ListMeta: metav1.ListMeta{},
+	Items: []example.Pod{
+		*storageObj1,
+		*storageObj3,
+		*storageObj4,
+	}}
+
+func TestMode2_DataSyncer(t *testing.T) {
+	type testCase struct {
+		setupLegacyFn   func(m *mock.Mock)
+		setupStorageFn  func(m *mock.Mock)
+		name            string
+		expectedOutcome bool
+		wantErr         bool
+	}
+	tests := []testCase{
+		{
+			name: "both stores are in sync",
+			setupLegacyFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
+			},
+			setupStorageFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
+			},
+			expectedOutcome: true,
+		},
+		{
+			name: "both stores are in sync - fail to list from legacy",
+			setupLegacyFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, errors.New("error"))
+			},
+			setupStorageFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
+			},
+			expectedOutcome: false,
+		},
+		{
+			name: "both stores are in sync - fail to list from storage",
+			setupLegacyFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
+			},
+			setupStorageFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, errors.New("error"))
+			},
+			expectedOutcome: false,
+		},
+		{
+			name: "storage is missing 1 entry (foo4)",
+			setupLegacyFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
+			},
+			setupStorageFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
+				m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
+			},
+			expectedOutcome: true,
+		},
+		{
+			name: "storage needs to be updated (foo2 is different)",
+			setupLegacyFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3itemsObj2IsDifferent, nil)
+			},
+			setupStorageFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
+				m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
+			},
+			expectedOutcome: true,
+		},
+		{
+			name: "storage is missing 1 entry (foo4) - fail to upsert",
+			setupLegacyFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(legacyListWith4items, nil)
+			},
+			setupStorageFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(storageListWith3items, nil)
+				m.On("Update", mock.Anything, "foo4", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
+			},
+			expectedOutcome: false,
+		},
+		{
+			name: "storage has 1 extra entry (foo4)",
+			setupLegacyFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
+			},
+			setupStorageFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
+				m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
+			},
+			expectedOutcome: true,
+		},
+		{
+			name: "storage has 1 extra entry (foo4) - fail to delete",
+			setupLegacyFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
+			},
+			setupStorageFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(storageListWith4items, nil)
+				m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, errors.New("error"))
+			},
+			expectedOutcome: false,
+		},
+		{
+			name: "storage is missing 1 entry (foo2) and has 1 extra entry (foo4)",
+			setupLegacyFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(legacyListWith3items, nil)
+			},
+			setupStorageFn: func(m *mock.Mock) {
+				m.On("List", mock.Anything, mock.Anything).Return(storageListWith3itemsMissingFoo2, nil)
+				m.On("Update", mock.Anything, "foo2", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
+				m.On("Delete", mock.Anything, "foo4", mock.Anything, mock.Anything).Return(exampleObj, false, nil)
+			},
+			expectedOutcome: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			l := (LegacyStorage)(nil)
+			s := (Storage)(nil)
+			lm := &mock.Mock{}
+			um := &mock.Mock{}
+
+			ls := legacyStoreMock{lm, l}
+			us := storageMock{um, s}
+
+			if tt.setupLegacyFn != nil {
+				tt.setupLegacyFn(lm)
+			}
+			if tt.setupStorageFn != nil {
+				tt.setupStorageFn(um)
+			}
+
+			outcome, err := mode2DataSyncer(context.Background(), ls, us, "test.kind", p, &fakeServerLock{}, &request.RequestInfo{})
+			if tt.wantErr {
+				assert.Error(t, err)
+				return
+			}
+
+			assert.NoError(t, err)
+			assert.Equal(t, tt.expectedOutcome, outcome)
+		})
+	}
+}
diff --git a/pkg/apiserver/rest/dualwriter_test.go b/pkg/apiserver/rest/dualwriter_test.go
index a4a954bf744..4758dc6e3a3 100644
--- a/pkg/apiserver/rest/dualwriter_test.go
+++ b/pkg/apiserver/rest/dualwriter_test.go
@@ -6,12 +6,12 @@ import (
 	"testing"
 	"time"
 
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apiserver/pkg/apis/example"
+	"k8s.io/apiserver/pkg/endpoints/request"
 )
 
 func TestSetDualWritingMode(t *testing.T) {
@@ -43,13 +43,15 @@ func TestSetDualWritingMode(t *testing.T) {
 			s := (Storage)(nil)
 			m := &mock.Mock{}
 
+			m.On("List", mock.Anything, mock.Anything).Return(exampleList, nil)
+			m.On("List", mock.Anything, mock.Anything).Return(anotherList, nil)
+
 			ls := legacyStoreMock{m, l}
 			us := storageMock{m, s}
 
 			kvStore := &fakeNamespacedKV{data: make(map[string]string), namespace: "storage.dualwriting." + tt.stackID}
-			p := prometheus.NewRegistry()
-			dwMode, err := SetDualWritingMode(context.Background(), kvStore, ls, us, "playlist.grafana.app/v0alpha1", tt.desiredMode, p)
+			dwMode, err := SetDualWritingMode(context.Background(), kvStore, ls, us, "playlist.grafana.app/v0alpha1", tt.desiredMode, p, &fakeServerLock{}, &request.RequestInfo{})
 			assert.NoError(t, err)
 			assert.Equal(t, tt.expectedMode, dwMode)
@@ -112,3 +114,12 @@ func (f *fakeNamespacedKV) Set(ctx context.Context, key, value string) error {
 	f.data[f.namespace+key] = value
 	return nil
 }
+
+// Never lock in tests
+type fakeServerLock struct {
+}
+
+func (f *fakeServerLock) LockExecuteAndRelease(ctx context.Context, actionName string, duration time.Duration, fn func(ctx context.Context)) error {
+	fn(ctx)
+	return nil
+}
diff --git a/pkg/apiserver/rest/metrics.go b/pkg/apiserver/rest/metrics.go
index 1457ffb8e16..6f1b0476854 100644
--- a/pkg/apiserver/rest/metrics.go
+++ b/pkg/apiserver/rest/metrics.go
@@ -9,9 +9,11 @@ import (
 )
 
 type dualWriterMetrics struct {
-	legacy  *prometheus.HistogramVec
-	storage *prometheus.HistogramVec
-	outcome *prometheus.HistogramVec
+	legacy        *prometheus.HistogramVec
+	storage       *prometheus.HistogramVec
+	outcome       *prometheus.HistogramVec
+	syncer        *prometheus.HistogramVec
+	syncerOutcome *prometheus.HistogramVec
 }
 
 // DualWriterStorageDuration is a metric summary for dual writer storage duration per mode
@@ -38,15 +40,41 @@ var DualWriterOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 	NativeHistogramBucketFactor: 1.1,
 }, []string{"mode", "name", "method"})
 
+var DualWriterReadLegacyCounts = prometheus.NewCounterVec(prometheus.CounterOpts{
+	Name:      "dual_writer_read_legacy_count",
+	Help:      "Count of dual writer reads from legacy storage",
+	Namespace: "grafana",
+}, []string{"kind", "method"})
+
+// DualWriterSyncerDuration is a metric summary for dual writer sync duration per mode
+var DualWriterSyncerDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+	Name:                        "dual_writer_data_syncer_duration_seconds",
+	Help:                        "Histogram for the runtime of dual writer data syncer duration per mode",
+	Namespace:                   "grafana",
+	NativeHistogramBucketFactor: 1.1,
+}, []string{"is_error", "mode", "kind"})
+
+// DualWriterDataSyncerOutcome is a metric summary for dual writer data syncer outcome comparison between the 2 stores per mode
+var DualWriterDataSyncerOutcome = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+	Name:                        "dual_writer_data_syncer_outcome",
+	Help:                        "Histogram for the dual writer data syncer outcome comparison between the 2 stores per mode",
+	Namespace:                   "grafana",
+	NativeHistogramBucketFactor: 1.1,
+}, []string{"mode", "kind"})
+
 func (m *dualWriterMetrics) init(reg prometheus.Registerer) {
 	log := klog.NewKlogr()
 	m.legacy = DualWriterLegacyDuration
 	m.storage = DualWriterStorageDuration
 	m.outcome = DualWriterOutcome
+	m.syncer = DualWriterSyncerDuration
+	m.syncerOutcome = DualWriterDataSyncerOutcome
 	errLegacy := reg.Register(m.legacy)
 	errStorage := reg.Register(m.storage)
 	errOutcome := reg.Register(m.outcome)
-	if errLegacy != nil || errStorage != nil || errOutcome != nil {
+	errSyncer := reg.Register(m.syncer)
+	errSyncerOutcome := reg.Register(m.syncerOutcome)
+	if errLegacy != nil || errStorage != nil || errOutcome != nil || errSyncer != nil || errSyncerOutcome != nil {
 		log.Info("cloud migration metrics already registered")
 	}
 }
@@ -68,3 +96,16 @@ func (m *dualWriterMetrics) recordOutcome(mode string, name string, areEqual boo
 	}
 	m.outcome.WithLabelValues(mode, name, method).Observe(observeValue)
 }
+
+func (m *dualWriterMetrics) recordDataSyncerDuration(isError bool, mode string, kind string, startFrom time.Time) {
+	duration := time.Since(startFrom).Seconds()
+	m.syncer.WithLabelValues(strconv.FormatBool(isError), mode, kind).Observe(duration)
+}
+
+func (m *dualWriterMetrics) recordDataSyncerOutcome(mode string, kind string, synced bool) {
+	var observeValue float64
+	if !synced {
+		observeValue = 1
+	}
+	m.syncerOutcome.WithLabelValues(mode, kind).Observe(observeValue)
+}
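Reviewer note: a quick sketch of what one syncer run records with the new metrics (registry and label values are stand-ins; they mirror the recordDataSyncer* calls in dualwriter_mode2.go above):

// exampleRecordSyncerMetrics shows the series one sync run emits.
func exampleRecordSyncerMetrics() {
	m := &dualWriterMetrics{}
	m.init(prometheus.NewRegistry())

	start := time.Now()
	// ... run the sync ...
	m.recordDataSyncerDuration(false, "2", "playlist.grafana.app/v0alpha1", start)
	m.recordDataSyncerOutcome("2", "playlist.grafana.app/v0alpha1", true)
	// Emits observations for grafana_dual_writer_data_syncer_duration_seconds{is_error="false",mode="2",kind="..."}
	// and grafana_dual_writer_data_syncer_outcome{mode="2",kind="..."}.
}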
diff --git a/pkg/cmd/grafana/apiserver/server.go b/pkg/cmd/grafana/apiserver/server.go
index 0b43233170d..13290ba5cd9 100644
--- a/pkg/cmd/grafana/apiserver/server.go
+++ b/pkg/cmd/grafana/apiserver/server.go
@@ -167,7 +167,7 @@ func (o *APIServerOptions) RunAPIServer(ctx context.Context, config *genericapis
 	// Install the API Group+version
 	// #TODO figure out how to configure storage type in o.Options.StorageOptions
 	err = builder.InstallAPIs(grafanaAPIServer.Scheme, grafanaAPIServer.Codecs, server, config.RESTOptionsGetter, o.builders, o.Options.StorageOptions,
-		o.Options.MetricsOptions.MetricsRegisterer, nil, nil, // no need for server lock in standalone
+		o.Options.MetricsOptions.MetricsRegisterer, nil, nil, nil, // no need for server lock in standalone
 	)
 	if err != nil {
 		return err
diff --git a/pkg/services/apiserver/builder/helper.go b/pkg/services/apiserver/builder/helper.go
index 376df20edbd..2b8067f55cc 100644
--- a/pkg/services/apiserver/builder/helper.go
+++ b/pkg/services/apiserver/builder/helper.go
@@ -14,6 +14,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/serializer"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
+	k8srequest "k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/registry/generic"
 	genericapiserver "k8s.io/apiserver/pkg/server"
 	"k8s.io/apiserver/pkg/util/openapi"
@@ -26,6 +27,7 @@ import (
 
 	"github.com/grafana/grafana/pkg/apiserver/endpoints/filters"
 	grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
+	"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
 	"github.com/grafana/grafana/pkg/services/apiserver/options"
 )
@@ -132,6 +134,15 @@ type ServerLockService interface {
 	LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error
 }
 
+func getRequestInfo(gr schema.GroupResource, namespaceMapper request.NamespaceMapper) *k8srequest.RequestInfo {
+	return &k8srequest.RequestInfo{
+		APIGroup:  gr.Group,
+		Resource:  gr.Resource,
+		Name:      "",
+		Namespace: namespaceMapper(int64(1)),
+	}
+}
+
 func InstallAPIs(
 	scheme *runtime.Scheme,
 	codecs serializer.CodecFactory,
@@ -140,6 +151,7 @@ func InstallAPIs(
 	builders []APIGroupBuilder,
 	storageOpts *options.StorageOptions,
 	reg prometheus.Registerer,
+	namespaceMapper request.NamespaceMapper,
 	kvStore grafanarest.NamespacedKVStore,
 	serverLock ServerLockService,
 ) error {
@@ -155,9 +167,12 @@ func InstallAPIs(
 		// when missing this will default to mode zero (legacy only)
 		mode := storageOpts.DualWriterDesiredModes[key]
 
+		// TODO: use a context inherited from the main Grafana process
+		ctx := context.Background()
+
 		// Moving from one version to the next can only happen after the previous step has
 		// successfully synchronized.
-		currentMode, err := grafanarest.SetDualWritingMode(context.Background(), kvStore, legacy, storage, key, mode, reg)
+		currentMode, err := grafanarest.SetDualWritingMode(ctx, kvStore, legacy, storage, key, mode, reg, serverLock, getRequestInfo(gr, namespaceMapper))
 		if err != nil {
 			return nil, err
 		}
@@ -168,6 +183,10 @@ func InstallAPIs(
 			return storage, nil
 		default:
 		}
+
+		if storageOpts.DualWriterDataSyncJobEnabled[key] {
+			grafanarest.StartPeriodicDataSyncer(ctx, currentMode, legacy, storage, key, reg, serverLock, getRequestInfo(gr, namespaceMapper))
+		}
 		return grafanarest.NewDualWriter(currentMode, legacy, storage, reg, key), nil
 	}
 }
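Reviewer note: getRequestInfo pins the sync to org 1 via the namespace mapper. A sketch of the assumption behind that (the real mapper comes from request.GetNamespaceMapper(cfg) and the exact namespace depends on deployment config; the hand-rolled mapper below is illustrative only):

// exampleRequestInfo shows the org-1 namespace assumption.
func exampleRequestInfo() {
	var mapper request.NamespaceMapper = func(orgID int64) string {
		if orgID == 1 {
			return "default" // assumed single-tenant mapping for org 1
		}
		return fmt.Sprintf("org-%d", orgID)
	}
	info := getRequestInfo(schema.GroupResource{Group: "playlist.grafana.app", Resource: "playlists"}, mapper)
	_ = info // info.Namespace == "default": the periodic syncer only ever touches org-1 data
}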
diff --git a/pkg/services/apiserver/config.go b/pkg/services/apiserver/config.go
index b6172e2fe0a..3682390466f 100644
--- a/pkg/services/apiserver/config.go
+++ b/pkg/services/apiserver/config.go
@@ -60,6 +60,11 @@ func applyGrafanaConfig(cfg *setting.Cfg, features featuremgmt.FeatureToggles, o
 		playlist.GROUPRESOURCE: 2,
 	}
 
+	o.StorageOptions.DualWriterDataSyncJobEnabled = map[string]bool{
+		// TODO: This will be enabled later, when we get a dedicated config section for unified_storage
+		// playlist.RESOURCE + "." + playlist.GROUP: true,
+	}
+
 	// TODO: ensure backwards compatibility with production
 	// remove this after changing the unified_storage_mode key format in HGAPI
 	o.StorageOptions.DualWriterDesiredModes[playlist.RESOURCE+"."+playlist.GROUP] = o.StorageOptions.DualWriterDesiredModes[playlist.GROUPRESOURCE]
diff --git a/pkg/services/apiserver/options/storage.go b/pkg/services/apiserver/options/storage.go
index 088bf5e5fe8..b53dbc353e4 100644
--- a/pkg/services/apiserver/options/storage.go
+++ b/pkg/services/apiserver/options/storage.go
@@ -22,10 +22,11 @@ const (
 )
 
 type StorageOptions struct {
-	StorageType            StorageType
-	DataPath               string
-	Address                string
-	DualWriterDesiredModes map[string]grafanarest.DualWriterMode
+	StorageType                  StorageType
+	DataPath                     string
+	Address                      string
+	DualWriterDesiredModes       map[string]grafanarest.DualWriterMode
+	DualWriterDataSyncJobEnabled map[string]bool
 }
 
 func NewStorageOptions() *StorageOptions {
diff --git a/pkg/services/apiserver/service.go b/pkg/services/apiserver/service.go
index 0796758c6c6..aecc56b3f66 100644
--- a/pkg/services/apiserver/service.go
+++ b/pkg/services/apiserver/service.go
@@ -145,21 +145,22 @@ func ProvideService(
 	pluginStore pluginstore.Store,
 ) (*service, error) {
 	s := &service{
-		cfg:             cfg,
-		features:        features,
-		rr:              rr,
-		startedCh:       make(chan struct{}),
-		stopCh:          make(chan struct{}),
-		builders:        []builder.APIGroupBuilder{},
-		authorizer:      authorizer.NewGrafanaAuthorizer(cfg, orgService),
-		tracing:         tracing,
-		db:              db, // For Unified storage
-		metrics:         metrics.ProvideRegisterer(),
-		kvStore:         kvStore,
-		pluginClient:    pluginClient,
-		datasources:     datasources,
-		contextProvider: contextProvider,
-		pluginStore:     pluginStore,
+		cfg:               cfg,
+		features:          features,
+		rr:                rr,
+		startedCh:         make(chan struct{}),
+		stopCh:            make(chan struct{}),
+		builders:          []builder.APIGroupBuilder{},
+		authorizer:        authorizer.NewGrafanaAuthorizer(cfg, orgService),
+		tracing:           tracing,
+		db:                db, // For Unified storage
+		metrics:           metrics.ProvideRegisterer(),
+		kvStore:           kvStore,
+		pluginClient:      pluginClient,
+		datasources:       datasources,
+		contextProvider:   contextProvider,
+		pluginStore:       pluginStore,
+		serverLockService: serverLockService,
 	}
 
 	// This will be used when running as a dskit service
@@ -341,7 +342,7 @@ func (s *service) start(ctx context.Context) error {
 	// Install the API group+version
 	err = builder.InstallAPIs(Scheme, Codecs, server, serverConfig.RESTOptionsGetter, builders, o.StorageOptions,
 		// Required for the dual writer initialization
-		s.metrics, kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"), s.serverLockService,
+		s.metrics, request.GetNamespaceMapper(s.cfg), kvstore.WithNamespace(s.kvStore, 0, "storage.dualwriting"), s.serverLockService,
 	)
 	if err != nil {
 		return err