|
|
|
|
@ -57,28 +57,37 @@ func (d *DualWriterMode1) Create(ctx context.Context, original runtime.Object, c |
|
|
|
|
|
|
|
|
|
createdCopy := created.DeepCopyObject() |
|
|
|
|
|
|
|
|
|
go func(createdCopy runtime.Object) { |
|
|
|
|
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage create timeout")) |
|
|
|
|
defer cancel() |
|
|
|
|
//nolint:errcheck
|
|
|
|
|
go d.createOnUnifiedStorage(ctx, original, createValidation, createdCopy, options) |
|
|
|
|
|
|
|
|
|
if err := enrichLegacyObject(original, createdCopy); err != nil { |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
return created, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
startStorage := time.Now() |
|
|
|
|
storageObj, errObjectSt := d.Storage.Create(ctx, createdCopy, createValidation, options) |
|
|
|
|
d.recordStorageDuration(errObjectSt != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if err != nil { |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
areEqual := Compare(storageObj, createdCopy) |
|
|
|
|
d.recordOutcome(mode1Str, getName(createdCopy), areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.Info("object from legacy and storage are not equal") |
|
|
|
|
} |
|
|
|
|
}(createdCopy) |
|
|
|
|
func (d *DualWriterMode1) createOnUnifiedStorage(ctx context.Context, original runtime.Object, createValidation rest.ValidateObjectFunc, createdCopy runtime.Object, options *metav1.CreateOptions) error { |
|
|
|
|
var method = "create" |
|
|
|
|
log := d.Log.WithValues("method", method) |
|
|
|
|
|
|
|
|
|
return created, err |
|
|
|
|
// Ignores cancellation signals from parent context. Will automatically be canceled after 10 seconds.
|
|
|
|
|
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage create timeout")) |
|
|
|
|
defer cancel() |
|
|
|
|
|
|
|
|
|
if err := enrichLegacyObject(original, createdCopy); err != nil { |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
startStorage := time.Now() |
|
|
|
|
storageObj, errObjectSt := d.Storage.Create(ctx, createdCopy, createValidation, options) |
|
|
|
|
d.recordStorageDuration(errObjectSt != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if errObjectSt != nil { |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
areEqual := Compare(storageObj, createdCopy) |
|
|
|
|
d.recordOutcome(mode1Str, getName(createdCopy), areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.Info("object from legacy and storage are not equal") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return errObjectSt |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Get overrides the behavior of the generic DualWriter and reads only from LegacyStorage.
|
|
|
|
|
@ -94,27 +103,36 @@ func (d *DualWriterMode1) Get(ctx context.Context, name string, options *metav1. |
|
|
|
|
} |
|
|
|
|
d.recordLegacyDuration(errLegacy != nil, mode1Str, d.resource, method, startLegacy) |
|
|
|
|
|
|
|
|
|
go func(res runtime.Object) { |
|
|
|
|
startStorage := time.Now() |
|
|
|
|
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage get timeout")) |
|
|
|
|
defer cancel() |
|
|
|
|
storageObj, err := d.Storage.Get(ctx, name, options) |
|
|
|
|
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Error(err, "unable to get object in storage") |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
areEqual := Compare(storageObj, res) |
|
|
|
|
d.recordOutcome(mode1Str, name, areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.WithValues("name", name).Info("object from legacy and storage are not equal") |
|
|
|
|
} |
|
|
|
|
}(res) |
|
|
|
|
//nolint:errcheck
|
|
|
|
|
go d.getFromUnifiedStorage(ctx, res, name, options) |
|
|
|
|
|
|
|
|
|
return res, errLegacy |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (d *DualWriterMode1) getFromUnifiedStorage(ctx context.Context, res runtime.Object, name string, options *metav1.GetOptions) error { |
|
|
|
|
var method = "get" |
|
|
|
|
log := d.Log.WithValues("method", method, "name", name) |
|
|
|
|
|
|
|
|
|
startStorage := time.Now() |
|
|
|
|
// Ignores cancellation signals from parent context. Will automatically be canceled after 10 seconds.
|
|
|
|
|
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage get timeout")) |
|
|
|
|
defer cancel() |
|
|
|
|
storageObj, err := d.Storage.Get(ctx, name, options) |
|
|
|
|
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Error(err, "unable to get object in storage") |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
areEqual := Compare(storageObj, res) |
|
|
|
|
d.recordOutcome(mode1Str, name, areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.WithValues("name", name).Info("object from legacy and storage are not equal") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// List overrides the behavior of the generic DualWriter and reads only from LegacyStorage.
|
|
|
|
|
func (d *DualWriterMode1) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { |
|
|
|
|
var method = "list" |
|
|
|
|
@ -128,25 +146,35 @@ func (d *DualWriterMode1) List(ctx context.Context, options *metainternalversion |
|
|
|
|
} |
|
|
|
|
d.recordLegacyDuration(errLegacy != nil, mode1Str, d.resource, method, startLegacy) |
|
|
|
|
|
|
|
|
|
go func(res runtime.Object) { |
|
|
|
|
startStorage := time.Now() |
|
|
|
|
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage list timeout")) |
|
|
|
|
defer cancel() |
|
|
|
|
storageObj, err := d.Storage.List(ctx, options) |
|
|
|
|
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if err != nil { |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
areEqual := Compare(storageObj, res) |
|
|
|
|
d.recordOutcome(mode1Str, getName(res), areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.Info("object from legacy and storage are not equal") |
|
|
|
|
} |
|
|
|
|
}(res) |
|
|
|
|
//nolint:errcheck
|
|
|
|
|
go d.listFromUnifiedStorage(ctx, options, res) |
|
|
|
|
|
|
|
|
|
return res, errLegacy |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (d *DualWriterMode1) listFromUnifiedStorage(ctx context.Context, options *metainternalversion.ListOptions, res runtime.Object) error { |
|
|
|
|
var method = "list" |
|
|
|
|
log := d.Log.WithValues("resourceVersion", options.ResourceVersion, "method", method) |
|
|
|
|
|
|
|
|
|
startStorage := time.Now() |
|
|
|
|
// Ignores cancellation signals from parent context. Will automatically be canceled after 10 seconds.
|
|
|
|
|
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage list timeout")) |
|
|
|
|
defer cancel() |
|
|
|
|
storageObj, err := d.Storage.List(ctx, options) |
|
|
|
|
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Error(err, "unable to list objects from unified storage") |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
areEqual := Compare(storageObj, res) |
|
|
|
|
d.recordOutcome(mode1Str, getName(res), areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.Info("object from legacy and storage are not equal") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (d *DualWriterMode1) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { |
|
|
|
|
var method = "delete" |
|
|
|
|
log := d.Log.WithValues("name", name, "method", method, "name", name) |
|
|
|
|
@ -161,25 +189,34 @@ func (d *DualWriterMode1) Delete(ctx context.Context, name string, deleteValidat |
|
|
|
|
} |
|
|
|
|
d.recordLegacyDuration(false, mode1Str, name, method, startLegacy) |
|
|
|
|
|
|
|
|
|
go func(res runtime.Object) { |
|
|
|
|
startStorage := time.Now() |
|
|
|
|
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage delete timeout")) |
|
|
|
|
defer cancel() |
|
|
|
|
storageObj, _, err := d.Storage.Delete(ctx, name, deleteValidation, options) |
|
|
|
|
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if err != nil { |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
areEqual := Compare(storageObj, res) |
|
|
|
|
d.recordOutcome(mode1Str, name, areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.Info("object from legacy and storage are not equal") |
|
|
|
|
} |
|
|
|
|
}(res) |
|
|
|
|
//nolint:errcheck
|
|
|
|
|
go d.deleteFromUnifiedStorage(ctx, res, name, deleteValidation, options) |
|
|
|
|
|
|
|
|
|
return res, async, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (d *DualWriterMode1) deleteFromUnifiedStorage(ctx context.Context, res runtime.Object, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) error { |
|
|
|
|
var method = "delete" |
|
|
|
|
log := d.Log.WithValues("name", name, "method", method, "name", name) |
|
|
|
|
|
|
|
|
|
startStorage := time.Now() |
|
|
|
|
// Ignores cancellation signals from parent context. Will automatically be canceled after 10 seconds.
|
|
|
|
|
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage delete timeout")) |
|
|
|
|
defer cancel() |
|
|
|
|
storageObj, _, err := d.Storage.Delete(ctx, name, deleteValidation, options) |
|
|
|
|
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if err != nil { |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
areEqual := Compare(storageObj, res) |
|
|
|
|
d.recordOutcome(mode1Str, name, areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.Info("object from legacy and storage are not equal") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// DeleteCollection overrides the behavior of the generic DualWriter and deletes only from LegacyStorage.
|
|
|
|
|
func (d *DualWriterMode1) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) { |
|
|
|
|
var method = "delete-collection" |
|
|
|
|
@ -195,25 +232,34 @@ func (d *DualWriterMode1) DeleteCollection(ctx context.Context, deleteValidation |
|
|
|
|
} |
|
|
|
|
d.recordLegacyDuration(false, mode1Str, d.resource, method, startLegacy) |
|
|
|
|
|
|
|
|
|
go func(res runtime.Object) { |
|
|
|
|
startStorage := time.Now() |
|
|
|
|
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage deletecollection timeout")) |
|
|
|
|
defer cancel() |
|
|
|
|
storageObj, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions) |
|
|
|
|
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if err != nil { |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
areEqual := Compare(storageObj, res) |
|
|
|
|
d.recordOutcome(mode1Str, getName(res), areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.Info("object from legacy and storage are not equal") |
|
|
|
|
} |
|
|
|
|
}(res) |
|
|
|
|
//nolint:errcheck
|
|
|
|
|
go d.deleteCollectionFromUnifiedStorage(ctx, res, deleteValidation, options, listOptions) |
|
|
|
|
|
|
|
|
|
return res, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (d *DualWriterMode1) deleteCollectionFromUnifiedStorage(ctx context.Context, res runtime.Object, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) error { |
|
|
|
|
var method = "delete-collection" |
|
|
|
|
log := d.Log.WithValues("resourceVersion", listOptions.ResourceVersion, "method", method) |
|
|
|
|
|
|
|
|
|
startStorage := time.Now() |
|
|
|
|
// Ignores cancellation signals from parent context. Will automatically be canceled after 10 seconds.
|
|
|
|
|
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage deletecollection timeout")) |
|
|
|
|
defer cancel() |
|
|
|
|
storageObj, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions) |
|
|
|
|
d.recordStorageDuration(err != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if err != nil { |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
areEqual := Compare(storageObj, res) |
|
|
|
|
d.recordOutcome(mode1Str, getName(res), areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.Info("object from legacy and storage are not equal") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { |
|
|
|
|
var method = "update" |
|
|
|
|
log := d.Log.WithValues("name", name, "method", method, "name", name) |
|
|
|
|
@ -228,52 +274,61 @@ func (d *DualWriterMode1) Update(ctx context.Context, name string, objInfo rest. |
|
|
|
|
} |
|
|
|
|
d.recordLegacyDuration(false, mode1Str, d.resource, method, startLegacy) |
|
|
|
|
|
|
|
|
|
go func(res runtime.Object) { |
|
|
|
|
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("storage update timeout")) |
|
|
|
|
|
|
|
|
|
resCopy := res.DeepCopyObject() |
|
|
|
|
// get the object to be updated
|
|
|
|
|
foundObj, err := d.Storage.Get(ctx, name, &metav1.GetOptions{}) |
|
|
|
|
if err != nil { |
|
|
|
|
if !apierrors.IsNotFound(err) { |
|
|
|
|
log.WithValues("object", foundObj).Error(err, "could not get object to update") |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
log.Info("object not found for update, creating one") |
|
|
|
|
} |
|
|
|
|
//nolint:errcheck
|
|
|
|
|
go d.updateOnUnifiedStorage(ctx, res, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) |
|
|
|
|
|
|
|
|
|
return res, async, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
updated, err := objInfo.UpdatedObject(ctx, resCopy) |
|
|
|
|
if err != nil { |
|
|
|
|
log.WithValues("object", updated).Error(err, "could not update or create object") |
|
|
|
|
func (d *DualWriterMode1) updateOnUnifiedStorage(ctx context.Context, res runtime.Object, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) error { |
|
|
|
|
var method = "update" |
|
|
|
|
log := d.Log.WithValues("name", name, "method", method, "name", name) |
|
|
|
|
|
|
|
|
|
// Ignores cancellation signals from parent context. Will automatically be canceled after 10 seconds.
|
|
|
|
|
ctx, cancel := context.WithTimeoutCause(context.WithoutCancel(ctx), time.Second*10, errors.New("storage update timeout")) |
|
|
|
|
|
|
|
|
|
resCopy := res.DeepCopyObject() |
|
|
|
|
// get the object to be updated
|
|
|
|
|
foundObj, err := d.Storage.Get(ctx, name, &metav1.GetOptions{}) |
|
|
|
|
if err != nil { |
|
|
|
|
if !apierrors.IsNotFound(err) { |
|
|
|
|
log.WithValues("object", foundObj).Error(err, "could not get object to update") |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
log.Info("object not found for update, creating one") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// if the object is found, create a new updateWrapper with the object found
|
|
|
|
|
if foundObj != nil { |
|
|
|
|
if err := enrichLegacyObject(foundObj, resCopy); err != nil { |
|
|
|
|
log.Error(err, "could not enrich object") |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
objInfo = &updateWrapper{ |
|
|
|
|
upstream: objInfo, |
|
|
|
|
updated: resCopy, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
startStorage := time.Now() |
|
|
|
|
defer cancel() |
|
|
|
|
storageObj, _, errObjectSt := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) |
|
|
|
|
d.recordStorageDuration(errObjectSt != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if err != nil { |
|
|
|
|
updated, err := objInfo.UpdatedObject(ctx, resCopy) |
|
|
|
|
if err != nil { |
|
|
|
|
log.WithValues("object", updated).Error(err, "could not update or create object") |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// if the object is found, create a new updateWrapper with the object found
|
|
|
|
|
if foundObj != nil { |
|
|
|
|
if err := enrichLegacyObject(foundObj, resCopy); err != nil { |
|
|
|
|
log.Error(err, "could not enrich object") |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
areEqual := Compare(storageObj, res) |
|
|
|
|
d.recordOutcome(mode1Str, name, areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.WithValues("name", name).Info("object from legacy and storage are not equal") |
|
|
|
|
objInfo = &updateWrapper{ |
|
|
|
|
upstream: objInfo, |
|
|
|
|
updated: resCopy, |
|
|
|
|
} |
|
|
|
|
}(res) |
|
|
|
|
} |
|
|
|
|
startStorage := time.Now() |
|
|
|
|
defer cancel() |
|
|
|
|
storageObj, _, errObjectSt := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) |
|
|
|
|
d.recordStorageDuration(errObjectSt != nil, mode1Str, d.resource, method, startStorage) |
|
|
|
|
if err != nil { |
|
|
|
|
cancel() |
|
|
|
|
} |
|
|
|
|
areEqual := Compare(storageObj, res) |
|
|
|
|
d.recordOutcome(mode1Str, name, areEqual, method) |
|
|
|
|
if !areEqual { |
|
|
|
|
log.WithValues("name", name).Info("object from legacy and storage are not equal") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return res, async, err |
|
|
|
|
return errObjectSt |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (d *DualWriterMode1) Destroy() { |
|
|
|
|
|