From f66768c67d895cc681b0d182ad672eba189c2b83 Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Fri, 14 Jun 2024 14:24:36 +0300 Subject: [PATCH] now with a base server implementation --- pkg/storage/unified/resource/fs.go | 153 +++++++ pkg/storage/unified/resource/hooks.go | 45 ++ pkg/storage/unified/resource/server.go | 469 ++++++++++++++++++++ pkg/storage/unified/resource/server_test.go | 116 +++++ 4 files changed, 783 insertions(+) create mode 100644 pkg/storage/unified/resource/fs.go create mode 100644 pkg/storage/unified/resource/hooks.go create mode 100644 pkg/storage/unified/resource/server.go create mode 100644 pkg/storage/unified/resource/server_test.go diff --git a/pkg/storage/unified/resource/fs.go b/pkg/storage/unified/resource/fs.go new file mode 100644 index 00000000000..b9072499f5a --- /dev/null +++ b/pkg/storage/unified/resource/fs.go @@ -0,0 +1,153 @@ +package resource + +import ( + "context" + "encoding/json" + "fmt" + "path/filepath" + "sort" + "strings" + + "github.com/hack-pad/hackpadfs" + "github.com/hack-pad/hackpadfs/mem" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type FileSystemOptions struct { + // OTel tracer + Tracer trace.Tracer + + // Root file system -- null will be in memory + Root hackpadfs.FS +} + +func NewFileSystemStore(opts FileSystemOptions) (AppendingStore, error) { + if opts.Tracer == nil { + opts.Tracer = noop.NewTracerProvider().Tracer("fs") + } + + var err error + root := opts.Root + if root == nil { + root, err = mem.NewFS() + if err != nil { + return nil, err + } + } + + return &fsStore{tracer: opts.Tracer, root: root}, nil +} + +type fsStore struct { + tracer trace.Tracer + root hackpadfs.FS +} + +type fsEvent struct { + ResourceVersion int64 `json:"resourceVersion"` + Message string `json:"message,omitempty"` + Operation string `json:"operation,omitempty"` + Value json.RawMessage `json:"value,omitempty"` + BlobPath string `json:"blob,omitempty"` +} + +// The only write command +func (f *fsStore) WriteEvent(ctx context.Context, event *WriteEvent) (int64, error) { + body := fsEvent{ + ResourceVersion: event.EventID, + Message: event.Message, + Operation: event.Operation.String(), + Value: event.Value, + // Blob... + } + // For this case, we will treat them the same + event.Key.ResourceVersion = 0 + dir := event.Key.NamespacedPath() + err := hackpadfs.MkdirAll(f.root, dir, 0750) + if err != nil { + return 0, err + } + + bytes, err := json.Marshal(&body) + if err != nil { + return 0, err + } + + fpath := filepath.Join(dir, fmt.Sprintf("%d.json", event.EventID)) + file, err := hackpadfs.OpenFile(f.root, fpath, hackpadfs.FlagWriteOnly|hackpadfs.FlagCreate, 0750) + if err != nil { + return 0, err + } + _, err = hackpadfs.WriteFile(file, bytes) + return event.EventID, err +} + +// Read implements ResourceStoreServer. +func (f *fsStore) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { + rv := req.Key.ResourceVersion + req.Key.ResourceVersion = 0 + + fname := "--x--" + dir := req.Key.NamespacedPath() + if rv > 0 { + fname = fmt.Sprintf("%d.json", rv) + } else { + files, err := hackpadfs.ReadDir(f.root, dir) + if err != nil { + return nil, err + } + + // Sort by name + sort.Slice(files, func(i, j int) bool { + a := files[i].Name() + b := files[j].Name() + return a > b // ?? should we parse the numbers ??? + }) + + // The first matching file + for _, v := range files { + fname = v.Name() + if strings.HasSuffix(fname, ".json") { + break + } + } + } + + evt, err := f.open(filepath.Join(dir, fname)) + if err != nil || evt.Operation == ResourceOperation_DELETED.String() { + return nil, apierrors.NewNotFound(schema.GroupResource{ + Group: req.Key.Group, + Resource: req.Key.Resource, + }, req.Key.Name) + } + + return &ReadResponse{ + ResourceVersion: evt.ResourceVersion, + Value: evt.Value, + Message: evt.Message, + }, nil +} + +func (f *fsStore) open(p string) (*fsEvent, error) { + raw, err := hackpadfs.ReadFile(f.root, p) + if err != nil { + return nil, err + } + + evt := &fsEvent{} + err = json.Unmarshal(raw, evt) + return evt, err +} + +// List implements AppendingStore. +func (f *fsStore) List(ctx context.Context, req *ListRequest) (*ListResponse, error) { + panic("unimplemented") +} + +// Watch implements AppendingStore. +func (f *fsStore) Watch(*WatchRequest, ResourceStore_WatchServer) error { + panic("unimplemented") +} diff --git a/pkg/storage/unified/resource/hooks.go b/pkg/storage/unified/resource/hooks.go new file mode 100644 index 00000000000..44c4a2dca42 --- /dev/null +++ b/pkg/storage/unified/resource/hooks.go @@ -0,0 +1,45 @@ +package resource + +import ( + context "context" + "fmt" + + "github.com/grafana/grafana/pkg/apimachinery/identity" +) + +type WriteAccessHooks struct { + // Check if a user has access to write folders + // When this is nil, no resources can have folders configured + Folder func(ctx context.Context, user identity.Requester, uid string) bool + + // When configured, this will make sure a user is allowed to save to a given origin + Origin func(ctx context.Context, user identity.Requester, origin string) bool +} + +type LifecycleHooks interface { + // Called once at initialization + Init() error + + // Stop function -- after calling this, any additional storage functions may error + Stop() +} + +func (a *WriteAccessHooks) CanWriteFolder(ctx context.Context, user identity.Requester, uid string) error { + if a.Folder == nil { + return fmt.Errorf("writing folders is not supported") + } + if !a.Folder(ctx, user, uid) { + return fmt.Errorf("not allowed to write resource to folder") + } + return nil +} + +func (a *WriteAccessHooks) CanWriteOrigin(ctx context.Context, user identity.Requester, uid string) error { + if a.Origin == nil || uid == "UI" { + return nil // default to OK + } + if !a.Origin(ctx, user, uid) { + return fmt.Errorf("not allowed to write resource at origin") + } + return nil +} diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go new file mode 100644 index 00000000000..20682323e72 --- /dev/null +++ b/pkg/storage/unified/resource/server.go @@ -0,0 +1,469 @@ +package resource + +import ( + context "context" + "encoding/json" + "errors" + "fmt" + "sync" + "time" + + "github.com/bwmarrin/snowflake" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/grafana/grafana/pkg/apimachinery/identity" + "github.com/grafana/grafana/pkg/apimachinery/utils" +) + +// Package-level errors. +var ( + ErrNotFound = errors.New("entity not found") + ErrOptimisticLockingFailed = errors.New("optimistic locking failed") + ErrUserNotFoundInContext = errors.New("user not found in context") + ErrUnableToReadResourceJSON = errors.New("unable to read resource json") + ErrNextPageTokenNotSupported = errors.New("nextPageToken not yet supported") + ErrLimitNotSupported = errors.New("limit not yet supported") + ErrNotImplementedYet = errors.New("not implemented yet") +) + +// ResourceServer implements all services +type ResourceServer interface { + ResourceStoreServer + // ResourceSearchServer + // DiagnosticsServer + + // Called once for initialization + Init() error + + // Stop + Stop() +} + +type AppendingStore interface { + // Write a Create/Update/Delete, + // NOTE: the contents of WriteEvent have been validated + // Return the revisionVersion for this event or error + WriteEvent(context.Context, *WriteEvent) (int64, error) + + // Read a value from storage + Read(context.Context, *ReadRequest) (*ReadResponse, error) + + // Implement List -- this expects the read after write semantics + List(context.Context, *ListRequest) (*ListResponse, error) + + // Watch for events + // TODO... this should be converted to a go style function + // that returns a channel (??) rather than the raw grpc server management + Watch(*WatchRequest, ResourceStore_WatchServer) error +} + +type ResourceServerOptions struct { + // OTel tracer + Tracer trace.Tracer + + // When running in a cluster, each node should have a different ID + // This is used for snowflake generation and log identification + NodeID int64 + + // Get the next EventID. When not set, this will default to snowflake IDs + NextEventID func() int64 + + // Real storage backend + Store AppendingStore + + // Real storage backend + Search ResourceSearchServer + + // Diagnostics + Diagnostics DiagnosticsServer + + // Check if a user has access to write folders + // When this is nil, no resources can have folders configured + WriteAccess WriteAccessHooks + + // Callbacks for startup and shutdown + Lifecycle LifecycleHooks +} + +func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { + if opts.Tracer == nil { + opts.Tracer = noop.NewTracerProvider().Tracer("resource-server") + } + + if opts.NextEventID == nil { + eventNode, err := snowflake.NewNode(opts.NodeID) + if err != nil { + return nil, apierrors.NewInternalError( + fmt.Errorf("error initializing snowflake id generator :: %w", err)) + } + opts.NextEventID = func() int64 { + return eventNode.Generate().Int64() + } + } + + if opts.Store == nil { + return nil, fmt.Errorf("missing AppendingStore implementation") + } + if opts.Search == nil { + return nil, fmt.Errorf("missing ResourceSearchServer implementation") + } + if opts.Diagnostics == nil { + return nil, fmt.Errorf("missing Diagnostics implementation") + } + + return &server{ + tracer: opts.Tracer, + nextEventID: opts.NextEventID, + store: opts.Store, + search: opts.Search, + diagnostics: opts.Diagnostics, + access: opts.WriteAccess, + lifecycle: opts.Lifecycle, + }, nil +} + +var _ ResourceServer = &server{} + +type server struct { + tracer trace.Tracer + nextEventID func() int64 + store AppendingStore + search ResourceSearchServer + diagnostics DiagnosticsServer + access WriteAccessHooks + lifecycle LifecycleHooks + + // init checking + once sync.Once + initErr error +} + +// Init implements ResourceServer. +func (s *server) Init() error { + s.once.Do(func() { + // TODO, setup a broadcaster for watch + + // Call lifecycle hooks + if s.lifecycle != nil { + err := s.lifecycle.Init() + if err != nil { + s.initErr = fmt.Errorf("initialize Resource Server: %w", err) + } + } + }) + return s.initErr +} + +func (s *server) Stop() { + s.initErr = fmt.Errorf("service is stopping") + if s.lifecycle != nil { + s.lifecycle.Stop() + } + s.initErr = fmt.Errorf("service is stopped") +} + +func (s *server) newEvent(ctx context.Context, key *ResourceKey, value, oldValue []byte) (*WriteEvent, error) { + var err error + event := &WriteEvent{ + EventID: s.nextEventID(), + Key: key, + Value: value, + } + event.Requester, err = identity.GetRequester(ctx) + if err != nil { + return nil, ErrUserNotFoundInContext + } + + dummy := &metav1.PartialObjectMetadata{} + err = json.Unmarshal(value, dummy) + if err != nil { + return nil, ErrUnableToReadResourceJSON + } + + obj, err := utils.MetaAccessor(dummy) + if err != nil { + return nil, apierrors.NewBadRequest("invalid object in json") + } + if obj.GetUID() == "" { + return nil, apierrors.NewBadRequest("the UID must be set") + } + if obj.GetGenerateName() != "" { + return nil, apierrors.NewBadRequest("can not save value with generate name") + } + gvk := obj.GetGroupVersionKind() + if gvk.Kind == "" { + return nil, apierrors.NewBadRequest("expecting resources with a kind in the body") + } + if gvk.Version == "" { + return nil, apierrors.NewBadRequest("expecting resources with an apiVersion") + } + if gvk.Group != "" && gvk.Group != key.Group { + return nil, apierrors.NewBadRequest( + fmt.Sprintf("group in key does not match group in the body (%s != %s)", key.Group, gvk.Group), + ) + } + if obj.GetName() != key.Name { + return nil, apierrors.NewBadRequest("key name does not match the name in the body") + } + if obj.GetNamespace() != key.Namespace { + return nil, apierrors.NewBadRequest("key namespace does not match the namespace in the body") + } + folder := obj.GetFolder() + if folder != "" { + err = s.access.CanWriteFolder(ctx, event.Requester, folder) + if err != nil { + return nil, err + } + } + origin, err := obj.GetOriginInfo() + if err != nil { + return nil, apierrors.NewBadRequest("invalid origin info") + } + if origin != nil { + err = s.access.CanWriteOrigin(ctx, event.Requester, origin.Name) + if err != nil { + return nil, err + } + } + event.Object = obj + + // This is an update + if oldValue != nil { + dummy := &metav1.PartialObjectMetadata{} + err = json.Unmarshal(oldValue, dummy) + if err != nil { + return nil, apierrors.NewBadRequest("error reading old json value") + } + old, err := utils.MetaAccessor(dummy) + if err != nil { + return nil, apierrors.NewBadRequest("invalid object inside old json") + } + if key.Name != old.GetName() { + return nil, apierrors.NewBadRequest( + fmt.Sprintf("the old value has a different name (%s != %s)", key.Name, old.GetName())) + } + + // Can not change creation timestamps+user + if obj.GetCreatedBy() != old.GetCreatedBy() { + return nil, apierrors.NewBadRequest( + fmt.Sprintf("can not change the created by metadata (%s != %s)", obj.GetCreatedBy(), old.GetCreatedBy())) + } + if obj.GetCreationTimestamp() != old.GetCreationTimestamp() { + return nil, apierrors.NewBadRequest( + fmt.Sprintf("can not change the CreationTimestamp metadata (%v != %v)", obj.GetCreationTimestamp(), old.GetCreationTimestamp())) + } + + oldFolder := obj.GetFolder() + if oldFolder != folder { + event.FolderChanged = true + } + event.OldObject = old + } else if folder != "" { + event.FolderChanged = true + } + return event, nil +} + +func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateResponse, error) { + ctx, span := s.tracer.Start(ctx, "storage_server.Create") + defer span.End() + + if err := s.Init(); err != nil { + return nil, err + } + + if req.Key.ResourceVersion > 0 { + return nil, apierrors.NewBadRequest("can not update a specific resource version") + } + + event, err := s.newEvent(ctx, req.Key, req.Value, nil) + if err != nil { + return nil, err + } + event.Operation = ResourceOperation_CREATED + event.Blob = req.Blob + event.Message = req.Message + + rsp := &CreateResponse{} + // Make sure the created by user is accurate + //---------------------------------------- + val := event.Object.GetCreatedBy() + if val != "" && val != event.Requester.GetUID().String() { + return nil, apierrors.NewBadRequest("created by annotation does not match: metadata.annotations#" + utils.AnnoKeyCreatedBy) + } + + // Create can not have updated properties + //---------------------------------------- + if event.Object.GetUpdatedBy() != "" { + return nil, apierrors.NewBadRequest("unexpected metadata.annotations#" + utils.AnnoKeyCreatedBy) + } + + ts, err := event.Object.GetUpdatedTimestamp() + if err != nil { + return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid timestamp: %s", err)) + } + if ts != nil { + return nil, apierrors.NewBadRequest("unexpected metadata.annotations#" + utils.AnnoKeyUpdatedTimestamp) + } + + // Append and set the resource version + rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) + rsp.Status, err = errToStatus(err) + return rsp, err +} + +// Convert golang errors to status result errors that can be returned to a client +func errToStatus(err error) (*StatusResult, error) { + if err != nil { + // TODO... better conversion!!! + return &StatusResult{ + Status: "Failure", + Message: err.Error(), + }, nil + } + return nil, err +} + +func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateResponse, error) { + ctx, span := s.tracer.Start(ctx, "storage_server.Update") + defer span.End() + + if err := s.Init(); err != nil { + return nil, err + } + + rsp := &UpdateResponse{} + if req.Key.ResourceVersion < 0 { + rsp.Status, _ = errToStatus(apierrors.NewBadRequest("update must include the previous version")) + return rsp, nil + } + + latest, err := s.store.Read(ctx, &ReadRequest{ + Key: req.Key.WithoutResourceVersion(), + }) + if err != nil { + return nil, err + } + if latest.Value == nil { + return nil, apierrors.NewBadRequest("current value does not exist") + } + + event, err := s.newEvent(ctx, req.Key, req.Value, latest.Value) + if err != nil { + return nil, err + } + event.Operation = ResourceOperation_UPDATED + event.PreviousRV = latest.ResourceVersion + event.Message = req.Message + + // Make sure the update user is accurate + //---------------------------------------- + val := event.Object.GetUpdatedBy() + if val != "" && val != event.Requester.GetUID().String() { + return nil, apierrors.NewBadRequest("updated by annotation does not match: metadata.annotations#" + utils.AnnoKeyUpdatedBy) + } + + rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) + rsp.Status, err = errToStatus(err) + return rsp, err +} + +func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) { + ctx, span := s.tracer.Start(ctx, "storage_server.Delete") + defer span.End() + + if err := s.Init(); err != nil { + return nil, err + } + + rsp := &DeleteResponse{} + if req.Key.ResourceVersion < 0 { + return nil, apierrors.NewBadRequest("update must include the previous version") + } + + latest, err := s.store.Read(ctx, &ReadRequest{ + Key: req.Key.WithoutResourceVersion(), + }) + if err != nil { + return nil, err + } + if latest.ResourceVersion != req.Key.ResourceVersion { + return nil, ErrOptimisticLockingFailed + } + + now := metav1.NewTime(time.Now()) + event := &WriteEvent{ + EventID: s.nextEventID(), + Key: req.Key, + Operation: ResourceOperation_DELETED, + PreviousRV: latest.ResourceVersion, + } + event.Requester, err = identity.GetRequester(ctx) + if err != nil { + return nil, apierrors.NewBadRequest("unable to get user") + } + marker := &DeletedMarker{} + err = json.Unmarshal(latest.Value, marker) + if err != nil { + return nil, apierrors.NewBadRequest( + fmt.Sprintf("unable to read previous object, %v", err)) + } + event.Object, err = utils.MetaAccessor(marker) + if err != nil { + return nil, err + } + event.Object.SetDeletionTimestamp(&now) + event.Object.SetUpdatedTimestamp(&now.Time) + event.Object.SetManagedFields(nil) + event.Object.SetFinalizers(nil) + event.Object.SetUpdatedBy(event.Requester.GetUID().String()) + marker.TypeMeta = metav1.TypeMeta{ + Kind: "DeletedMarker", + APIVersion: "storage.grafana.app/v0alpha1", // ?? or can we stick this in common? + } + marker.Annotations["RestoreResourceVersion"] = fmt.Sprintf("%d", event.PreviousRV) + event.Value, err = json.Marshal(marker) + if err != nil { + return nil, apierrors.NewBadRequest( + fmt.Sprintf("unable creating deletion marker, %v", err)) + } + + rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) + rsp.Status, err = errToStatus(err) + return rsp, err +} + +func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { + if err := s.Init(); err != nil { + return nil, err + } + + rsp, err := s.store.Read(ctx, req) + if err != nil { + if rsp == nil { + rsp = &ReadResponse{} + } + rsp.Status, err = errToStatus(err) + } + return rsp, err +} + +func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, error) { + if err := s.Init(); err != nil { + return nil, err + } + + rsp, err := s.store.List(ctx, req) + // Status??? + return rsp, err +} + +func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error { + if err := s.Init(); err != nil { + return err + } + return s.Watch(req, srv) +} diff --git a/pkg/storage/unified/resource/server_test.go b/pkg/storage/unified/resource/server_test.go new file mode 100644 index 00000000000..fac61211412 --- /dev/null +++ b/pkg/storage/unified/resource/server_test.go @@ -0,0 +1,116 @@ +package resource + +import ( + "context" + "embed" + "encoding/json" + "fmt" + "os" + "testing" + "time" + + "github.com/hack-pad/hackpadfs" + hackos "github.com/hack-pad/hackpadfs/os" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/grafana/grafana/pkg/apimachinery/identity" + "github.com/grafana/grafana/pkg/apimachinery/utils" +) + +func TestWriter(t *testing.T) { + testUserA := &identity.StaticRequester{ + Namespace: identity.NamespaceUser, + UserID: 123, + UserUID: "u123", + OrgRole: identity.RoleAdmin, + IsGrafanaAdmin: true, // can do anything + } + ctx := identity.WithRequester(context.Background(), testUserA) + + var root hackpadfs.FS + if false { + tmp, err := os.MkdirTemp("", "xxx-*") + require.NoError(t, err) + + root, err = hackos.NewFS().Sub(tmp[1:]) + require.NoError(t, err) + fmt.Printf("ROOT: %s\n\n", tmp) + } + tmp, err := NewFileSystemStore(FileSystemOptions{ + Root: root, + }) + require.NoError(t, err) + + server, err := NewResourceServer(ResourceServerOptions{ + Store: tmp, + }) + require.NoError(t, err) + + t.Run("playlist happy CRUD paths", func(t *testing.T) { + raw := testdata(t, "01_create_playlist.json") + key := &ResourceKey{ + Group: "playlist.grafana.app", + Resource: "rrrr", // can be anything :( + Namespace: "default", + Name: "fdgsv37qslr0ga", + } + created, err := server.Create(ctx, &CreateRequest{ + Value: raw, + Key: key, + }) + require.NoError(t, err) + require.True(t, created.ResourceVersion > 0) + + // The key does not include resource version + found, err := server.Read(ctx, &ReadRequest{Key: key}) + require.NoError(t, err) + require.Equal(t, created.ResourceVersion, found.ResourceVersion) + + // Now update the value + tmp := &unstructured.Unstructured{} + err = json.Unmarshal(raw, tmp) + require.NoError(t, err) + + now := time.Now().UnixMilli() + obj, err := utils.MetaAccessor(tmp) + require.NoError(t, err) + obj.SetAnnotation("test", "hello") + obj.SetUpdatedTimestampMillis(now) + obj.SetUpdatedBy(testUserA.GetUID().String()) + raw, err = json.Marshal(tmp) + require.NoError(t, err) + + key.ResourceVersion = created.ResourceVersion + updated, err := server.Update(ctx, &UpdateRequest{Key: key, Value: raw}) + require.NoError(t, err) + require.True(t, updated.ResourceVersion > created.ResourceVersion) + + // We should still get the latest + key.ResourceVersion = 0 + found, err = server.Read(ctx, &ReadRequest{Key: key}) + require.NoError(t, err) + require.Equal(t, updated.ResourceVersion, found.ResourceVersion) + + key.ResourceVersion = updated.ResourceVersion + deleted, err := server.Delete(ctx, &DeleteRequest{Key: key}) + require.NoError(t, err) + require.True(t, deleted.ResourceVersion > updated.ResourceVersion) + + // We should get not found when trying to read the latest value + key.ResourceVersion = 0 + found, err = server.Read(ctx, &ReadRequest{Key: key}) + require.Error(t, err) + require.Nil(t, found) + }) +} + +//go:embed testdata/* +var testdataFS embed.FS + +func testdata(t *testing.T, filename string) []byte { + t.Helper() + b, err := testdataFS.ReadFile(`testdata/` + filename) + require.NoError(t, err) + return b +}