diff --git a/pkg/storage/unified/resource/kv.go b/pkg/storage/unified/resource/kv.go index e4af06a9aa1..21343984c8c 100644 --- a/pkg/storage/unified/resource/kv.go +++ b/pkg/storage/unified/resource/kv.go @@ -68,6 +68,10 @@ func NewBadgerKV(db *badger.DB) *badgerKV { } func (k *badgerKV) Get(ctx context.Context, section string, key string) (KVObject, error) { + if k.db.IsClosed() { + return KVObject{}, fmt.Errorf("database is closed") + } + txn := k.db.NewTransaction(false) defer txn.Discard() @@ -101,6 +105,10 @@ func (k *badgerKV) Get(ctx context.Context, section string, key string) (KVObjec } func (k *badgerKV) Save(ctx context.Context, section string, key string, value io.Reader) error { + if k.db.IsClosed() { + return fmt.Errorf("database is closed") + } + if section == "" { return fmt.Errorf("section is required") } @@ -123,6 +131,10 @@ func (k *badgerKV) Save(ctx context.Context, section string, key string, value i } func (k *badgerKV) Delete(ctx context.Context, section string, key string) error { + if k.db.IsClosed() { + return fmt.Errorf("database is closed") + } + if section == "" { return fmt.Errorf("section is required") } @@ -149,6 +161,11 @@ func (k *badgerKV) Delete(ctx context.Context, section string, key string) error } func (k *badgerKV) Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error] { + if k.db.IsClosed() { + return func(yield func(string, error) bool) { + yield("", fmt.Errorf("database is closed")) + } + } if section == "" { return func(yield func(string, error) bool) { yield("", fmt.Errorf("section is required")) diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 35dc9de717b..00567a3774c 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -782,6 +782,11 @@ func (s *server) delete(ctx context.Context, user claims.AuthInfo, req *resource return nil, apierrors.NewBadRequest( fmt.Sprintf("unable to read previous object, %v", err)) } + oldObj, err := utils.MetaAccessor(marker) + if err != nil { + return nil, err + } + obj, err := utils.MetaAccessor(marker) if err != nil { return nil, err @@ -793,6 +798,8 @@ func (s *server) delete(ctx context.Context, user claims.AuthInfo, req *resource obj.SetUpdatedBy(user.GetUID()) obj.SetGeneration(utils.DeletedGeneration) obj.SetAnnotation(utils.AnnoKeyKubectlLastAppliedConfig, "") // clears it + event.ObjectOld = oldObj + event.Object = obj event.Value, err = marker.MarshalJSON() if err != nil { return nil, apierrors.NewBadRequest( diff --git a/pkg/storage/unified/resource/storage_backend.go b/pkg/storage/unified/resource/storage_backend.go index 53a898e50b2..dff33f63b36 100644 --- a/pkg/storage/unified/resource/storage_backend.go +++ b/pkg/storage/unified/resource/storage_backend.go @@ -63,6 +63,7 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in } rv := k.snowflake.Generate().Int64() + obj := event.Object // Write data. var action DataAction switch event.Type { @@ -87,10 +88,15 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in action = DataActionUpdated case resourcepb.WatchEvent_DELETED: action = DataActionDeleted + obj = event.ObjectOld default: return 0, fmt.Errorf("invalid event type: %d", event.Type) } + if obj == nil { + return 0, fmt.Errorf("object is nil") + } + // Build the search document doc, err := k.builder.BuildDocument(ctx, event.Key, rv, event.Value) if err != nil { @@ -119,7 +125,7 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in Name: event.Key.Name, ResourceVersion: rv, Action: action, - Folder: event.Object.GetFolder(), + Folder: obj.GetFolder(), }, Value: MetaData{ IndexableDocument: *doc, @@ -137,7 +143,7 @@ func (k *kvStorageBackend) WriteEvent(ctx context.Context, event WriteEvent) (in Name: event.Key.Name, ResourceVersion: rv, Action: action, - Folder: event.Object.GetFolder(), + Folder: obj.GetFolder(), PreviousRV: event.PreviousRV, }) if err != nil { diff --git a/pkg/storage/unified/resource/storage_backend_test.go b/pkg/storage/unified/resource/storage_backend_test.go index 899f7b2de25..bb1288458d0 100644 --- a/pkg/storage/unified/resource/storage_backend_test.go +++ b/pkg/storage/unified/resource/storage_backend_test.go @@ -72,6 +72,7 @@ func TestKvStorageBackend_WriteEvent_Success(t *testing.T) { }, Value: objectToJSONBytes(t, testObj), Object: metaAccessor, + ObjectOld: metaAccessor, PreviousRV: 100, } @@ -799,6 +800,8 @@ func TestKvStorageBackend_ListTrash_Success(t *testing.T) { // Delete the resource writeEvent.Type = resourcepb.WatchEvent_DELETED writeEvent.PreviousRV = rv1 + writeEvent.Object = metaAccessor + writeEvent.ObjectOld = metaAccessor rv2, err := backend.WriteEvent(ctx, writeEvent) require.NoError(t, err) @@ -989,8 +992,12 @@ func writeObject(t *testing.T, backend *kvStorageBackend, obj *unstructured.Unst }, Value: objectToJSONBytes(t, obj), Object: metaAccessor, + ObjectOld: metaAccessor, PreviousRV: previousRV, } + if eventType == resourcepb.WatchEvent_ADDED { + writeEvent.ObjectOld = nil + } return backend.WriteEvent(context.Background(), writeEvent) } diff --git a/pkg/storage/unified/testing/storage_backend.go b/pkg/storage/unified/testing/storage_backend.go index 2f369c6d233..e3314376715 100644 --- a/pkg/storage/unified/testing/storage_backend.go +++ b/pkg/storage/unified/testing/storage_backend.go @@ -1106,7 +1106,7 @@ func writeEvent(ctx context.Context, store resource.StorageBackend, name string, } meta.SetFolder(options.Folder) - return store.WriteEvent(ctx, resource.WriteEvent{ + event := resource.WriteEvent{ Type: action, Value: options.Value, GUID: uuid.New().String(), @@ -1116,8 +1116,32 @@ func writeEvent(ctx context.Context, store resource.StorageBackend, name string, Resource: options.Resource, Name: name, }, - Object: meta, - }) + } + switch action { + case resourcepb.WatchEvent_DELETED: + event.ObjectOld = meta + + obj, err := utils.MetaAccessor(res) + if err != nil { + return 0, err + } + now := metav1.Now() + obj.SetDeletionTimestamp(&now) + obj.SetUpdatedTimestamp(&now.Time) + obj.SetManagedFields(nil) + obj.SetFinalizers(nil) + obj.SetGeneration(utils.DeletedGeneration) + obj.SetAnnotation(utils.AnnoKeyKubectlLastAppliedConfig, "") // clears it + event.Object = obj + case resourcepb.WatchEvent_ADDED: + event.Object = meta + case resourcepb.WatchEvent_MODIFIED: + event.Object = meta // + event.ObjectOld = meta + default: + panic(fmt.Sprintf("invalid action: %s", action)) + } + return store.WriteEvent(ctx, event) } func newServer(t *testing.T, b resource.StorageBackend) resource.ResourceServer { diff --git a/pkg/storage/unified/testing/storage_backend_test.go b/pkg/storage/unified/testing/storage_backend_test.go index c0be0065c2f..a1da3b82e72 100644 --- a/pkg/storage/unified/testing/storage_backend_test.go +++ b/pkg/storage/unified/testing/storage_backend_test.go @@ -11,7 +11,6 @@ import ( ) func TestBadgerKVStorageBackend(t *testing.T) { - t.Skip("failing with 'panic: DB Closed'") RunStorageBackendTest(t, func(ctx context.Context) resource.StorageBackend { opts := badger.DefaultOptions("").WithInMemory(true).WithLogger(nil) db, err := badger.Open(opts)