From 1dd59ca599bcc4357b5dde7256e996926533a16d Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Wed, 21 May 2025 19:25:40 +0100 Subject: [PATCH] DualWriter: remove RV+UID for secondary update (#105543) --- pkg/apiserver/rest/dualwriter.go | 10 ---- pkg/storage/legacysql/dualwrite/dualwriter.go | 52 +++++++++++++++---- pkg/storage/unified/apistore/store.go | 14 ++--- 3 files changed, 46 insertions(+), 30 deletions(-) diff --git a/pkg/apiserver/rest/dualwriter.go b/pkg/apiserver/rest/dualwriter.go index 3b4196935de..4723f102ca9 100644 --- a/pkg/apiserver/rest/dualwriter.go +++ b/pkg/apiserver/rest/dualwriter.go @@ -24,16 +24,6 @@ var ( _ rest.SingularNameProvider = (DualWriter)(nil) ) -type dualWriteContextKey struct{} - -func IsDualWriteUpdate(ctx context.Context) bool { - return ctx.Value(dualWriteContextKey{}) == true -} - -func WithDualWriteUpdate(ctx context.Context) context.Context { - return context.WithValue(ctx, dualWriteContextKey{}, true) -} - // Function that will create a dual writer type DualWriteBuilder func(gr schema.GroupResource, legacy Storage, unified Storage) (Storage, error) diff --git a/pkg/storage/legacysql/dualwrite/dualwriter.go b/pkg/storage/legacysql/dualwrite/dualwriter.go index 7e3c955fa26..5ac9889025a 100644 --- a/pkg/storage/legacysql/dualwrite/dualwriter.go +++ b/pkg/storage/legacysql/dualwrite/dualwriter.go @@ -13,7 +13,7 @@ import ( "k8s.io/apiserver/pkg/registry/rest" "github.com/grafana/grafana-app-sdk/logging" - + "github.com/grafana/grafana/pkg/apimachinery/utils" grafanarest "github.com/grafana/grafana/pkg/apiserver/rest" ) @@ -202,36 +202,44 @@ func (d *dualWriter) Delete(ctx context.Context, name string, deleteValidation r func (d *dualWriter) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { log := d.log.With("method", "Update").WithContext(ctx) - // The incoming RV is not stable -- it may be from legacy or storage! - // This sets a flag in the context and our apistore is more lenient when it exists - ctx = grafanarest.WithDualWriteUpdate(ctx) - // update in legacy first, and then unistore. Will return a failure if either fails. // // we want to update in legacy first, otherwise if the update from unistore was successful, // but legacy failed, the user would get a failure, but see the update did apply to the source // of truth, and be less likely to retry to save (and get the stores in sync again) - objFromLegacy, createdLegacy, err := d.legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) + legacyInfo := objInfo + legacyForceCreate := forceAllowCreate + unifiedInfo := objInfo + unifiedForceCreate := forceAllowCreate + if d.readUnified { + legacyInfo = &wrappedUpdateInfo{objInfo} + legacyForceCreate = true + } else { + unifiedInfo = &wrappedUpdateInfo{objInfo} + unifiedForceCreate = true + } + + objFromLegacy, createdLegacy, err := d.legacy.Update(ctx, name, legacyInfo, createValidation, updateValidation, legacyForceCreate, options) if err != nil { log.With("object", objFromLegacy).Error("could not update in legacy storage", "err", err) return nil, false, err } - // If unified storage is our primary store, just update it there and return. + if d.readUnified { - return d.unified.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) + return d.unified.Update(ctx, name, unifiedInfo, createValidation, updateValidation, unifiedForceCreate, options) } else if d.errorIsOK { // If unified is not primary, but errors are okay, we can just run in the background. go func(ctxBg context.Context, cancel context.CancelFunc) { defer cancel() - if _, _, err := d.unified.Update(ctxBg, name, objInfo, createValidation, updateValidation, forceAllowCreate, options); err != nil { + if _, _, err := d.unified.Update(ctxBg, name, unifiedInfo, createValidation, updateValidation, unifiedForceCreate, options); err != nil { log.Error("failed background UPDATE to unified storage", "err", err) } }(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout)) return objFromLegacy, createdLegacy, nil } // If we want to check unified errors just run it in foreground. - if _, _, err := d.unified.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options); err != nil { + if _, _, err := d.unified.Update(ctx, name, unifiedInfo, createValidation, updateValidation, unifiedForceCreate, options); err != nil { return nil, false, err } return objFromLegacy, createdLegacy, nil @@ -298,3 +306,27 @@ func (d *dualWriter) NewList() runtime.Object { func (d *dualWriter) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { return d.unified.ConvertToTable(ctx, object, tableOptions) } + +type wrappedUpdateInfo struct { + objInfo rest.UpdatedObjectInfo +} + +// Preconditions implements rest.UpdatedObjectInfo. +func (w *wrappedUpdateInfo) Preconditions() *metav1.Preconditions { + return nil +} + +// UpdatedObject implements rest.UpdatedObjectInfo. +func (w *wrappedUpdateInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (newObj runtime.Object, err error) { + obj, err := w.objInfo.UpdatedObject(ctx, oldObj) + if err != nil { + return nil, err + } + meta, err := utils.MetaAccessor(obj) + if err != nil { + return nil, err + } + meta.SetResourceVersion("") + meta.SetUID("") + return obj, err +} diff --git a/pkg/storage/unified/apistore/store.go b/pkg/storage/unified/apistore/store.go index 0e8575742c8..f7bc0dd2780 100644 --- a/pkg/storage/unified/apistore/store.go +++ b/pkg/storage/unified/apistore/store.go @@ -35,7 +35,6 @@ import ( "github.com/grafana/grafana/pkg/apimachinery/utils" grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" - "github.com/grafana/grafana/pkg/apiserver/rest" "github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" ) @@ -557,16 +556,11 @@ func (s *Storage) GuaranteedUpdate( existing.SetResourceVersionInt64(readResponse.ResourceVersion) res.ResourceVersion = uint64(readResponse.ResourceVersion) - if rest.IsDualWriteUpdate(ctx) { - // Ignore the RV when updating legacy values - existing.SetResourceVersion("") - } else { - if err := preconditions.Check(key, existingObj); err != nil { - if attempt >= MaxUpdateAttempts { - return fmt.Errorf("precondition failed: %w", err) - } - continue + if err := preconditions.Check(key, existingObj); err != nil { + if attempt >= MaxUpdateAttempts { + return fmt.Errorf("precondition failed: %w", err) } + continue } // restore the full original object before tryUpdate