mirror of https://github.com/grafana/grafana
parent
4a41f7d0dd
commit
f66768c67d
@ -0,0 +1,153 @@ |
|||||||
|
package resource |
||||||
|
|
||||||
|
import ( |
||||||
|
"context" |
||||||
|
"encoding/json" |
||||||
|
"fmt" |
||||||
|
"path/filepath" |
||||||
|
"sort" |
||||||
|
"strings" |
||||||
|
|
||||||
|
"github.com/hack-pad/hackpadfs" |
||||||
|
"github.com/hack-pad/hackpadfs/mem" |
||||||
|
"go.opentelemetry.io/otel/trace" |
||||||
|
"go.opentelemetry.io/otel/trace/noop" |
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors" |
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema" |
||||||
|
) |
||||||
|
|
||||||
|
type FileSystemOptions struct { |
||||||
|
// OTel tracer
|
||||||
|
Tracer trace.Tracer |
||||||
|
|
||||||
|
// Root file system -- null will be in memory
|
||||||
|
Root hackpadfs.FS |
||||||
|
} |
||||||
|
|
||||||
|
func NewFileSystemStore(opts FileSystemOptions) (AppendingStore, error) { |
||||||
|
if opts.Tracer == nil { |
||||||
|
opts.Tracer = noop.NewTracerProvider().Tracer("fs") |
||||||
|
} |
||||||
|
|
||||||
|
var err error |
||||||
|
root := opts.Root |
||||||
|
if root == nil { |
||||||
|
root, err = mem.NewFS() |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
return &fsStore{tracer: opts.Tracer, root: root}, nil |
||||||
|
} |
||||||
|
|
||||||
|
type fsStore struct { |
||||||
|
tracer trace.Tracer |
||||||
|
root hackpadfs.FS |
||||||
|
} |
||||||
|
|
||||||
|
type fsEvent struct { |
||||||
|
ResourceVersion int64 `json:"resourceVersion"` |
||||||
|
Message string `json:"message,omitempty"` |
||||||
|
Operation string `json:"operation,omitempty"` |
||||||
|
Value json.RawMessage `json:"value,omitempty"` |
||||||
|
BlobPath string `json:"blob,omitempty"` |
||||||
|
} |
||||||
|
|
||||||
|
// The only write command
|
||||||
|
func (f *fsStore) WriteEvent(ctx context.Context, event *WriteEvent) (int64, error) { |
||||||
|
body := fsEvent{ |
||||||
|
ResourceVersion: event.EventID, |
||||||
|
Message: event.Message, |
||||||
|
Operation: event.Operation.String(), |
||||||
|
Value: event.Value, |
||||||
|
// Blob...
|
||||||
|
} |
||||||
|
// For this case, we will treat them the same
|
||||||
|
event.Key.ResourceVersion = 0 |
||||||
|
dir := event.Key.NamespacedPath() |
||||||
|
err := hackpadfs.MkdirAll(f.root, dir, 0750) |
||||||
|
if err != nil { |
||||||
|
return 0, err |
||||||
|
} |
||||||
|
|
||||||
|
bytes, err := json.Marshal(&body) |
||||||
|
if err != nil { |
||||||
|
return 0, err |
||||||
|
} |
||||||
|
|
||||||
|
fpath := filepath.Join(dir, fmt.Sprintf("%d.json", event.EventID)) |
||||||
|
file, err := hackpadfs.OpenFile(f.root, fpath, hackpadfs.FlagWriteOnly|hackpadfs.FlagCreate, 0750) |
||||||
|
if err != nil { |
||||||
|
return 0, err |
||||||
|
} |
||||||
|
_, err = hackpadfs.WriteFile(file, bytes) |
||||||
|
return event.EventID, err |
||||||
|
} |
||||||
|
|
||||||
|
// Read implements ResourceStoreServer.
|
||||||
|
func (f *fsStore) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { |
||||||
|
rv := req.Key.ResourceVersion |
||||||
|
req.Key.ResourceVersion = 0 |
||||||
|
|
||||||
|
fname := "--x--" |
||||||
|
dir := req.Key.NamespacedPath() |
||||||
|
if rv > 0 { |
||||||
|
fname = fmt.Sprintf("%d.json", rv) |
||||||
|
} else { |
||||||
|
files, err := hackpadfs.ReadDir(f.root, dir) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
|
||||||
|
// Sort by name
|
||||||
|
sort.Slice(files, func(i, j int) bool { |
||||||
|
a := files[i].Name() |
||||||
|
b := files[j].Name() |
||||||
|
return a > b // ?? should we parse the numbers ???
|
||||||
|
}) |
||||||
|
|
||||||
|
// The first matching file
|
||||||
|
for _, v := range files { |
||||||
|
fname = v.Name() |
||||||
|
if strings.HasSuffix(fname, ".json") { |
||||||
|
break |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
evt, err := f.open(filepath.Join(dir, fname)) |
||||||
|
if err != nil || evt.Operation == ResourceOperation_DELETED.String() { |
||||||
|
return nil, apierrors.NewNotFound(schema.GroupResource{ |
||||||
|
Group: req.Key.Group, |
||||||
|
Resource: req.Key.Resource, |
||||||
|
}, req.Key.Name) |
||||||
|
} |
||||||
|
|
||||||
|
return &ReadResponse{ |
||||||
|
ResourceVersion: evt.ResourceVersion, |
||||||
|
Value: evt.Value, |
||||||
|
Message: evt.Message, |
||||||
|
}, nil |
||||||
|
} |
||||||
|
|
||||||
|
func (f *fsStore) open(p string) (*fsEvent, error) { |
||||||
|
raw, err := hackpadfs.ReadFile(f.root, p) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
|
||||||
|
evt := &fsEvent{} |
||||||
|
err = json.Unmarshal(raw, evt) |
||||||
|
return evt, err |
||||||
|
} |
||||||
|
|
||||||
|
// List implements AppendingStore.
|
||||||
|
func (f *fsStore) List(ctx context.Context, req *ListRequest) (*ListResponse, error) { |
||||||
|
panic("unimplemented") |
||||||
|
} |
||||||
|
|
||||||
|
// Watch implements AppendingStore.
|
||||||
|
func (f *fsStore) Watch(*WatchRequest, ResourceStore_WatchServer) error { |
||||||
|
panic("unimplemented") |
||||||
|
} |
@ -0,0 +1,45 @@ |
|||||||
|
package resource |
||||||
|
|
||||||
|
import ( |
||||||
|
context "context" |
||||||
|
"fmt" |
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/apimachinery/identity" |
||||||
|
) |
||||||
|
|
||||||
|
type WriteAccessHooks struct { |
||||||
|
// Check if a user has access to write folders
|
||||||
|
// When this is nil, no resources can have folders configured
|
||||||
|
Folder func(ctx context.Context, user identity.Requester, uid string) bool |
||||||
|
|
||||||
|
// When configured, this will make sure a user is allowed to save to a given origin
|
||||||
|
Origin func(ctx context.Context, user identity.Requester, origin string) bool |
||||||
|
} |
||||||
|
|
||||||
|
type LifecycleHooks interface { |
||||||
|
// Called once at initialization
|
||||||
|
Init() error |
||||||
|
|
||||||
|
// Stop function -- after calling this, any additional storage functions may error
|
||||||
|
Stop() |
||||||
|
} |
||||||
|
|
||||||
|
func (a *WriteAccessHooks) CanWriteFolder(ctx context.Context, user identity.Requester, uid string) error { |
||||||
|
if a.Folder == nil { |
||||||
|
return fmt.Errorf("writing folders is not supported") |
||||||
|
} |
||||||
|
if !a.Folder(ctx, user, uid) { |
||||||
|
return fmt.Errorf("not allowed to write resource to folder") |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
func (a *WriteAccessHooks) CanWriteOrigin(ctx context.Context, user identity.Requester, uid string) error { |
||||||
|
if a.Origin == nil || uid == "UI" { |
||||||
|
return nil // default to OK
|
||||||
|
} |
||||||
|
if !a.Origin(ctx, user, uid) { |
||||||
|
return fmt.Errorf("not allowed to write resource at origin") |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
@ -0,0 +1,469 @@ |
|||||||
|
package resource |
||||||
|
|
||||||
|
import ( |
||||||
|
context "context" |
||||||
|
"encoding/json" |
||||||
|
"errors" |
||||||
|
"fmt" |
||||||
|
"sync" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/bwmarrin/snowflake" |
||||||
|
"go.opentelemetry.io/otel/trace" |
||||||
|
"go.opentelemetry.io/otel/trace/noop" |
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors" |
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/apimachinery/identity" |
||||||
|
"github.com/grafana/grafana/pkg/apimachinery/utils" |
||||||
|
) |
||||||
|
|
||||||
|
// Package-level errors.
|
||||||
|
var ( |
||||||
|
ErrNotFound = errors.New("entity not found") |
||||||
|
ErrOptimisticLockingFailed = errors.New("optimistic locking failed") |
||||||
|
ErrUserNotFoundInContext = errors.New("user not found in context") |
||||||
|
ErrUnableToReadResourceJSON = errors.New("unable to read resource json") |
||||||
|
ErrNextPageTokenNotSupported = errors.New("nextPageToken not yet supported") |
||||||
|
ErrLimitNotSupported = errors.New("limit not yet supported") |
||||||
|
ErrNotImplementedYet = errors.New("not implemented yet") |
||||||
|
) |
||||||
|
|
||||||
|
// ResourceServer implements all services
|
||||||
|
type ResourceServer interface { |
||||||
|
ResourceStoreServer |
||||||
|
// ResourceSearchServer
|
||||||
|
// DiagnosticsServer
|
||||||
|
|
||||||
|
// Called once for initialization
|
||||||
|
Init() error |
||||||
|
|
||||||
|
// Stop
|
||||||
|
Stop() |
||||||
|
} |
||||||
|
|
||||||
|
type AppendingStore interface { |
||||||
|
// Write a Create/Update/Delete,
|
||||||
|
// NOTE: the contents of WriteEvent have been validated
|
||||||
|
// Return the revisionVersion for this event or error
|
||||||
|
WriteEvent(context.Context, *WriteEvent) (int64, error) |
||||||
|
|
||||||
|
// Read a value from storage
|
||||||
|
Read(context.Context, *ReadRequest) (*ReadResponse, error) |
||||||
|
|
||||||
|
// Implement List -- this expects the read after write semantics
|
||||||
|
List(context.Context, *ListRequest) (*ListResponse, error) |
||||||
|
|
||||||
|
// Watch for events
|
||||||
|
// TODO... this should be converted to a go style function
|
||||||
|
// that returns a channel (??) rather than the raw grpc server management
|
||||||
|
Watch(*WatchRequest, ResourceStore_WatchServer) error |
||||||
|
} |
||||||
|
|
||||||
|
type ResourceServerOptions struct { |
||||||
|
// OTel tracer
|
||||||
|
Tracer trace.Tracer |
||||||
|
|
||||||
|
// When running in a cluster, each node should have a different ID
|
||||||
|
// This is used for snowflake generation and log identification
|
||||||
|
NodeID int64 |
||||||
|
|
||||||
|
// Get the next EventID. When not set, this will default to snowflake IDs
|
||||||
|
NextEventID func() int64 |
||||||
|
|
||||||
|
// Real storage backend
|
||||||
|
Store AppendingStore |
||||||
|
|
||||||
|
// Real storage backend
|
||||||
|
Search ResourceSearchServer |
||||||
|
|
||||||
|
// Diagnostics
|
||||||
|
Diagnostics DiagnosticsServer |
||||||
|
|
||||||
|
// Check if a user has access to write folders
|
||||||
|
// When this is nil, no resources can have folders configured
|
||||||
|
WriteAccess WriteAccessHooks |
||||||
|
|
||||||
|
// Callbacks for startup and shutdown
|
||||||
|
Lifecycle LifecycleHooks |
||||||
|
} |
||||||
|
|
||||||
|
func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { |
||||||
|
if opts.Tracer == nil { |
||||||
|
opts.Tracer = noop.NewTracerProvider().Tracer("resource-server") |
||||||
|
} |
||||||
|
|
||||||
|
if opts.NextEventID == nil { |
||||||
|
eventNode, err := snowflake.NewNode(opts.NodeID) |
||||||
|
if err != nil { |
||||||
|
return nil, apierrors.NewInternalError( |
||||||
|
fmt.Errorf("error initializing snowflake id generator :: %w", err)) |
||||||
|
} |
||||||
|
opts.NextEventID = func() int64 { |
||||||
|
return eventNode.Generate().Int64() |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
if opts.Store == nil { |
||||||
|
return nil, fmt.Errorf("missing AppendingStore implementation") |
||||||
|
} |
||||||
|
if opts.Search == nil { |
||||||
|
return nil, fmt.Errorf("missing ResourceSearchServer implementation") |
||||||
|
} |
||||||
|
if opts.Diagnostics == nil { |
||||||
|
return nil, fmt.Errorf("missing Diagnostics implementation") |
||||||
|
} |
||||||
|
|
||||||
|
return &server{ |
||||||
|
tracer: opts.Tracer, |
||||||
|
nextEventID: opts.NextEventID, |
||||||
|
store: opts.Store, |
||||||
|
search: opts.Search, |
||||||
|
diagnostics: opts.Diagnostics, |
||||||
|
access: opts.WriteAccess, |
||||||
|
lifecycle: opts.Lifecycle, |
||||||
|
}, nil |
||||||
|
} |
||||||
|
|
||||||
|
var _ ResourceServer = &server{} |
||||||
|
|
||||||
|
type server struct { |
||||||
|
tracer trace.Tracer |
||||||
|
nextEventID func() int64 |
||||||
|
store AppendingStore |
||||||
|
search ResourceSearchServer |
||||||
|
diagnostics DiagnosticsServer |
||||||
|
access WriteAccessHooks |
||||||
|
lifecycle LifecycleHooks |
||||||
|
|
||||||
|
// init checking
|
||||||
|
once sync.Once |
||||||
|
initErr error |
||||||
|
} |
||||||
|
|
||||||
|
// Init implements ResourceServer.
|
||||||
|
func (s *server) Init() error { |
||||||
|
s.once.Do(func() { |
||||||
|
// TODO, setup a broadcaster for watch
|
||||||
|
|
||||||
|
// Call lifecycle hooks
|
||||||
|
if s.lifecycle != nil { |
||||||
|
err := s.lifecycle.Init() |
||||||
|
if err != nil { |
||||||
|
s.initErr = fmt.Errorf("initialize Resource Server: %w", err) |
||||||
|
} |
||||||
|
} |
||||||
|
}) |
||||||
|
return s.initErr |
||||||
|
} |
||||||
|
|
||||||
|
func (s *server) Stop() { |
||||||
|
s.initErr = fmt.Errorf("service is stopping") |
||||||
|
if s.lifecycle != nil { |
||||||
|
s.lifecycle.Stop() |
||||||
|
} |
||||||
|
s.initErr = fmt.Errorf("service is stopped") |
||||||
|
} |
||||||
|
|
||||||
|
func (s *server) newEvent(ctx context.Context, key *ResourceKey, value, oldValue []byte) (*WriteEvent, error) { |
||||||
|
var err error |
||||||
|
event := &WriteEvent{ |
||||||
|
EventID: s.nextEventID(), |
||||||
|
Key: key, |
||||||
|
Value: value, |
||||||
|
} |
||||||
|
event.Requester, err = identity.GetRequester(ctx) |
||||||
|
if err != nil { |
||||||
|
return nil, ErrUserNotFoundInContext |
||||||
|
} |
||||||
|
|
||||||
|
dummy := &metav1.PartialObjectMetadata{} |
||||||
|
err = json.Unmarshal(value, dummy) |
||||||
|
if err != nil { |
||||||
|
return nil, ErrUnableToReadResourceJSON |
||||||
|
} |
||||||
|
|
||||||
|
obj, err := utils.MetaAccessor(dummy) |
||||||
|
if err != nil { |
||||||
|
return nil, apierrors.NewBadRequest("invalid object in json") |
||||||
|
} |
||||||
|
if obj.GetUID() == "" { |
||||||
|
return nil, apierrors.NewBadRequest("the UID must be set") |
||||||
|
} |
||||||
|
if obj.GetGenerateName() != "" { |
||||||
|
return nil, apierrors.NewBadRequest("can not save value with generate name") |
||||||
|
} |
||||||
|
gvk := obj.GetGroupVersionKind() |
||||||
|
if gvk.Kind == "" { |
||||||
|
return nil, apierrors.NewBadRequest("expecting resources with a kind in the body") |
||||||
|
} |
||||||
|
if gvk.Version == "" { |
||||||
|
return nil, apierrors.NewBadRequest("expecting resources with an apiVersion") |
||||||
|
} |
||||||
|
if gvk.Group != "" && gvk.Group != key.Group { |
||||||
|
return nil, apierrors.NewBadRequest( |
||||||
|
fmt.Sprintf("group in key does not match group in the body (%s != %s)", key.Group, gvk.Group), |
||||||
|
) |
||||||
|
} |
||||||
|
if obj.GetName() != key.Name { |
||||||
|
return nil, apierrors.NewBadRequest("key name does not match the name in the body") |
||||||
|
} |
||||||
|
if obj.GetNamespace() != key.Namespace { |
||||||
|
return nil, apierrors.NewBadRequest("key namespace does not match the namespace in the body") |
||||||
|
} |
||||||
|
folder := obj.GetFolder() |
||||||
|
if folder != "" { |
||||||
|
err = s.access.CanWriteFolder(ctx, event.Requester, folder) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
} |
||||||
|
origin, err := obj.GetOriginInfo() |
||||||
|
if err != nil { |
||||||
|
return nil, apierrors.NewBadRequest("invalid origin info") |
||||||
|
} |
||||||
|
if origin != nil { |
||||||
|
err = s.access.CanWriteOrigin(ctx, event.Requester, origin.Name) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
} |
||||||
|
event.Object = obj |
||||||
|
|
||||||
|
// This is an update
|
||||||
|
if oldValue != nil { |
||||||
|
dummy := &metav1.PartialObjectMetadata{} |
||||||
|
err = json.Unmarshal(oldValue, dummy) |
||||||
|
if err != nil { |
||||||
|
return nil, apierrors.NewBadRequest("error reading old json value") |
||||||
|
} |
||||||
|
old, err := utils.MetaAccessor(dummy) |
||||||
|
if err != nil { |
||||||
|
return nil, apierrors.NewBadRequest("invalid object inside old json") |
||||||
|
} |
||||||
|
if key.Name != old.GetName() { |
||||||
|
return nil, apierrors.NewBadRequest( |
||||||
|
fmt.Sprintf("the old value has a different name (%s != %s)", key.Name, old.GetName())) |
||||||
|
} |
||||||
|
|
||||||
|
// Can not change creation timestamps+user
|
||||||
|
if obj.GetCreatedBy() != old.GetCreatedBy() { |
||||||
|
return nil, apierrors.NewBadRequest( |
||||||
|
fmt.Sprintf("can not change the created by metadata (%s != %s)", obj.GetCreatedBy(), old.GetCreatedBy())) |
||||||
|
} |
||||||
|
if obj.GetCreationTimestamp() != old.GetCreationTimestamp() { |
||||||
|
return nil, apierrors.NewBadRequest( |
||||||
|
fmt.Sprintf("can not change the CreationTimestamp metadata (%v != %v)", obj.GetCreationTimestamp(), old.GetCreationTimestamp())) |
||||||
|
} |
||||||
|
|
||||||
|
oldFolder := obj.GetFolder() |
||||||
|
if oldFolder != folder { |
||||||
|
event.FolderChanged = true |
||||||
|
} |
||||||
|
event.OldObject = old |
||||||
|
} else if folder != "" { |
||||||
|
event.FolderChanged = true |
||||||
|
} |
||||||
|
return event, nil |
||||||
|
} |
||||||
|
|
||||||
|
func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateResponse, error) { |
||||||
|
ctx, span := s.tracer.Start(ctx, "storage_server.Create") |
||||||
|
defer span.End() |
||||||
|
|
||||||
|
if err := s.Init(); err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
|
||||||
|
if req.Key.ResourceVersion > 0 { |
||||||
|
return nil, apierrors.NewBadRequest("can not update a specific resource version") |
||||||
|
} |
||||||
|
|
||||||
|
event, err := s.newEvent(ctx, req.Key, req.Value, nil) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
event.Operation = ResourceOperation_CREATED |
||||||
|
event.Blob = req.Blob |
||||||
|
event.Message = req.Message |
||||||
|
|
||||||
|
rsp := &CreateResponse{} |
||||||
|
// Make sure the created by user is accurate
|
||||||
|
//----------------------------------------
|
||||||
|
val := event.Object.GetCreatedBy() |
||||||
|
if val != "" && val != event.Requester.GetUID().String() { |
||||||
|
return nil, apierrors.NewBadRequest("created by annotation does not match: metadata.annotations#" + utils.AnnoKeyCreatedBy) |
||||||
|
} |
||||||
|
|
||||||
|
// Create can not have updated properties
|
||||||
|
//----------------------------------------
|
||||||
|
if event.Object.GetUpdatedBy() != "" { |
||||||
|
return nil, apierrors.NewBadRequest("unexpected metadata.annotations#" + utils.AnnoKeyCreatedBy) |
||||||
|
} |
||||||
|
|
||||||
|
ts, err := event.Object.GetUpdatedTimestamp() |
||||||
|
if err != nil { |
||||||
|
return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid timestamp: %s", err)) |
||||||
|
} |
||||||
|
if ts != nil { |
||||||
|
return nil, apierrors.NewBadRequest("unexpected metadata.annotations#" + utils.AnnoKeyUpdatedTimestamp) |
||||||
|
} |
||||||
|
|
||||||
|
// Append and set the resource version
|
||||||
|
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) |
||||||
|
rsp.Status, err = errToStatus(err) |
||||||
|
return rsp, err |
||||||
|
} |
||||||
|
|
||||||
|
// Convert golang errors to status result errors that can be returned to a client
|
||||||
|
func errToStatus(err error) (*StatusResult, error) { |
||||||
|
if err != nil { |
||||||
|
// TODO... better conversion!!!
|
||||||
|
return &StatusResult{ |
||||||
|
Status: "Failure", |
||||||
|
Message: err.Error(), |
||||||
|
}, nil |
||||||
|
} |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
|
||||||
|
func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateResponse, error) { |
||||||
|
ctx, span := s.tracer.Start(ctx, "storage_server.Update") |
||||||
|
defer span.End() |
||||||
|
|
||||||
|
if err := s.Init(); err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
|
||||||
|
rsp := &UpdateResponse{} |
||||||
|
if req.Key.ResourceVersion < 0 { |
||||||
|
rsp.Status, _ = errToStatus(apierrors.NewBadRequest("update must include the previous version")) |
||||||
|
return rsp, nil |
||||||
|
} |
||||||
|
|
||||||
|
latest, err := s.store.Read(ctx, &ReadRequest{ |
||||||
|
Key: req.Key.WithoutResourceVersion(), |
||||||
|
}) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
if latest.Value == nil { |
||||||
|
return nil, apierrors.NewBadRequest("current value does not exist") |
||||||
|
} |
||||||
|
|
||||||
|
event, err := s.newEvent(ctx, req.Key, req.Value, latest.Value) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
event.Operation = ResourceOperation_UPDATED |
||||||
|
event.PreviousRV = latest.ResourceVersion |
||||||
|
event.Message = req.Message |
||||||
|
|
||||||
|
// Make sure the update user is accurate
|
||||||
|
//----------------------------------------
|
||||||
|
val := event.Object.GetUpdatedBy() |
||||||
|
if val != "" && val != event.Requester.GetUID().String() { |
||||||
|
return nil, apierrors.NewBadRequest("updated by annotation does not match: metadata.annotations#" + utils.AnnoKeyUpdatedBy) |
||||||
|
} |
||||||
|
|
||||||
|
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) |
||||||
|
rsp.Status, err = errToStatus(err) |
||||||
|
return rsp, err |
||||||
|
} |
||||||
|
|
||||||
|
func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) { |
||||||
|
ctx, span := s.tracer.Start(ctx, "storage_server.Delete") |
||||||
|
defer span.End() |
||||||
|
|
||||||
|
if err := s.Init(); err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
|
||||||
|
rsp := &DeleteResponse{} |
||||||
|
if req.Key.ResourceVersion < 0 { |
||||||
|
return nil, apierrors.NewBadRequest("update must include the previous version") |
||||||
|
} |
||||||
|
|
||||||
|
latest, err := s.store.Read(ctx, &ReadRequest{ |
||||||
|
Key: req.Key.WithoutResourceVersion(), |
||||||
|
}) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
if latest.ResourceVersion != req.Key.ResourceVersion { |
||||||
|
return nil, ErrOptimisticLockingFailed |
||||||
|
} |
||||||
|
|
||||||
|
now := metav1.NewTime(time.Now()) |
||||||
|
event := &WriteEvent{ |
||||||
|
EventID: s.nextEventID(), |
||||||
|
Key: req.Key, |
||||||
|
Operation: ResourceOperation_DELETED, |
||||||
|
PreviousRV: latest.ResourceVersion, |
||||||
|
} |
||||||
|
event.Requester, err = identity.GetRequester(ctx) |
||||||
|
if err != nil { |
||||||
|
return nil, apierrors.NewBadRequest("unable to get user") |
||||||
|
} |
||||||
|
marker := &DeletedMarker{} |
||||||
|
err = json.Unmarshal(latest.Value, marker) |
||||||
|
if err != nil { |
||||||
|
return nil, apierrors.NewBadRequest( |
||||||
|
fmt.Sprintf("unable to read previous object, %v", err)) |
||||||
|
} |
||||||
|
event.Object, err = utils.MetaAccessor(marker) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
event.Object.SetDeletionTimestamp(&now) |
||||||
|
event.Object.SetUpdatedTimestamp(&now.Time) |
||||||
|
event.Object.SetManagedFields(nil) |
||||||
|
event.Object.SetFinalizers(nil) |
||||||
|
event.Object.SetUpdatedBy(event.Requester.GetUID().String()) |
||||||
|
marker.TypeMeta = metav1.TypeMeta{ |
||||||
|
Kind: "DeletedMarker", |
||||||
|
APIVersion: "storage.grafana.app/v0alpha1", // ?? or can we stick this in common?
|
||||||
|
} |
||||||
|
marker.Annotations["RestoreResourceVersion"] = fmt.Sprintf("%d", event.PreviousRV) |
||||||
|
event.Value, err = json.Marshal(marker) |
||||||
|
if err != nil { |
||||||
|
return nil, apierrors.NewBadRequest( |
||||||
|
fmt.Sprintf("unable creating deletion marker, %v", err)) |
||||||
|
} |
||||||
|
|
||||||
|
rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) |
||||||
|
rsp.Status, err = errToStatus(err) |
||||||
|
return rsp, err |
||||||
|
} |
||||||
|
|
||||||
|
func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { |
||||||
|
if err := s.Init(); err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
|
||||||
|
rsp, err := s.store.Read(ctx, req) |
||||||
|
if err != nil { |
||||||
|
if rsp == nil { |
||||||
|
rsp = &ReadResponse{} |
||||||
|
} |
||||||
|
rsp.Status, err = errToStatus(err) |
||||||
|
} |
||||||
|
return rsp, err |
||||||
|
} |
||||||
|
|
||||||
|
func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, error) { |
||||||
|
if err := s.Init(); err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
|
||||||
|
rsp, err := s.store.List(ctx, req) |
||||||
|
// Status???
|
||||||
|
return rsp, err |
||||||
|
} |
||||||
|
|
||||||
|
func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error { |
||||||
|
if err := s.Init(); err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
return s.Watch(req, srv) |
||||||
|
} |
@ -0,0 +1,116 @@ |
|||||||
|
package resource |
||||||
|
|
||||||
|
import ( |
||||||
|
"context" |
||||||
|
"embed" |
||||||
|
"encoding/json" |
||||||
|
"fmt" |
||||||
|
"os" |
||||||
|
"testing" |
||||||
|
"time" |
||||||
|
|
||||||
|
"github.com/hack-pad/hackpadfs" |
||||||
|
hackos "github.com/hack-pad/hackpadfs/os" |
||||||
|
"github.com/stretchr/testify/require" |
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" |
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/apimachinery/identity" |
||||||
|
"github.com/grafana/grafana/pkg/apimachinery/utils" |
||||||
|
) |
||||||
|
|
||||||
|
func TestWriter(t *testing.T) { |
||||||
|
testUserA := &identity.StaticRequester{ |
||||||
|
Namespace: identity.NamespaceUser, |
||||||
|
UserID: 123, |
||||||
|
UserUID: "u123", |
||||||
|
OrgRole: identity.RoleAdmin, |
||||||
|
IsGrafanaAdmin: true, // can do anything
|
||||||
|
} |
||||||
|
ctx := identity.WithRequester(context.Background(), testUserA) |
||||||
|
|
||||||
|
var root hackpadfs.FS |
||||||
|
if false { |
||||||
|
tmp, err := os.MkdirTemp("", "xxx-*") |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
root, err = hackos.NewFS().Sub(tmp[1:]) |
||||||
|
require.NoError(t, err) |
||||||
|
fmt.Printf("ROOT: %s\n\n", tmp) |
||||||
|
} |
||||||
|
tmp, err := NewFileSystemStore(FileSystemOptions{ |
||||||
|
Root: root, |
||||||
|
}) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
server, err := NewResourceServer(ResourceServerOptions{ |
||||||
|
Store: tmp, |
||||||
|
}) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
t.Run("playlist happy CRUD paths", func(t *testing.T) { |
||||||
|
raw := testdata(t, "01_create_playlist.json") |
||||||
|
key := &ResourceKey{ |
||||||
|
Group: "playlist.grafana.app", |
||||||
|
Resource: "rrrr", // can be anything :(
|
||||||
|
Namespace: "default", |
||||||
|
Name: "fdgsv37qslr0ga", |
||||||
|
} |
||||||
|
created, err := server.Create(ctx, &CreateRequest{ |
||||||
|
Value: raw, |
||||||
|
Key: key, |
||||||
|
}) |
||||||
|
require.NoError(t, err) |
||||||
|
require.True(t, created.ResourceVersion > 0) |
||||||
|
|
||||||
|
// The key does not include resource version
|
||||||
|
found, err := server.Read(ctx, &ReadRequest{Key: key}) |
||||||
|
require.NoError(t, err) |
||||||
|
require.Equal(t, created.ResourceVersion, found.ResourceVersion) |
||||||
|
|
||||||
|
// Now update the value
|
||||||
|
tmp := &unstructured.Unstructured{} |
||||||
|
err = json.Unmarshal(raw, tmp) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
now := time.Now().UnixMilli() |
||||||
|
obj, err := utils.MetaAccessor(tmp) |
||||||
|
require.NoError(t, err) |
||||||
|
obj.SetAnnotation("test", "hello") |
||||||
|
obj.SetUpdatedTimestampMillis(now) |
||||||
|
obj.SetUpdatedBy(testUserA.GetUID().String()) |
||||||
|
raw, err = json.Marshal(tmp) |
||||||
|
require.NoError(t, err) |
||||||
|
|
||||||
|
key.ResourceVersion = created.ResourceVersion |
||||||
|
updated, err := server.Update(ctx, &UpdateRequest{Key: key, Value: raw}) |
||||||
|
require.NoError(t, err) |
||||||
|
require.True(t, updated.ResourceVersion > created.ResourceVersion) |
||||||
|
|
||||||
|
// We should still get the latest
|
||||||
|
key.ResourceVersion = 0 |
||||||
|
found, err = server.Read(ctx, &ReadRequest{Key: key}) |
||||||
|
require.NoError(t, err) |
||||||
|
require.Equal(t, updated.ResourceVersion, found.ResourceVersion) |
||||||
|
|
||||||
|
key.ResourceVersion = updated.ResourceVersion |
||||||
|
deleted, err := server.Delete(ctx, &DeleteRequest{Key: key}) |
||||||
|
require.NoError(t, err) |
||||||
|
require.True(t, deleted.ResourceVersion > updated.ResourceVersion) |
||||||
|
|
||||||
|
// We should get not found when trying to read the latest value
|
||||||
|
key.ResourceVersion = 0 |
||||||
|
found, err = server.Read(ctx, &ReadRequest{Key: key}) |
||||||
|
require.Error(t, err) |
||||||
|
require.Nil(t, found) |
||||||
|
}) |
||||||
|
} |
||||||
|
|
||||||
|
//go:embed testdata/*
|
||||||
|
var testdataFS embed.FS |
||||||
|
|
||||||
|
func testdata(t *testing.T, filename string) []byte { |
||||||
|
t.Helper() |
||||||
|
b, err := testdataFS.ReadFile(`testdata/` + filename) |
||||||
|
require.NoError(t, err) |
||||||
|
return b |
||||||
|
} |
Loading…
Reference in new issue