K8s/Storage: Refactor Upsert (create from update) (#102528)

pull/101973/head
Ryan McKinley 4 months ago committed by GitHub
parent 9fce4311e9
commit 0845c781ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      pkg/apimachinery/utils/meta.go
  2. 144
      pkg/storage/unified/apistore/store.go
  3. 16
      pkg/storage/unified/apistore/util.go

@ -143,9 +143,13 @@ type grafanaMetaAccessor struct {
// required fields are missing. Fields that are not required return the default
// value and are a no-op if set.
func MetaAccessor(raw interface{}) (GrafanaMetaAccessor, error) {
if raw == nil {
return nil, fmt.Errorf("unable to read metadata from nil object")
}
obj, err := meta.Accessor(raw)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to read metadata from: %T, %s", raw, err)
}
// reflection to find title and other non object properties

@ -445,11 +445,11 @@ func (s *Storage) GuaranteedUpdate(
cachedExistingObject runtime.Object,
) error {
var (
res storage.ResponseMeta
updatedObj runtime.Object
existingObj runtime.Object
created bool
err error
res storage.ResponseMeta
updatedObj runtime.Object
existingObj runtime.Object
existingBytes []byte
err error
)
req := &resource.UpdateRequest{}
req.Key, err = s.getKey(key)
@ -465,61 +465,71 @@ func (s *Storage) GuaranteedUpdate(
for attempt := 1; attempt <= MaxUpdateAttempts; attempt = attempt + 1 {
// Read the latest value
rsp, err := s.store.Read(ctx, &resource.ReadRequest{Key: req.Key})
readResponse, err := s.store.Read(ctx, &resource.ReadRequest{Key: req.Key})
if err != nil {
return resource.GetError(resource.AsErrorResult(err))
}
if rsp.Error != nil {
if rsp.Error.Code == http.StatusNotFound {
if readResponse.Error != nil {
if readResponse.Error.Code == http.StatusNotFound {
if !ignoreNotFound {
return apierrors.NewNotFound(s.gr, req.Key.Name)
}
} else {
return resource.GetError(rsp.Error)
return resource.GetError(readResponse.Error)
}
}
created = true
existingObj = s.newFunc()
if len(rsp.Value) > 0 {
created = false
_, err = s.convertToObject(rsp.Value, existingObj)
if err != nil {
return err
// Upsert? (create because it does not already exist)
if len(readResponse.Value) == 0 {
if !ignoreNotFound {
return apierrors.NewNotFound(s.gr, req.Key.Name)
}
mmm, err := utils.MetaAccessor(existingObj)
updatedObj, _, err = tryUpdate(s.newFunc(), res)
if err != nil {
return err
if attempt >= MaxUpdateAttempts {
return err
}
continue
}
mmm.SetResourceVersionInt64(rsp.ResourceVersion)
res.ResourceVersion = uint64(rsp.ResourceVersion)
return s.Create(ctx, key, updatedObj, destination, 0)
}
if rest.IsDualWriteUpdate(ctx) {
// Ignore the RV when updating legacy values
mmm.SetResourceVersion("")
} else {
if err := preconditions.Check(key, existingObj); err != nil {
if attempt >= MaxUpdateAttempts {
return fmt.Errorf("precondition failed: %w", err)
}
continue
existingBytes = readResponse.Value
existingObj, err = s.convertToObject(readResponse.Value, s.newFunc())
if err != nil {
return err
}
existing, err := utils.MetaAccessor(existingObj)
if err != nil {
return err
}
existing.SetResourceVersionInt64(readResponse.ResourceVersion)
res.ResourceVersion = uint64(readResponse.ResourceVersion)
if rest.IsDualWriteUpdate(ctx) {
// Ignore the RV when updating legacy values
existing.SetResourceVersion("")
} else {
if err := preconditions.Check(key, existingObj); err != nil {
if attempt >= MaxUpdateAttempts {
return fmt.Errorf("precondition failed: %w", err)
}
continue
}
}
// restore the full original object before tryUpdate
if s.opts.LargeObjectSupport != nil && mmm.GetBlob() != nil {
err = s.opts.LargeObjectSupport.Reconstruct(ctx, req.Key, s.store, mmm)
if err != nil {
return err
}
// restore the full original object before tryUpdate
if s.opts.LargeObjectSupport != nil && existing.GetBlob() != nil {
err = s.opts.LargeObjectSupport.Reconstruct(ctx, req.Key, s.store, existing)
if err != nil {
return err
}
} else if !ignoreNotFound {
return apierrors.NewNotFound(s.gr, req.Key.Name)
}
updatedObj, _, err = tryUpdate(existingObj.DeepCopyObject(), res)
updatedObj, _, err = tryUpdate(existingObj, res)
if err != nil {
if attempt >= MaxUpdateAttempts {
return err
@ -529,64 +539,32 @@ func (s *Storage) GuaranteedUpdate(
break
}
unchanged, err := isUnchanged(s.codec, existingObj, updatedObj)
req.Value, err = s.prepareObjectForUpdate(ctx, updatedObj, existingObj)
if err != nil {
return err
}
if unchanged {
var buf bytes.Buffer
if err = s.codec.Encode(updatedObj, &buf); err != nil {
return err
}
if _, err := s.convertToObject(buf.Bytes(), destination); err != nil {
return err
}
return nil
}
var (
value []byte
rv int64
)
if created {
value, err = s.prepareObjectForStorage(ctx, updatedObj)
if err != nil {
return err
}
rsp2, err := s.store.Create(ctx, &resource.CreateRequest{
Key: req.Key,
Value: value,
})
var rv uint64
// Only update (for real) if the bytes have changed
if !bytes.Equal(req.Value, existingBytes) {
updateResponse, err := s.store.Update(ctx, req)
if err != nil {
return resource.GetError(resource.AsErrorResult(err))
}
if rsp2.Error != nil {
return resource.GetError(rsp2.Error)
}
rv = rsp2.ResourceVersion
} else {
value, err = s.prepareObjectForUpdate(ctx, updatedObj, existingObj)
if err != nil {
return err
if updateResponse.Error != nil {
return resource.GetError(updateResponse.Error)
}
req.Value = value
rsp2, err := s.store.Update(ctx, req)
if err != nil {
return resource.GetError(resource.AsErrorResult(err))
}
if rsp2.Error != nil {
return resource.GetError(rsp2.Error)
}
rv = rsp2.ResourceVersion
rv = uint64(updateResponse.ResourceVersion)
}
if _, err := s.convertToObject(value, destination); err != nil {
if _, err := s.convertToObject(req.Value, destination); err != nil {
return err
}
if err := s.versioner.UpdateObject(destination, uint64(rv)); err != nil {
return err
if rv > 0 {
if err := s.versioner.UpdateObject(destination, rv); err != nil {
return err
}
}
return nil

@ -6,13 +6,11 @@
package apistore
import (
"bytes"
"fmt"
"strconv"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apiserver/pkg/storage"
@ -112,17 +110,3 @@ func toListRequest(k *resource.ResourceKey, opts storage.ListOptions) (*resource
return req, predicate, nil
}
func isUnchanged(codec runtime.Codec, obj runtime.Object, newObj runtime.Object) (bool, error) {
buf := new(bytes.Buffer)
if err := codec.Encode(obj, buf); err != nil {
return false, err
}
newBuf := new(bytes.Buffer)
if err := codec.Encode(newObj, newBuf); err != nil {
return false, err
}
return bytes.Equal(buf.Bytes(), newBuf.Bytes()), nil
}

Loading…
Cancel
Save