mirror of https://github.com/grafana/grafana
ResourceServer: Add resource server protobuf and wrapper (#90007)
parent
05ce16cf7b
commit
079f0715aa
@ -0,0 +1,5 @@ |
||||
// Package apistore provides a kubernetes store.Interface for a ResourceServer
|
||||
//
|
||||
// This package is responsible for running all the apiserver specific logic
|
||||
// before and after sending requests to the StorageServer
|
||||
package apistore |
@ -0,0 +1,87 @@ |
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
package apistore |
||||
|
||||
import ( |
||||
"path" |
||||
"time" |
||||
|
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
"k8s.io/apiserver/pkg/registry/generic" |
||||
"k8s.io/apiserver/pkg/storage" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory" |
||||
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" |
||||
"k8s.io/client-go/tools/cache" |
||||
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource" |
||||
) |
||||
|
||||
var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil) |
||||
|
||||
type RESTOptionsGetter struct { |
||||
client resource.ResourceStoreClient |
||||
Codec runtime.Codec |
||||
} |
||||
|
||||
func NewRESTOptionsGetterForServer(server resource.ResourceServer, codec runtime.Codec) *RESTOptionsGetter { |
||||
return &RESTOptionsGetter{ |
||||
client: resource.NewLocalResourceStoreClient(server), |
||||
Codec: codec, |
||||
} |
||||
} |
||||
|
||||
func NewRESTOptionsGetter(client resource.ResourceStoreClient, codec runtime.Codec) *RESTOptionsGetter { |
||||
return &RESTOptionsGetter{ |
||||
client: client, |
||||
Codec: codec, |
||||
} |
||||
} |
||||
|
||||
func (f *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { |
||||
storageConfig := &storagebackend.ConfigForResource{ |
||||
Config: storagebackend.Config{ |
||||
Type: "custom", |
||||
Prefix: "", |
||||
Transport: storagebackend.TransportConfig{ |
||||
ServerList: []string{ |
||||
// ??? string(connectionInfo),
|
||||
}, |
||||
}, |
||||
Codec: f.Codec, |
||||
EncodeVersioner: nil, |
||||
Transformer: nil, |
||||
CompactionInterval: 0, |
||||
CountMetricPollPeriod: 0, |
||||
DBMetricPollInterval: 0, |
||||
HealthcheckTimeout: 0, |
||||
ReadycheckTimeout: 0, |
||||
StorageObjectCountTracker: nil, |
||||
}, |
||||
GroupResource: resource, |
||||
} |
||||
|
||||
ret := generic.RESTOptions{ |
||||
StorageConfig: storageConfig, |
||||
Decorator: func( |
||||
config *storagebackend.ConfigForResource, |
||||
resourcePrefix string, |
||||
keyFunc func(obj runtime.Object) (string, error), |
||||
newFunc func() runtime.Object, |
||||
newListFunc func() runtime.Object, |
||||
getAttrsFunc storage.AttrFunc, |
||||
trigger storage.IndexerFuncs, |
||||
indexers *cache.Indexers, |
||||
) (storage.Interface, factory.DestroyFunc, error) { |
||||
return NewStorage(config, resource, f.client, f.Codec, keyFunc, newFunc, newListFunc, getAttrsFunc) |
||||
}, |
||||
DeleteCollectionWorkers: 0, |
||||
EnableGarbageCollection: false, |
||||
ResourcePrefix: path.Join(storageConfig.Prefix, resource.Group, resource.Resource), |
||||
CountMetricPollPeriod: 1 * time.Second, |
||||
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(), |
||||
} |
||||
|
||||
return ret, nil |
||||
} |
@ -0,0 +1,531 @@ |
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes-sigs/apiserver-runtime/blob/main/pkg/experimental/storage/filepath/jsonfile_rest.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package apistore |
||||
|
||||
import ( |
||||
"bytes" |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"io" |
||||
"reflect" |
||||
"strconv" |
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors" |
||||
"k8s.io/apimachinery/pkg/api/meta" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/conversion" |
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
"k8s.io/apimachinery/pkg/watch" |
||||
"k8s.io/apiserver/pkg/storage" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory" |
||||
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" |
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils" |
||||
"github.com/grafana/grafana/pkg/storage/unified/resource" |
||||
) |
||||
|
||||
var _ storage.Interface = (*Storage)(nil) |
||||
|
||||
// Storage implements storage.Interface and stores resources in unified storage
|
||||
type Storage struct { |
||||
config *storagebackend.ConfigForResource |
||||
store resource.ResourceStoreClient |
||||
gr schema.GroupResource |
||||
codec runtime.Codec |
||||
keyFunc func(obj runtime.Object) (string, error) |
||||
newFunc func() runtime.Object |
||||
newListFunc func() runtime.Object |
||||
getAttrsFunc storage.AttrFunc |
||||
// trigger storage.IndexerFuncs
|
||||
// indexers *cache.Indexers
|
||||
} |
||||
|
||||
func NewStorage( |
||||
config *storagebackend.ConfigForResource, |
||||
gr schema.GroupResource, |
||||
store resource.ResourceStoreClient, |
||||
codec runtime.Codec, |
||||
keyFunc func(obj runtime.Object) (string, error), |
||||
newFunc func() runtime.Object, |
||||
newListFunc func() runtime.Object, |
||||
getAttrsFunc storage.AttrFunc, |
||||
) (storage.Interface, factory.DestroyFunc, error) { |
||||
return &Storage{ |
||||
config: config, |
||||
gr: gr, |
||||
codec: codec, |
||||
store: store, |
||||
keyFunc: keyFunc, |
||||
newFunc: newFunc, |
||||
newListFunc: newListFunc, |
||||
getAttrsFunc: getAttrsFunc, |
||||
}, nil, nil |
||||
} |
||||
|
||||
func errorWrap(status *resource.StatusResult) error { |
||||
if status != nil { |
||||
return &apierrors.StatusError{ErrStatus: metav1.Status{ |
||||
Status: metav1.StatusFailure, |
||||
Code: status.Code, |
||||
Reason: metav1.StatusReason(status.Reason), |
||||
Message: status.Message, |
||||
}} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func getKey(val string) (*resource.ResourceKey, error) { |
||||
k, err := grafanaregistry.ParseKey(val) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
// if k.Group == "" {
|
||||
// return nil, apierrors.NewInternalError(fmt.Errorf("missing group in request"))
|
||||
// }
|
||||
if k.Resource == "" { |
||||
return nil, apierrors.NewInternalError(fmt.Errorf("missing resource in request")) |
||||
} |
||||
return &resource.ResourceKey{ |
||||
Namespace: k.Namespace, |
||||
Group: k.Group, |
||||
Resource: k.Resource, |
||||
Name: k.Name, |
||||
}, err |
||||
} |
||||
|
||||
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
|
||||
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
|
||||
// set to the read value from database.
|
||||
func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error { |
||||
k, err := getKey(key) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
err = s.Versioner().PrepareObjectForStorage(obj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
var buf bytes.Buffer |
||||
err = s.codec.Encode(obj, &buf) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
cmd := &resource.CreateRequest{ |
||||
Key: k, |
||||
Value: buf.Bytes(), |
||||
} |
||||
|
||||
// TODO?? blob from context?
|
||||
|
||||
rsp, err := s.store.Create(ctx, cmd) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
err = errorWrap(rsp.Status) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if rsp.Status != nil { |
||||
return fmt.Errorf("error in status %+v", rsp.Status) |
||||
} |
||||
|
||||
// Create into the out value
|
||||
_, _, err = s.codec.Decode(rsp.Value, nil, out) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
after, err := utils.MetaAccessor(out) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
after.SetResourceVersionInt64(rsp.ResourceVersion) |
||||
return nil |
||||
} |
||||
|
||||
// Delete removes the specified key and returns the value that existed at that spot.
|
||||
// If key didn't exist, it will return NotFound storage error.
|
||||
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
|
||||
// current version of the object to avoid read operation from storage to get it.
|
||||
// However, the implementations have to retry in case suggestion is stale.
|
||||
func (s *Storage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { |
||||
k, err := getKey(key) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// if validateDeletion != nil {
|
||||
// return fmt.Errorf("not supported (validate deletion)")
|
||||
// }
|
||||
|
||||
cmd := &resource.DeleteRequest{Key: k} |
||||
if preconditions != nil { |
||||
if preconditions.ResourceVersion != nil { |
||||
cmd.ResourceVersion, err = strconv.ParseInt(*preconditions.ResourceVersion, 10, 64) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
if preconditions.UID != nil { |
||||
cmd.Uid = string(*preconditions.UID) |
||||
} |
||||
} |
||||
|
||||
rsp, err := s.store.Delete(ctx, cmd) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
err = errorWrap(rsp.Status) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// Watch begins watching the specified key. Events are decoded into API objects,
|
||||
// and any items selected by 'p' are sent down to returned watch.Interface.
|
||||
// resourceVersion may be used to specify what version to begin watching,
|
||||
// which should be the current resourceVersion, and no longer rv+1
|
||||
// (e.g. reconnecting without missing any updates).
|
||||
// If resource version is "0", this interface will get current object at given key
|
||||
// and send it in an "ADDED" event, before watch starts.
|
||||
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { |
||||
listopts, _, err := toListRequest(key, opts) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if listopts == nil { |
||||
return watch.NewEmptyWatch(), nil |
||||
} |
||||
|
||||
cmd := &resource.WatchRequest{ |
||||
Since: listopts.ResourceVersion, |
||||
Options: listopts.Options, |
||||
SendInitialEvents: false, |
||||
AllowWatchBookmarks: opts.Predicate.AllowWatchBookmarks, |
||||
} |
||||
if opts.SendInitialEvents != nil { |
||||
cmd.SendInitialEvents = *opts.SendInitialEvents |
||||
} |
||||
|
||||
client, err := s.store.Watch(ctx, cmd) |
||||
if err != nil { |
||||
// if the context was canceled, just return a new empty watch
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) { |
||||
return watch.NewEmptyWatch(), nil |
||||
} |
||||
return nil, err |
||||
} |
||||
|
||||
reporter := apierrors.NewClientErrorReporter(500, "WATCH", "") |
||||
decoder := &streamDecoder{ |
||||
client: client, |
||||
newFunc: s.newFunc, |
||||
opts: opts, |
||||
codec: s.codec, |
||||
} |
||||
|
||||
return watch.NewStreamWatcher(decoder, reporter), nil |
||||
} |
||||
|
||||
// Get decodes object found at key into objPtr. On a not found error, will either
|
||||
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
|
||||
// Treats empty responses and nil response nodes exactly like a not found error.
|
||||
// The returned contents may be delayed, but it is guaranteed that they will
|
||||
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
||||
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { |
||||
var err error |
||||
req := &resource.ReadRequest{} |
||||
req.Key, err = getKey(key) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if opts.ResourceVersion != "" { |
||||
req.ResourceVersion, err = strconv.ParseInt(opts.ResourceVersion, 10, 64) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
rsp, err := s.store.Read(ctx, req) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
err = errorWrap(rsp.Status) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
_, _, err = s.codec.Decode(rsp.Value, &schema.GroupVersionKind{}, objPtr) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
obj, err := utils.MetaAccessor(objPtr) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
obj.SetResourceVersionInt64(rsp.ResourceVersion) |
||||
return nil |
||||
} |
||||
|
||||
func toListRequest(key string, opts storage.ListOptions) (*resource.ListRequest, storage.SelectionPredicate, error) { |
||||
predicate := opts.Predicate |
||||
k, err := getKey(key) |
||||
if err != nil { |
||||
return nil, predicate, err |
||||
} |
||||
req := &resource.ListRequest{ |
||||
Limit: opts.Predicate.Limit, |
||||
Options: &resource.ListOptions{ |
||||
Key: k, |
||||
}, |
||||
NextPageToken: predicate.Continue, |
||||
} |
||||
|
||||
if opts.Predicate.Label != nil && !opts.Predicate.Label.Empty() { |
||||
requirements, selectable := opts.Predicate.Label.Requirements() |
||||
if !selectable { |
||||
return nil, predicate, nil // not selectable
|
||||
} |
||||
|
||||
for _, r := range requirements { |
||||
v := r.Key() |
||||
|
||||
req.Options.Labels = append(req.Options.Labels, &resource.Requirement{ |
||||
Key: v, |
||||
Operator: string(r.Operator()), |
||||
Values: r.Values().List(), |
||||
}) |
||||
} |
||||
} |
||||
|
||||
if opts.Predicate.Field != nil && !opts.Predicate.Field.Empty() { |
||||
requirements := opts.Predicate.Field.Requirements() |
||||
for _, r := range requirements { |
||||
requirement := &resource.Requirement{Key: r.Field, Operator: string(r.Operator)} |
||||
if r.Value != "" { |
||||
requirement.Values = append(requirement.Values, r.Value) |
||||
} |
||||
req.Options.Labels = append(req.Options.Labels, requirement) |
||||
} |
||||
} |
||||
|
||||
if opts.ResourceVersion != "" { |
||||
rv, err := strconv.ParseInt(opts.ResourceVersion, 10, 64) |
||||
if err != nil { |
||||
return nil, predicate, apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %s", opts.ResourceVersion)) |
||||
} |
||||
req.ResourceVersion = rv |
||||
} |
||||
|
||||
switch opts.ResourceVersionMatch { |
||||
case "", metav1.ResourceVersionMatchNotOlderThan: |
||||
req.VersionMatch = resource.ResourceVersionMatch_NotOlderThan |
||||
case metav1.ResourceVersionMatchExact: |
||||
req.VersionMatch = resource.ResourceVersionMatch_Exact |
||||
default: |
||||
return nil, predicate, apierrors.NewBadRequest( |
||||
fmt.Sprintf("unsupported version match: %v", opts.ResourceVersionMatch), |
||||
) |
||||
} |
||||
|
||||
return req, predicate, nil |
||||
} |
||||
|
||||
// GetList unmarshalls objects found at key into a *List api object (an object
|
||||
// that satisfies runtime.IsList definition).
|
||||
// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive'
|
||||
// is true, 'key' is used as a prefix.
|
||||
// The returned contents may be delayed, but it is guaranteed that they will
|
||||
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
||||
func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { |
||||
req, predicate, err := toListRequest(key, opts) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
rsp, err := s.store.List(ctx, req) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
listPtr, err := meta.GetItemsPtr(listObj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
v, err := conversion.EnforcePtr(listPtr) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
for _, item := range rsp.Items { |
||||
tmp := s.newFunc() |
||||
|
||||
tmp, _, err = s.codec.Decode(item.Value, nil, tmp) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
obj, err := utils.MetaAccessor(tmp) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
obj.SetResourceVersionInt64(item.ResourceVersion) |
||||
|
||||
// apply any predicates not handled in storage
|
||||
matches, err := predicate.Matches(tmp) |
||||
if err != nil { |
||||
return apierrors.NewInternalError(err) |
||||
} |
||||
if !matches { |
||||
continue |
||||
} |
||||
|
||||
v.Set(reflect.Append(v, reflect.ValueOf(tmp).Elem())) |
||||
} |
||||
|
||||
listAccessor, err := meta.ListAccessor(listObj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if rsp.NextPageToken != "" { |
||||
listAccessor.SetContinue(rsp.NextPageToken) |
||||
} |
||||
if rsp.RemainingItemCount > 0 { |
||||
listAccessor.SetRemainingItemCount(&rsp.RemainingItemCount) |
||||
} |
||||
if rsp.ResourceVersion > 0 { |
||||
listAccessor.SetResourceVersion(strconv.FormatInt(rsp.ResourceVersion, 10)) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination')
|
||||
// retrying the update until success if there is index conflict.
|
||||
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
|
||||
// other writers are simultaneously updating it, so tryUpdate() needs to take into account
|
||||
// the current contents of the object when deciding how the update object should look.
|
||||
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
|
||||
// else `destination` will be set to the zero value of it's type.
|
||||
// If the eventual successful invocation of `tryUpdate` returns an output with the same serialized
|
||||
// contents as the input, it won't perform any update, but instead set `destination` to an object with those
|
||||
// contents.
|
||||
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
|
||||
// current version of the object to avoid read operation from storage to get it.
|
||||
// However, the implementations have to retry in case suggestion is stale.
|
||||
func (s *Storage) GuaranteedUpdate( |
||||
ctx context.Context, |
||||
key string, |
||||
destination runtime.Object, |
||||
ignoreNotFound bool, |
||||
preconditions *storage.Preconditions, |
||||
tryUpdate storage.UpdateFunc, |
||||
cachedExistingObject runtime.Object, |
||||
) error { |
||||
k, err := getKey(key) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Get the current version
|
||||
err = s.Get(ctx, key, storage.GetOptions{}, destination) |
||||
if err != nil { |
||||
if ignoreNotFound && apierrors.IsNotFound(err) { |
||||
// destination is already set to zero value
|
||||
// we'll create the resource
|
||||
} else { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
accessor, err := utils.MetaAccessor(destination) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Early optimistic locking failure
|
||||
previousVersion, _ := strconv.ParseInt(accessor.GetResourceVersion(), 10, 64) |
||||
if preconditions != nil { |
||||
if preconditions.ResourceVersion != nil { |
||||
rv, err := strconv.ParseInt(*preconditions.ResourceVersion, 10, 64) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if rv != previousVersion { |
||||
return fmt.Errorf("optimistic locking mismatch (previousVersion mismatch)") |
||||
} |
||||
} |
||||
|
||||
if preconditions.UID != nil { |
||||
if accessor.GetUID() != *preconditions.UID { |
||||
return fmt.Errorf("optimistic locking mismatch (UID mismatch)") |
||||
} |
||||
} |
||||
} |
||||
|
||||
res := &storage.ResponseMeta{} |
||||
updatedObj, _, err := tryUpdate(destination, *res) |
||||
if err != nil { |
||||
var statusErr *apierrors.StatusError |
||||
if errors.As(err, &statusErr) { |
||||
// For now, forbidden may come from a mutation handler
|
||||
if statusErr.ErrStatus.Reason == metav1.StatusReasonForbidden { |
||||
return statusErr |
||||
} |
||||
} |
||||
return apierrors.NewInternalError( |
||||
fmt.Errorf("could not successfully update object. key=%s, err=%s", k.String(), err.Error()), |
||||
) |
||||
} |
||||
|
||||
var buf bytes.Buffer |
||||
err = s.codec.Encode(updatedObj, &buf) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
req := &resource.UpdateRequest{Key: k, Value: buf.Bytes()} |
||||
rsp, err := s.store.Update(ctx, req) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
err = errorWrap(rsp.Status) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Read the mutated fields the response field
|
||||
_, _, err = s.codec.Decode(rsp.Value, nil, destination) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
accessor, err = utils.MetaAccessor(destination) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
accessor.SetResourceVersionInt64(rsp.ResourceVersion) |
||||
return nil |
||||
} |
||||
|
||||
// Count returns number of different entries under the key (generally being path prefix).
|
||||
func (s *Storage) Count(key string) (int64, error) { |
||||
return 0, nil |
||||
} |
||||
|
||||
func (s *Storage) Versioner() storage.Versioner { |
||||
return &storage.APIObjectVersioner{} |
||||
} |
||||
|
||||
func (s *Storage) RequestWatchProgress(ctx context.Context) error { |
||||
return nil |
||||
} |
@ -0,0 +1,203 @@ |
||||
package apistore |
||||
|
||||
import ( |
||||
"errors" |
||||
"fmt" |
||||
"io" |
||||
|
||||
grpcCodes "google.golang.org/grpc/codes" |
||||
grpcStatus "google.golang.org/grpc/status" |
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
"k8s.io/apimachinery/pkg/watch" |
||||
"k8s.io/apiserver/pkg/storage" |
||||
"k8s.io/klog/v2" |
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils" |
||||
"github.com/grafana/grafana/pkg/storage/unified/resource" |
||||
) |
||||
|
||||
type streamDecoder struct { |
||||
client resource.ResourceStore_WatchClient |
||||
newFunc func() runtime.Object |
||||
opts storage.ListOptions |
||||
codec runtime.Codec |
||||
} |
||||
|
||||
func (d *streamDecoder) toObject(w *resource.WatchEvent_Resource) (runtime.Object, error) { |
||||
obj, _, err := d.codec.Decode(w.Value, nil, d.newFunc()) |
||||
if err == nil { |
||||
accessor, err := utils.MetaAccessor(obj) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
accessor.SetResourceVersionInt64(w.Version) |
||||
} |
||||
return obj, err |
||||
} |
||||
|
||||
func (d *streamDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { |
||||
decode: |
||||
for { |
||||
err := d.client.Context().Err() |
||||
if err != nil { |
||||
klog.Errorf("client: context error: %s\n", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
evt, err := d.client.Recv() |
||||
if errors.Is(err, io.EOF) { |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
if grpcStatus.Code(err) == grpcCodes.Canceled { |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
if err != nil { |
||||
klog.Errorf("client: error receiving result: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
// Error event
|
||||
if evt.Type == resource.WatchEvent_ERROR { |
||||
err = fmt.Errorf("stream error") |
||||
klog.Errorf("client: error receiving result: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
if evt.Resource == nil { |
||||
klog.Errorf("client: received nil \n") |
||||
continue decode |
||||
} |
||||
|
||||
if evt.Type == resource.WatchEvent_BOOKMARK { |
||||
obj := d.newFunc() |
||||
|
||||
// here k8s expects an empty object with just resource version and k8s.io/initial-events-end annotation
|
||||
accessor, err := utils.MetaAccessor(obj) |
||||
if err != nil { |
||||
klog.Errorf("error getting object accessor: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
accessor.SetResourceVersionInt64(evt.Resource.Version) |
||||
accessor.SetAnnotations(map[string]string{"k8s.io/initial-events-end": "true"}) |
||||
return watch.Bookmark, obj, nil |
||||
} |
||||
|
||||
obj, err := d.toObject(evt.Resource) |
||||
if err != nil { |
||||
klog.Errorf("error decoding entity: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
var watchAction watch.EventType |
||||
switch evt.Type { |
||||
case resource.WatchEvent_ADDED: |
||||
// apply any predicates not handled in storage
|
||||
matches, err := d.opts.Predicate.Matches(obj) |
||||
if err != nil { |
||||
klog.Errorf("error matching object: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
if !matches { |
||||
continue decode |
||||
} |
||||
|
||||
watchAction = watch.Added |
||||
case resource.WatchEvent_MODIFIED: |
||||
watchAction = watch.Modified |
||||
|
||||
// apply any predicates not handled in storage
|
||||
matches, err := d.opts.Predicate.Matches(obj) |
||||
if err != nil { |
||||
klog.Errorf("error matching object: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
// if we have a previous object, check if it matches
|
||||
prevMatches := false |
||||
var prevObj runtime.Object |
||||
if evt.Previous != nil { |
||||
prevObj, err = d.toObject(evt.Previous) |
||||
if err != nil { |
||||
klog.Errorf("error decoding entity: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
// apply any predicates not handled in storage
|
||||
prevMatches, err = d.opts.Predicate.Matches(prevObj) |
||||
if err != nil { |
||||
klog.Errorf("error matching object: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
} |
||||
|
||||
if !matches { |
||||
if !prevMatches { |
||||
continue decode |
||||
} |
||||
|
||||
// if the object didn't match, send a Deleted event
|
||||
watchAction = watch.Deleted |
||||
|
||||
// here k8s expects the previous object but with the new resource version
|
||||
obj = prevObj |
||||
|
||||
accessor, err := utils.MetaAccessor(obj) |
||||
if err != nil { |
||||
klog.Errorf("error getting object accessor: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
accessor.SetResourceVersionInt64(evt.Resource.Version) |
||||
} else if !prevMatches { |
||||
// if the object didn't previously match, send an Added event
|
||||
watchAction = watch.Added |
||||
} |
||||
case resource.WatchEvent_DELETED: |
||||
watchAction = watch.Deleted |
||||
|
||||
// if we have a previous object, return that in the deleted event
|
||||
if evt.Previous != nil { |
||||
obj, err = d.toObject(evt.Previous) |
||||
if err != nil { |
||||
klog.Errorf("error decoding entity: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
// here k8s expects the previous object but with the new resource version
|
||||
accessor, err := utils.MetaAccessor(obj) |
||||
if err != nil { |
||||
klog.Errorf("error getting object accessor: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
|
||||
accessor.SetResourceVersionInt64(evt.Resource.Version) |
||||
} |
||||
|
||||
// apply any predicates not handled in storage
|
||||
matches, err := d.opts.Predicate.Matches(obj) |
||||
if err != nil { |
||||
klog.Errorf("error matching object: %s", err) |
||||
return watch.Error, nil, err |
||||
} |
||||
if !matches { |
||||
continue decode |
||||
} |
||||
default: |
||||
watchAction = watch.Error |
||||
} |
||||
|
||||
return watchAction, obj, nil |
||||
} |
||||
} |
||||
|
||||
func (d *streamDecoder) Close() { |
||||
err := d.client.CloseSend() |
||||
if err != nil { |
||||
klog.Errorf("error closing watch stream: %s", err) |
||||
} |
||||
} |
||||
|
||||
var _ watch.Decoder = (*streamDecoder)(nil) |
@ -0,0 +1,119 @@ |
||||
package entitybridge |
||||
|
||||
import ( |
||||
"errors" |
||||
"io" |
||||
"time" |
||||
|
||||
grpcCodes "google.golang.org/grpc/codes" |
||||
grpcStatus "google.golang.org/grpc/status" |
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" |
||||
"k8s.io/klog/v2" |
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils" |
||||
entitystore "github.com/grafana/grafana/pkg/services/apiserver/storage/entity" |
||||
"github.com/grafana/grafana/pkg/services/store/entity" |
||||
"github.com/grafana/grafana/pkg/storage/unified/resource" |
||||
) |
||||
|
||||
type decoder struct { |
||||
client entity.EntityStore_WatchClient |
||||
} |
||||
|
||||
// Any errors will end the stream
|
||||
func (d *decoder) next() (*resource.WrittenEvent, error) { |
||||
decode: |
||||
for { |
||||
err := d.client.Context().Err() |
||||
if err != nil { |
||||
klog.Errorf("client: context error: %s\n", err) |
||||
return nil, err |
||||
} |
||||
|
||||
rsp, err := d.client.Recv() |
||||
if errors.Is(err, io.EOF) { |
||||
return nil, err |
||||
} |
||||
|
||||
if grpcStatus.Code(err) == grpcCodes.Canceled { |
||||
return nil, err |
||||
} |
||||
|
||||
if err != nil { |
||||
klog.Errorf("client: error receiving result: %s", err) |
||||
return nil, err |
||||
} |
||||
|
||||
if rsp.Entity == nil { |
||||
klog.Errorf("client: received nil entity\n") |
||||
continue decode |
||||
} |
||||
|
||||
event := resource.WriteEvent{ |
||||
Key: &resource.ResourceKey{ |
||||
Group: rsp.Entity.Namespace, |
||||
Resource: rsp.Entity.Resource, |
||||
Namespace: rsp.Entity.Namespace, |
||||
Name: rsp.Entity.Name, |
||||
}, |
||||
} |
||||
|
||||
switch rsp.Entity.Action { |
||||
case entity.Entity_CREATED: |
||||
event.Type = resource.WatchEvent_ADDED |
||||
case entity.Entity_UPDATED: |
||||
event.Type = resource.WatchEvent_MODIFIED |
||||
case entity.Entity_DELETED: |
||||
event.Type = resource.WatchEvent_DELETED |
||||
default: |
||||
klog.Errorf("unsupported action\n") |
||||
continue decode |
||||
} |
||||
|
||||
// Now decode the bytes into an object
|
||||
obj := &unstructured.Unstructured{} |
||||
err = entitystore.EntityToRuntimeObject(rsp.Entity, obj, unstructured.UnstructuredJSONScheme) |
||||
if err != nil { |
||||
klog.Errorf("error decoding entity: %s", err) |
||||
return nil, err |
||||
} |
||||
|
||||
event.Value, err = obj.MarshalJSON() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
event.Object, err = utils.MetaAccessor(obj) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
// Decode the old value
|
||||
if rsp.Previous != nil { |
||||
err = entitystore.EntityToRuntimeObject(rsp.Previous, obj, unstructured.UnstructuredJSONScheme) |
||||
if err != nil { |
||||
klog.Errorf("error decoding entity: %s", err) |
||||
return nil, err |
||||
} |
||||
event.ObjectOld, err = utils.MetaAccessor(obj) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
event.PreviousRV, err = event.ObjectOld.GetResourceVersionInt64() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} |
||||
return &resource.WrittenEvent{ |
||||
ResourceVersion: rsp.Entity.ResourceVersion, |
||||
Timestamp: time.Now().UnixMilli(), |
||||
WriteEvent: event, |
||||
}, nil |
||||
} |
||||
} |
||||
|
||||
func (d *decoder) close() { |
||||
err := d.client.CloseSend() |
||||
if err != nil { |
||||
klog.Errorf("error closing watch stream: %s", err) |
||||
} |
||||
} |
@ -0,0 +1,5 @@ |
||||
// Package entitybridge implements an ResourceServer using existing EntityAPI contracts
|
||||
//
|
||||
// This package will be removed and replaced with a more streamlined SQL implementation
|
||||
// that leverages what we have learned from the entity deployments so far
|
||||
package entitybridge |
@ -0,0 +1,237 @@ |
||||
package entitybridge |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
|
||||
"k8s.io/apimachinery/pkg/selection" |
||||
"k8s.io/klog/v2" |
||||
|
||||
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" |
||||
"github.com/grafana/grafana/pkg/infra/tracing" |
||||
"github.com/grafana/grafana/pkg/services/store/entity" |
||||
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash" |
||||
"github.com/grafana/grafana/pkg/storage/unified/resource" |
||||
) |
||||
|
||||
// Creates a ResourceServer using the existing entity tables
|
||||
// NOTE: the server is optional and only used to pass init+close functions
|
||||
func EntityAsResourceServer(client entity.EntityStoreClient, server sqlstash.SqlEntityServer, tracer tracing.Tracer) (resource.ResourceServer, error) { |
||||
if client == nil { |
||||
return nil, fmt.Errorf("client must be defined") |
||||
} |
||||
|
||||
// Use this bridge as the resource store
|
||||
bridge := &entityBridge{ |
||||
client: client, |
||||
server: server, |
||||
} |
||||
return resource.NewResourceServer(resource.ResourceServerOptions{ |
||||
Tracer: tracer, |
||||
Backend: bridge, |
||||
Diagnostics: bridge, |
||||
Lifecycle: bridge, |
||||
}) |
||||
} |
||||
|
||||
// This is only created if we use the entity implementation
|
||||
type entityBridge struct { |
||||
client entity.EntityStoreClient |
||||
|
||||
// When running directly
|
||||
// (we need the explicit version so we have access to init+stop)
|
||||
server sqlstash.SqlEntityServer |
||||
} |
||||
|
||||
// Init implements ResourceServer.
|
||||
func (b *entityBridge) Init() error { |
||||
if b.server != nil { |
||||
return b.server.Init() |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// Stop implements ResourceServer.
|
||||
func (b *entityBridge) Stop() { |
||||
if b.server != nil { |
||||
b.server.Stop() |
||||
} |
||||
} |
||||
|
||||
// Convert resource key to the entity key
|
||||
func toEntityKey(key *resource.ResourceKey) string { |
||||
e := grafanaregistry.Key{ |
||||
Group: key.Group, |
||||
Resource: key.Resource, |
||||
Namespace: key.Namespace, |
||||
Name: key.Name, |
||||
} |
||||
return e.String() |
||||
} |
||||
|
||||
func (b *entityBridge) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) { |
||||
key := toEntityKey(event.Key) |
||||
|
||||
// Delete does not need to create an entity first
|
||||
if event.Type == resource.WatchEvent_DELETED { |
||||
rsp, err := b.client.Delete(ctx, &entity.DeleteEntityRequest{ |
||||
Key: key, |
||||
PreviousVersion: event.PreviousRV, |
||||
}) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return rsp.Entity.ResourceVersion, err |
||||
} |
||||
|
||||
gvr := event.Object.GetGroupVersionKind() |
||||
obj := event.Object |
||||
msg := &entity.Entity{ |
||||
Key: key, |
||||
Group: event.Key.Group, |
||||
Resource: event.Key.Resource, |
||||
Namespace: event.Key.Namespace, |
||||
Name: event.Key.Name, |
||||
Guid: string(event.Object.GetUID()), |
||||
GroupVersion: gvr.Version, |
||||
|
||||
Folder: obj.GetFolder(), |
||||
Body: event.Value, |
||||
Message: event.Object.GetMessage(), |
||||
|
||||
Labels: obj.GetLabels(), |
||||
Size: int64(len(event.Value)), |
||||
} |
||||
|
||||
switch event.Type { |
||||
case resource.WatchEvent_ADDED: |
||||
msg.Action = entity.Entity_CREATED |
||||
rsp, err := b.client.Create(ctx, &entity.CreateEntityRequest{Entity: msg}) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return rsp.Entity.ResourceVersion, err |
||||
|
||||
case resource.WatchEvent_MODIFIED: |
||||
msg.Action = entity.Entity_UPDATED |
||||
rsp, err := b.client.Update(ctx, &entity.UpdateEntityRequest{ |
||||
Entity: msg, |
||||
PreviousVersion: event.PreviousRV, |
||||
}) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return rsp.Entity.ResourceVersion, err |
||||
|
||||
default: |
||||
} |
||||
|
||||
return 0, fmt.Errorf("unsupported operation: %s", event.Type.String()) |
||||
} |
||||
|
||||
func (b *entityBridge) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) { |
||||
client, err := b.client.Watch(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
req := &entity.EntityWatchRequest{ |
||||
Action: entity.EntityWatchRequest_START, |
||||
Labels: map[string]string{}, |
||||
WithBody: true, |
||||
WithStatus: true, |
||||
SendInitialEvents: false, |
||||
} |
||||
|
||||
err = client.Send(req) |
||||
if err != nil { |
||||
err2 := client.CloseSend() |
||||
if err2 != nil { |
||||
klog.Errorf("watch close failed: %s\n", err2) |
||||
} |
||||
return nil, err |
||||
} |
||||
|
||||
reader := &decoder{client} |
||||
stream := make(chan *resource.WrittenEvent, 10) |
||||
go func() { |
||||
for { |
||||
evt, err := reader.next() |
||||
if err != nil { |
||||
reader.close() |
||||
close(stream) |
||||
return |
||||
} |
||||
stream <- evt |
||||
} |
||||
}() |
||||
return stream, nil |
||||
} |
||||
|
||||
// IsHealthy implements ResourceServer.
|
||||
func (b *entityBridge) IsHealthy(ctx context.Context, req *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) { |
||||
rsp, err := b.client.IsHealthy(ctx, &entity.HealthCheckRequest{ |
||||
Service: req.Service, // ??
|
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return &resource.HealthCheckResponse{ |
||||
Status: resource.HealthCheckResponse_ServingStatus(rsp.Status), |
||||
}, nil |
||||
} |
||||
|
||||
// Read implements ResourceServer.
|
||||
func (b *entityBridge) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) { |
||||
v, err := b.client.Read(ctx, &entity.ReadEntityRequest{ |
||||
Key: toEntityKey(req.Key), |
||||
WithBody: true, |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return &resource.ReadResponse{ |
||||
ResourceVersion: v.ResourceVersion, |
||||
Value: v.Body, |
||||
}, nil |
||||
} |
||||
|
||||
// List implements ResourceServer.
|
||||
func (b *entityBridge) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { |
||||
key := req.Options.Key |
||||
query := &entity.EntityListRequest{ |
||||
NextPageToken: req.NextPageToken, |
||||
Limit: req.Limit, |
||||
Key: []string{toEntityKey(key)}, |
||||
WithBody: true, |
||||
} |
||||
|
||||
if len(req.Options.Labels) > 0 { |
||||
query.Labels = make(map[string]string) |
||||
for _, q := range req.Options.Labels { |
||||
// The entity structure only supports equals
|
||||
// the rest will be processed handled by the upstream predicate
|
||||
op := selection.Operator(q.Operator) |
||||
if op == selection.Equals || op == selection.DoubleEquals { |
||||
query.Labels[q.Key] = q.Values[0] |
||||
} |
||||
} |
||||
} |
||||
|
||||
found, err := b.client.List(ctx, query) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
rsp := &resource.ListResponse{ |
||||
ResourceVersion: found.ResourceVersion, |
||||
NextPageToken: found.NextPageToken, |
||||
} |
||||
for _, item := range found.Results { |
||||
rsp.Items = append(rsp.Items, &resource.ResourceWrapper{ |
||||
ResourceVersion: item.ResourceVersion, |
||||
Value: item.Body, |
||||
}) |
||||
} |
||||
return rsp, nil |
||||
} |
@ -0,0 +1,10 @@ |
||||
version: v1 |
||||
plugins: |
||||
- plugin: go |
||||
out: pkg/storage/unified/resource |
||||
opt: paths=source_relative |
||||
- plugin: go-grpc |
||||
out: pkg/storage/unified/resource |
||||
opt: |
||||
- paths=source_relative |
||||
- require_unimplemented_servers=false |
@ -0,0 +1,7 @@ |
||||
version: v1 |
||||
breaking: |
||||
use: |
||||
- FILE |
||||
lint: |
||||
use: |
||||
- DEFAULT |
@ -0,0 +1,285 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
"bytes" |
||||
context "context" |
||||
"fmt" |
||||
"io" |
||||
"sort" |
||||
"strconv" |
||||
"strings" |
||||
"sync" |
||||
"sync/atomic" |
||||
"time" |
||||
|
||||
"go.opentelemetry.io/otel/trace" |
||||
"go.opentelemetry.io/otel/trace/noop" |
||||
"gocloud.dev/blob" |
||||
_ "gocloud.dev/blob/fileblob" |
||||
_ "gocloud.dev/blob/memblob" |
||||
apierrors "k8s.io/apimachinery/pkg/api/errors" |
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
) |
||||
|
||||
type CDKBackendOptions struct { |
||||
Tracer trace.Tracer |
||||
Bucket *blob.Bucket |
||||
RootFolder string |
||||
} |
||||
|
||||
func NewCDKBackend(ctx context.Context, opts CDKBackendOptions) (StorageBackend, error) { |
||||
if opts.Tracer == nil { |
||||
opts.Tracer = noop.NewTracerProvider().Tracer("cdk-appending-store") |
||||
} |
||||
|
||||
if opts.Bucket == nil { |
||||
return nil, fmt.Errorf("missing bucket") |
||||
} |
||||
|
||||
found, _, err := opts.Bucket.ListPage(ctx, blob.FirstPageToken, 1, &blob.ListOptions{ |
||||
Prefix: opts.RootFolder, |
||||
Delimiter: "/", |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if found == nil { |
||||
return nil, fmt.Errorf("the root folder does not exist") |
||||
} |
||||
|
||||
backend := &cdkBackend{ |
||||
tracer: opts.Tracer, |
||||
bucket: opts.Bucket, |
||||
root: opts.RootFolder, |
||||
} |
||||
backend.rv.Swap(time.Now().UnixMilli()) |
||||
return backend, nil |
||||
} |
||||
|
||||
type cdkBackend struct { |
||||
tracer trace.Tracer |
||||
bucket *blob.Bucket |
||||
root string |
||||
|
||||
mutex sync.Mutex |
||||
rv atomic.Int64 |
||||
|
||||
// Simple watch stream -- NOTE, this only works for single tenant!
|
||||
broadcaster Broadcaster[*WrittenEvent] |
||||
stream chan<- *WrittenEvent |
||||
} |
||||
|
||||
func (s *cdkBackend) getPath(key *ResourceKey, rv int64) string { |
||||
var buffer bytes.Buffer |
||||
buffer.WriteString(s.root) |
||||
|
||||
if key.Group == "" { |
||||
return buffer.String() |
||||
} |
||||
buffer.WriteString(key.Group) |
||||
|
||||
if key.Resource == "" { |
||||
return buffer.String() |
||||
} |
||||
buffer.WriteString("/") |
||||
buffer.WriteString(key.Resource) |
||||
|
||||
if key.Namespace == "" { |
||||
if key.Name == "" { |
||||
return buffer.String() |
||||
} |
||||
buffer.WriteString("/__cluster__") |
||||
} else { |
||||
buffer.WriteString("/") |
||||
buffer.WriteString(key.Namespace) |
||||
} |
||||
|
||||
if key.Name == "" { |
||||
return buffer.String() |
||||
} |
||||
buffer.WriteString("/") |
||||
buffer.WriteString(key.Name) |
||||
|
||||
if rv > 0 { |
||||
buffer.WriteString(fmt.Sprintf("/%d.json", rv)) |
||||
} |
||||
return buffer.String() |
||||
} |
||||
|
||||
func (s *cdkBackend) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) { |
||||
// Scope the lock
|
||||
{ |
||||
s.mutex.Lock() |
||||
defer s.mutex.Unlock() |
||||
|
||||
rv = s.rv.Add(1) |
||||
err = s.bucket.WriteAll(ctx, s.getPath(event.Key, rv), event.Value, &blob.WriterOptions{ |
||||
ContentType: "application/json", |
||||
}) |
||||
} |
||||
|
||||
// Async notify all subscribers
|
||||
if s.stream != nil { |
||||
go func() { |
||||
write := &WrittenEvent{ |
||||
WriteEvent: event, |
||||
Timestamp: time.Now().UnixMilli(), |
||||
ResourceVersion: rv, |
||||
} |
||||
s.stream <- write |
||||
}() |
||||
} |
||||
return rv, err |
||||
} |
||||
|
||||
func (s *cdkBackend) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { |
||||
rv := req.ResourceVersion |
||||
|
||||
path := s.getPath(req.Key, req.ResourceVersion) |
||||
if rv < 1 { |
||||
iter := s.bucket.List(&blob.ListOptions{Prefix: path + "/", Delimiter: "/"}) |
||||
for { |
||||
obj, err := iter.Next(ctx) |
||||
if err == io.EOF { |
||||
break |
||||
} |
||||
if strings.HasSuffix(obj.Key, ".json") { |
||||
idx := strings.LastIndex(obj.Key, "/") + 1 |
||||
edx := strings.LastIndex(obj.Key, ".") |
||||
if idx > 0 { |
||||
v, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64) |
||||
if err == nil && v > rv { |
||||
rv = v |
||||
path = obj.Key // find the path with biggest resource version
|
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
raw, err := s.bucket.ReadAll(ctx, path) |
||||
if raw == nil || (err == nil && isDeletedMarker(raw)) { |
||||
return nil, apierrors.NewNotFound(schema.GroupResource{ |
||||
Group: req.Key.Group, |
||||
Resource: req.Key.Resource, |
||||
}, req.Key.Name) |
||||
} |
||||
|
||||
return &ReadResponse{ |
||||
ResourceVersion: rv, |
||||
Value: raw, |
||||
}, err |
||||
} |
||||
|
||||
func isDeletedMarker(raw []byte) bool { |
||||
if bytes.Contains(raw, []byte(`"DeletedMarker"`)) { |
||||
tmp := &unstructured.Unstructured{} |
||||
err := tmp.UnmarshalJSON(raw) |
||||
if err == nil && tmp.GetKind() == "DeletedMarker" { |
||||
return true |
||||
} |
||||
} |
||||
return false |
||||
} |
||||
|
||||
func (s *cdkBackend) PrepareList(ctx context.Context, req *ListRequest) (*ListResponse, error) { |
||||
resources, err := buildTree(ctx, s, req.Options.Key) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
rsp := &ListResponse{ |
||||
ResourceVersion: s.rv.Load(), |
||||
} |
||||
for _, item := range resources { |
||||
latest := item.versions[0] |
||||
raw, err := s.bucket.ReadAll(ctx, latest.key) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if !isDeletedMarker(raw) { |
||||
rsp.Items = append(rsp.Items, &ResourceWrapper{ |
||||
ResourceVersion: latest.rv, |
||||
Value: raw, |
||||
}) |
||||
} |
||||
} |
||||
return rsp, nil |
||||
} |
||||
|
||||
func (s *cdkBackend) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) { |
||||
s.mutex.Lock() |
||||
defer s.mutex.Unlock() |
||||
|
||||
if s.broadcaster == nil { |
||||
var err error |
||||
s.broadcaster, err = NewBroadcaster(context.Background(), func(c chan<- *WrittenEvent) error { |
||||
s.stream = c |
||||
return nil |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} |
||||
return s.broadcaster.Subscribe(ctx) |
||||
} |
||||
|
||||
// group > resource > namespace > name > versions
|
||||
type cdkResource struct { |
||||
prefix string |
||||
versions []cdkVersion |
||||
} |
||||
type cdkVersion struct { |
||||
rv int64 |
||||
key string |
||||
} |
||||
|
||||
func buildTree(ctx context.Context, s *cdkBackend, key *ResourceKey) ([]cdkResource, error) { |
||||
byPrefix := make(map[string]*cdkResource) |
||||
|
||||
path := s.getPath(key, 0) |
||||
iter := s.bucket.List(&blob.ListOptions{Prefix: path, Delimiter: ""}) // "" is recursive
|
||||
for { |
||||
obj, err := iter.Next(ctx) |
||||
if err == io.EOF { |
||||
break |
||||
} |
||||
if strings.HasSuffix(obj.Key, ".json") { |
||||
idx := strings.LastIndex(obj.Key, "/") + 1 |
||||
edx := strings.LastIndex(obj.Key, ".") |
||||
if idx > 0 { |
||||
rv, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64) |
||||
if err == nil { |
||||
prefix := obj.Key[:idx] |
||||
res, ok := byPrefix[prefix] |
||||
if !ok { |
||||
res = &cdkResource{prefix: prefix} |
||||
byPrefix[prefix] = res |
||||
} |
||||
|
||||
res.versions = append(res.versions, cdkVersion{ |
||||
rv: rv, |
||||
key: obj.Key, |
||||
}) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Now sort all versions
|
||||
resources := make([]cdkResource, 0, len(byPrefix)) |
||||
for _, res := range byPrefix { |
||||
sort.Slice(res.versions, func(i, j int) bool { |
||||
return res.versions[i].rv > res.versions[j].rv |
||||
}) |
||||
resources = append(resources, *res) |
||||
} |
||||
sort.Slice(resources, func(i, j int) bool { |
||||
a := resources[i].versions[0].rv |
||||
b := resources[j].versions[0].rv |
||||
return a > b |
||||
}) |
||||
|
||||
return resources, nil |
||||
} |
@ -0,0 +1,30 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
"github.com/fullstorydev/grpchan" |
||||
"github.com/fullstorydev/grpchan/inprocgrpc" |
||||
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth" |
||||
"google.golang.org/grpc" |
||||
|
||||
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc" |
||||
) |
||||
|
||||
func NewLocalResourceStoreClient(server ResourceStoreServer) ResourceStoreClient { |
||||
channel := &inprocgrpc.Channel{} |
||||
|
||||
auth := &grpcUtils.Authenticator{} |
||||
|
||||
channel.RegisterService( |
||||
grpchan.InterceptServer( |
||||
&ResourceStore_ServiceDesc, |
||||
grpcAuth.UnaryServerInterceptor(auth.Authenticate), |
||||
grpcAuth.StreamServerInterceptor(auth.Authenticate), |
||||
), |
||||
server, |
||||
) |
||||
return NewResourceStoreClient(grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)) |
||||
} |
||||
|
||||
func NewResourceStoreClientGRPC(channel *grpc.ClientConn) ResourceStoreClient { |
||||
return NewResourceStoreClient(grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)) |
||||
} |
@ -0,0 +1,37 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
) |
||||
|
||||
// This object is written when an object is deleted
|
||||
type DeletedMarker struct { |
||||
metav1.TypeMeta `json:",inline"` |
||||
metav1.ObjectMeta `json:"metadata,omitempty"` |
||||
} |
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DeletedMarker) DeepCopyInto(out *DeletedMarker) { |
||||
*out = *in |
||||
out.TypeMeta = in.TypeMeta |
||||
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) |
||||
} |
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeletedMarker.
|
||||
func (in *DeletedMarker) DeepCopy() *DeletedMarker { |
||||
if in == nil { |
||||
return nil |
||||
} |
||||
out := new(DeletedMarker) |
||||
in.DeepCopyInto(out) |
||||
return out |
||||
} |
||||
|
||||
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
|
||||
func (in *DeletedMarker) DeepCopyObject() runtime.Object { |
||||
if c := in.DeepCopy(); c != nil { |
||||
return c |
||||
} |
||||
return nil |
||||
} |
@ -0,0 +1,2 @@ |
||||
// Package resource creates a ResourceServer that handles generic storage operations
|
||||
package resource |
@ -0,0 +1,92 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
context "context" |
||||
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" |
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity" |
||||
"github.com/grafana/grafana/pkg/apimachinery/utils" |
||||
) |
||||
|
||||
type WriteEvent struct { |
||||
Type WatchEvent_Type // ADDED, MODIFIED, DELETED
|
||||
Key *ResourceKey // the request key
|
||||
PreviousRV int64 // only for Update+Delete
|
||||
|
||||
// The json payload (without resourceVersion)
|
||||
Value []byte |
||||
|
||||
// Access real fields
|
||||
Object utils.GrafanaMetaAccessor |
||||
|
||||
// Access to the old metadata
|
||||
ObjectOld utils.GrafanaMetaAccessor |
||||
} |
||||
|
||||
// WriteEvents after they include a resource version
|
||||
type WrittenEvent struct { |
||||
WriteEvent |
||||
|
||||
// The resource version
|
||||
ResourceVersion int64 |
||||
|
||||
// Timestamp when the event is created
|
||||
Timestamp int64 |
||||
} |
||||
|
||||
// A function to write events
|
||||
type EventAppender = func(context.Context, *WriteEvent) (int64, error) |
||||
|
||||
type writeEventBuilder struct { |
||||
EventID int64 |
||||
Key *ResourceKey // the request key
|
||||
Type WatchEvent_Type |
||||
|
||||
Requester identity.Requester |
||||
Object *unstructured.Unstructured |
||||
|
||||
// Access the raw metadata values
|
||||
Meta utils.GrafanaMetaAccessor |
||||
OldMeta utils.GrafanaMetaAccessor |
||||
} |
||||
|
||||
func newEventFromBytes(value, oldValue []byte) (*writeEventBuilder, error) { |
||||
builder := &writeEventBuilder{ |
||||
Object: &unstructured.Unstructured{}, |
||||
} |
||||
err := builder.Object.UnmarshalJSON(value) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
builder.Meta, err = utils.MetaAccessor(builder.Object) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if oldValue == nil { |
||||
builder.Type = WatchEvent_ADDED |
||||
} else { |
||||
builder.Type = WatchEvent_MODIFIED |
||||
|
||||
temp := &unstructured.Unstructured{} |
||||
err = temp.UnmarshalJSON(oldValue) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
builder.OldMeta, err = utils.MetaAccessor(temp) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} |
||||
return builder, nil |
||||
} |
||||
|
||||
func (b *writeEventBuilder) toEvent() (event WriteEvent, err error) { |
||||
event.Key = b.Key |
||||
event.Type = b.Type |
||||
event.ObjectOld = b.OldMeta |
||||
event.Object = b.Meta |
||||
event.Value, err = b.Object.MarshalJSON() |
||||
return // includes the named values
|
||||
} |
@ -1,20 +1,95 @@ |
||||
cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM= |
||||
cloud.google.com/go/auth v0.2.2 h1:gmxNJs4YZYcw6YvKRtVBaF2fyUE6UrWPyzU8jHvYfmI= |
||||
cloud.google.com/go/auth/oauth2adapt v0.2.1 h1:VSPmMmUlT8CkIZ2PzD9AlLN+R3+D1clXMWHHa6vG/Ag= |
||||
cloud.google.com/go/compute v1.25.1 h1:ZRpHJedLtTpKgr3RV1Fx23NuaAEN1Zfx9hw1u4aJdjU= |
||||
cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= |
||||
cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc= |
||||
cloud.google.com/go/storage v1.38.0 h1:Az68ZRGlnNTpIBbLjSMIV2BDcwwXYlRlQzis0llkpJg= |
||||
github.com/aws/aws-sdk-go v1.51.31 h1:4TM+sNc+Dzs7wY1sJ0+J8i60c6rkgnKP1pvPx8ghsSY= |
||||
github.com/aws/aws-sdk-go-v2 v1.16.2 h1:fqlCk6Iy3bnCumtrLz9r3mJ/2gUT0pJ0wLFVIdWh+JA= |
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 h1:SdK4Ppk5IzLs64ZMvr6MrSficMtjY2oS0WOORXTlxwU= |
||||
github.com/aws/aws-sdk-go-v2/config v1.15.3 h1:5AlQD0jhVXlGzwo+VORKiUuogkG7pQcLJNzIzK7eodw= |
||||
github.com/aws/aws-sdk-go-v2/credentials v1.11.2 h1:RQQ5fzclAKJyY5TvF+fkjJEwzK4hnxQCLOu5JXzDmQo= |
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.3 h1:LWPg5zjHV9oz/myQr4wMs0gi4CjnDN/ILmyZUFYXZsU= |
||||
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.3 h1:ir7iEq78s4txFGgwcLqD6q9IIPzTQNRJXulJd9h/zQo= |
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.9 h1:onz/VaaxZ7Z4V+WIN9Txly9XLTmoOh1oJ8XcAC3pako= |
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.3 h1:9stUQR/u2KXU6HkFJYlqnZEjBnbgrVbG6I5HN09xZh0= |
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.10 h1:by9P+oy3P/CwggN4ClnW2D4oL91QV7pBzBICi1chZvQ= |
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1 h1:T4pFel53bkHjL2mMo+4DKE6r6AuoZnM0fg7k1/ratr4= |
||||
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.3 h1:I0dcwWitE752hVSMrsLCxqNQ+UdEp3nACx2bYNMQq+k= |
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.3 h1:Gh1Gpyh01Yvn7ilO/b/hr01WgNpaszfbKMUgqM186xQ= |
||||
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.3 h1:BKjwCJPnANbkwQ8vzSbaZDKawwagDubrH/z/c0X+kbQ= |
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3 h1:rMPtwA7zzkSQZhhz9U3/SoIDz/NZ7Q+iRn4EIO8rSyU= |
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.11.3 h1:frW4ikGcxfAEDfmQqWgMLp+F1n4nRo9sF39OcIb5BkQ= |
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.16.3 h1:cJGRyzCSVwZC7zZZ1xbx9m32UnrKydRYhOvcD1NYP9Q= |
||||
github.com/aws/smithy-go v1.11.2 h1:eG/N+CcUMAvsdffgMvjMKwfyDzIkjM6pfxMJ8Mzc6mE= |
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= |
||||
github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= |
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= |
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= |
||||
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= |
||||
github.com/fullstorydev/grpchan v1.1.1 h1:heQqIJlAv5Cnks9a70GRL2EJke6QQoUB25VGR6TZQas= |
||||
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k= |
||||
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= |
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= |
||||
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= |
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= |
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= |
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= |
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= |
||||
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= |
||||
github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= |
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= |
||||
github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8= |
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= |
||||
github.com/googleapis/gax-go/v2 v2.12.3 h1:5/zPPDvw8Q1SuXjrqrZslrqT7dL/uJT2CQii/cLCKqA= |
||||
github.com/grafana/authlib v0.0.0-20240611075137-331cbe4e840f h1:hvRCAv+TgcHu3i/Sd7lFJx84iEtgzDCYuk7OWeXatD0= |
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk= |
||||
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= |
||||
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= |
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= |
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= |
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= |
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= |
||||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= |
||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= |
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= |
||||
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= |
||||
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= |
||||
github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+aLCE= |
||||
github.com/prometheus/procfs v0.14.0 h1:Lw4VdGGoKEZilJsayHf0B+9YgLGREba2C6xr+Fdfq6s= |
||||
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= |
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= |
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= |
||||
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= |
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 h1:A3SayB3rNyt+1S6qpI9mHPkeHTZbD7XILEqWnYZb2l0= |
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 h1:Xs2Ncz0gNihqu9iosIZ5SkBbWo5T8JhhLJFMQL1qmLI= |
||||
go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= |
||||
go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30= |
||||
go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA= |
||||
gocloud.dev v0.25.0 h1:Y7vDq8xj7SyM848KXf32Krda2e6jQ4CLh/mTeCSqXtk= |
||||
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= |
||||
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= |
||||
golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= |
||||
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= |
||||
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= |
||||
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= |
||||
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= |
||||
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= |
||||
google.golang.org/api v0.176.0 h1:dHj1/yv5Dm/eQTXiP9hNCRT3xzJHWXeNdRq29XbMxoE= |
||||
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= |
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1:+rdxYoE3E5htTEWIe15GlN6IfvbURM//Jt0mmkmm6ZU= |
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 h1:1GBuWVLM/KMVUv1t1En5Gs+gFZCNd360GGb4sSxtrhU= |
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 h1:BwIjyKYGsK9dMCBOorzRri8MQwmi7mT9rGHsCEinZkA= |
||||
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= |
||||
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= |
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= |
||||
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= |
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= |
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= |
||||
k8s.io/apimachinery v0.29.3 h1:2tbx+5L7RNvqJjn7RIuIKu9XTsIZ9Z5wX2G22XAa5EU= |
||||
k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= |
||||
k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= |
||||
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= |
||||
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= |
||||
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= |
||||
|
@ -0,0 +1,45 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
context "context" |
||||
"fmt" |
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity" |
||||
) |
||||
|
||||
type WriteAccessHooks struct { |
||||
// Check if a user has access to write folders
|
||||
// When this is nil, no resources can have folders configured
|
||||
Folder func(ctx context.Context, user identity.Requester, uid string) bool |
||||
|
||||
// When configured, this will make sure a user is allowed to save to a given origin
|
||||
Origin func(ctx context.Context, user identity.Requester, origin string) bool |
||||
} |
||||
|
||||
type LifecycleHooks interface { |
||||
// Called once at initialization
|
||||
Init() error |
||||
|
||||
// Stop function -- after calling this, any additional storage functions may error
|
||||
Stop() |
||||
} |
||||
|
||||
func (a *WriteAccessHooks) CanWriteFolder(ctx context.Context, user identity.Requester, uid string) error { |
||||
if a.Folder == nil { |
||||
return fmt.Errorf("writing folders is not supported") |
||||
} |
||||
if !a.Folder(ctx, user, uid) { |
||||
return fmt.Errorf("not allowed to write resource to folder") |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (a *WriteAccessHooks) CanWriteOrigin(ctx context.Context, user identity.Requester, uid string) error { |
||||
if a.Origin == nil || uid == "UI" { |
||||
return nil // default to OK
|
||||
} |
||||
if !a.Origin(ctx, user, uid) { |
||||
return fmt.Errorf("not allowed to write resource at origin") |
||||
} |
||||
return nil |
||||
} |
@ -0,0 +1,17 @@ |
||||
package resource |
||||
|
||||
func matchesQueryKey(query *ResourceKey, key *ResourceKey) bool { |
||||
if query.Group != key.Group { |
||||
return false |
||||
} |
||||
if query.Resource != key.Resource { |
||||
return false |
||||
} |
||||
if query.Namespace != "" && query.Namespace != key.Namespace { |
||||
return false |
||||
} |
||||
if query.Name != "" && query.Name != key.Name { |
||||
return false |
||||
} |
||||
return true |
||||
} |
@ -0,0 +1,21 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func TestKeyMatching(t *testing.T) { |
||||
t.Run("key matching", func(t *testing.T) { |
||||
require.True(t, matchesQueryKey(&ResourceKey{ |
||||
Group: "ggg", |
||||
Resource: "rrr", |
||||
Namespace: "ns", |
||||
}, &ResourceKey{ |
||||
Group: "ggg", |
||||
Resource: "rrr", |
||||
Namespace: "ns", |
||||
})) |
||||
}) |
||||
} |
@ -0,0 +1,41 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
"sync" |
||||
|
||||
"github.com/prometheus/client_golang/prometheus" |
||||
) |
||||
|
||||
var ( |
||||
once sync.Once |
||||
StorageServerMetrics *StorageApiMetrics |
||||
) |
||||
|
||||
type StorageApiMetrics struct { |
||||
OptimisticLockFailed *prometheus.CounterVec |
||||
} |
||||
|
||||
func NewStorageMetrics() *StorageApiMetrics { |
||||
once.Do(func() { |
||||
StorageServerMetrics = &StorageApiMetrics{ |
||||
OptimisticLockFailed: prometheus.NewCounterVec( |
||||
prometheus.CounterOpts{ |
||||
Namespace: "resource_storage", |
||||
Name: "optimistic_lock_failed", |
||||
Help: "count of optimistic locks failed", |
||||
}, |
||||
[]string{"action"}, |
||||
), |
||||
} |
||||
}) |
||||
|
||||
return StorageServerMetrics |
||||
} |
||||
|
||||
func (s *StorageApiMetrics) Collect(ch chan<- prometheus.Metric) { |
||||
s.OptimisticLockFailed.Collect(ch) |
||||
} |
||||
|
||||
func (s *StorageApiMetrics) Describe(ch chan<- *prometheus.Desc) { |
||||
s.OptimisticLockFailed.Describe(ch) |
||||
} |
@ -0,0 +1,41 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
"context" |
||||
) |
||||
|
||||
var ( |
||||
_ DiagnosticsServer = &noopService{} |
||||
_ LifecycleHooks = &noopService{} |
||||
) |
||||
|
||||
// noopService is a helper implementation to simplify tests
|
||||
// It does nothing except return errors when asked to do anything real
|
||||
type noopService struct{} |
||||
|
||||
// Init implements ResourceServer.
|
||||
func (n *noopService) Init() error { |
||||
return nil |
||||
} |
||||
|
||||
// Stop implements ResourceServer.
|
||||
func (n *noopService) Stop() { |
||||
// nothing
|
||||
} |
||||
|
||||
// IsHealthy implements ResourceServer.
|
||||
func (n *noopService) IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) { |
||||
return &HealthCheckResponse{ |
||||
Status: HealthCheckResponse_SERVING, |
||||
}, nil |
||||
} |
||||
|
||||
// Read implements ResourceServer.
|
||||
func (n *noopService) Read(context.Context, *ReadRequest) (*ReadResponse, error) { |
||||
return nil, ErrNotImplementedYet |
||||
} |
||||
|
||||
// List implements ResourceServer.
|
||||
func (n *noopService) List(context.Context, *ListRequest) (*ListResponse, error) { |
||||
return nil, ErrNotImplementedYet |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,303 @@ |
||||
syntax = "proto3"; |
||||
package resource; |
||||
|
||||
option go_package = "github.com/grafana/grafana/pkg/storage/unified/resource"; |
||||
|
||||
message ResourceKey { |
||||
// Namespace (tenant) |
||||
string namespace = 2; |
||||
// Resource Group |
||||
string group = 1; |
||||
// The resource type |
||||
string resource = 3; |
||||
// Resource identifier (unique within namespace+group+resource) |
||||
string name = 4; |
||||
} |
||||
|
||||
message ResourceWrapper { |
||||
// The resource version |
||||
int64 resource_version = 1; |
||||
|
||||
// Full kubernetes json bytes (although the resource version may not be accurate) |
||||
bytes value = 2; |
||||
} |
||||
|
||||
// The history and trash commands need access to commit messages |
||||
message ResourceMeta { |
||||
// The resource version |
||||
int64 resource_version = 1; |
||||
|
||||
// Size of the full resource body |
||||
int32 size = 3; |
||||
|
||||
// Hash for the resource |
||||
string hash = 4; |
||||
|
||||
// The kubernetes metadata section (not the full resource) |
||||
// https://github.com/kubernetes/kubernetes/blob/v1.30.2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L1496 |
||||
bytes partial_object_meta = 6; |
||||
} |
||||
|
||||
// Status structure is copied from: |
||||
// https://github.com/kubernetes/apimachinery/blob/v0.30.1/pkg/apis/meta/v1/generated.proto#L979 |
||||
message StatusResult { |
||||
// Status of the operation. |
||||
// One of: "Success" or "Failure". |
||||
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status |
||||
// +optional |
||||
string status = 1; |
||||
// A human-readable description of the status of this operation. |
||||
// +optional |
||||
string message = 2; |
||||
// A machine-readable description of why this operation is in the |
||||
// "Failure" status. If this value is empty there |
||||
// is no information available. A Reason clarifies an HTTP status |
||||
// code but does not override it. |
||||
// +optional |
||||
string reason = 3; |
||||
// Suggested HTTP return code for this status, 0 if not set. |
||||
// +optional |
||||
int32 code = 4; |
||||
} |
||||
|
||||
// ---------------------------------- |
||||
// CRUD Objects |
||||
// ---------------------------------- |
||||
|
||||
message CreateRequest { |
||||
// Requires group+resource to be configuired |
||||
// If name is not set, a unique name will be generated |
||||
// The resourceVersion should not be set |
||||
ResourceKey key = 1; |
||||
|
||||
// The resource JSON. |
||||
bytes value = 2; |
||||
} |
||||
|
||||
message CreateResponse { |
||||
// Status code |
||||
StatusResult status = 1; |
||||
|
||||
// The updated resource version |
||||
int64 resource_version = 2; |
||||
|
||||
// The resource JSON. With managed annotations included |
||||
bytes value = 3; |
||||
} |
||||
|
||||
message UpdateRequest { |
||||
// Full key must be set |
||||
ResourceKey key = 1; |
||||
|
||||
// The current resource version |
||||
int64 resource_version = 2; |
||||
|
||||
// The resource JSON. |
||||
bytes value = 3; |
||||
} |
||||
|
||||
message UpdateResponse { |
||||
// Status code |
||||
StatusResult status = 1; |
||||
|
||||
// The updated resource version |
||||
int64 resource_version = 2; |
||||
|
||||
// The resource JSON. With managed annotations included |
||||
bytes value = 3; |
||||
} |
||||
|
||||
message DeleteRequest { |
||||
ResourceKey key = 1; |
||||
|
||||
// The current resource version |
||||
int64 resource_version = 2; |
||||
|
||||
// Preconditions: make sure the uid matches the current saved value |
||||
// +optional |
||||
string uid = 3; |
||||
} |
||||
|
||||
message DeleteResponse { |
||||
// Status code |
||||
StatusResult status = 1; |
||||
|
||||
// The resource version for the deletion marker |
||||
int64 resource_version = 2; |
||||
} |
||||
|
||||
message ReadRequest { |
||||
ResourceKey key = 1; |
||||
|
||||
// Optionally pick an explicit resource version |
||||
int64 resource_version = 3; |
||||
} |
||||
|
||||
message ReadResponse { |
||||
// Status code |
||||
StatusResult status = 1; |
||||
|
||||
// The new resource version |
||||
int64 resource_version = 2; |
||||
|
||||
// The properties |
||||
bytes value = 3; |
||||
} |
||||
|
||||
// ---------------------------------- |
||||
// List Request/Response |
||||
// ---------------------------------- |
||||
|
||||
// The label filtering requirements: |
||||
// https://github.com/kubernetes/kubernetes/blob/v1.30.1/staging/src/k8s.io/apimachinery/pkg/labels/selector.go#L141 |
||||
message Requirement { |
||||
string key = 1; |
||||
string operator = 2; // See https://github.com/kubernetes/kubernetes/blob/v1.30.1/staging/src/k8s.io/apimachinery/pkg/selection/operator.go#L21 |
||||
repeated string values = 3; // typically one value, but depends on the operator |
||||
} |
||||
|
||||
|
||||
message ListOptions { |
||||
// Group+Namespace+Resource (not name) |
||||
ResourceKey key = 1; |
||||
|
||||
// (best effort) Match label |
||||
// Allowed to send more results than actually match because the filter will be appled |
||||
// to the resutls agin in the client. That time with the full field selector |
||||
repeated Requirement labels = 2; |
||||
|
||||
// (best effort) fields matcher |
||||
// Allowed to send more results than actually match because the filter will be appled |
||||
// to the resutls agin in the client. That time with the full field selector |
||||
repeated Requirement fields = 3; |
||||
} |
||||
|
||||
enum ResourceVersionMatch { |
||||
NotOlderThan = 0; |
||||
Exact = 1; |
||||
} |
||||
|
||||
message ListRequest { |
||||
// Starting from the requested page (other query parameters must match!) |
||||
string next_page_token = 1; |
||||
|
||||
// The resource version |
||||
int64 resource_version = 2; |
||||
|
||||
// List options |
||||
ResourceVersionMatch version_match = 3; |
||||
|
||||
// Maximum number of items to return |
||||
// NOTE responses will also be limited by the response payload size |
||||
int64 limit = 4; |
||||
|
||||
// Filtering |
||||
ListOptions options = 5; |
||||
} |
||||
|
||||
message ListResponse { |
||||
repeated ResourceWrapper items = 1; |
||||
|
||||
// When more results exist, pass this in the next request |
||||
string next_page_token = 2; |
||||
|
||||
// ResourceVersion of the list response |
||||
int64 resource_version = 3; |
||||
|
||||
// remainingItemCount is the number of subsequent items in the list which are not included in this |
||||
// list response. If the list request contained label or field selectors, then the number of |
||||
// remaining items is unknown and the field will be left unset and omitted during serialization. |
||||
// If the list is complete (either because it is not chunking or because this is the last chunk), |
||||
// then there are no more remaining items and this field will be left unset and omitted during |
||||
// serialization. |
||||
// |
||||
// The intended use of the remainingItemCount is *estimating* the size of a collection. Clients |
||||
// should not rely on the remainingItemCount to be set or to be exact. |
||||
// +optional |
||||
int64 remaining_item_count = 4; // 0 won't be set either (no next page token) |
||||
} |
||||
|
||||
message WatchRequest { |
||||
// ResourceVersion of last changes. Empty will default to full history |
||||
int64 since = 1; |
||||
|
||||
// Additional options |
||||
ListOptions options = 3; |
||||
|
||||
// Return initial events |
||||
bool send_initial_events = 4; |
||||
|
||||
// When done with initial events, send a bookmark event |
||||
bool allow_watch_bookmarks = 5; |
||||
} |
||||
|
||||
message WatchEvent { |
||||
enum Type { |
||||
UNKNOWN = 0; |
||||
ADDED = 1; |
||||
MODIFIED = 2; |
||||
DELETED = 3; |
||||
BOOKMARK = 4; |
||||
ERROR = 5; |
||||
} |
||||
|
||||
message Resource { |
||||
int64 version = 1; |
||||
bytes value = 2; |
||||
} |
||||
|
||||
// Timestamp the event was sent |
||||
int64 timestamp = 1; |
||||
|
||||
// Timestamp the event was sent |
||||
Type type = 2; |
||||
|
||||
// Resource version for the object |
||||
Resource resource = 3; |
||||
|
||||
// Previous resource version (for update+delete) |
||||
Resource previous = 4; |
||||
} |
||||
|
||||
message HealthCheckRequest { |
||||
string service = 1; |
||||
} |
||||
|
||||
message HealthCheckResponse { |
||||
enum ServingStatus { |
||||
UNKNOWN = 0; |
||||
SERVING = 1; |
||||
NOT_SERVING = 2; |
||||
SERVICE_UNKNOWN = 3; // Used only by the Watch method. |
||||
} |
||||
ServingStatus status = 1; |
||||
} |
||||
|
||||
|
||||
// This provides the CRUD+List+Watch support needed for a k8s apiserver |
||||
// The semantics and behaviors of this service are constrained by kubernetes |
||||
// This does not understand the resource schemas, only deals with json bytes |
||||
// Clients should not use this interface directly; it is for use in API Servers |
||||
service ResourceStore { |
||||
rpc Read(ReadRequest) returns (ReadResponse); |
||||
rpc Create(CreateRequest) returns (CreateResponse); |
||||
rpc Update(UpdateRequest) returns (UpdateResponse); |
||||
rpc Delete(DeleteRequest) returns (DeleteResponse); |
||||
|
||||
// The results *may* include values that should not be returned to the user |
||||
// This will perform best-effort filtering to increase performace. |
||||
// NOTE: storage.Interface is ultimatly responsible for the final filtering |
||||
rpc List(ListRequest) returns (ListResponse); |
||||
|
||||
// The results *may* include values that should not be returned to the user |
||||
// This will perform best-effort filtering to increase performace. |
||||
// NOTE: storage.Interface is ultimatly responsible for the final filtering |
||||
rpc Watch(WatchRequest) returns (stream WatchEvent); |
||||
} |
||||
|
||||
// Clients can use this service directly |
||||
// NOTE: This is read only, and no read afer write guarantees |
||||
service Diagnostics { |
||||
// Check if the service is healthy |
||||
rpc IsHealthy(HealthCheckRequest) returns (HealthCheckResponse); |
||||
} |
@ -0,0 +1,445 @@ |
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.4.0
|
||||
// - protoc (unknown)
|
||||
// source: resource.proto
|
||||
|
||||
package resource |
||||
|
||||
import ( |
||||
context "context" |
||||
grpc "google.golang.org/grpc" |
||||
codes "google.golang.org/grpc/codes" |
||||
status "google.golang.org/grpc/status" |
||||
) |
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
// Requires gRPC-Go v1.62.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion8 |
||||
|
||||
const ( |
||||
ResourceStore_Read_FullMethodName = "/resource.ResourceStore/Read" |
||||
ResourceStore_Create_FullMethodName = "/resource.ResourceStore/Create" |
||||
ResourceStore_Update_FullMethodName = "/resource.ResourceStore/Update" |
||||
ResourceStore_Delete_FullMethodName = "/resource.ResourceStore/Delete" |
||||
ResourceStore_List_FullMethodName = "/resource.ResourceStore/List" |
||||
ResourceStore_Watch_FullMethodName = "/resource.ResourceStore/Watch" |
||||
) |
||||
|
||||
// ResourceStoreClient is the client API for ResourceStore service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
//
|
||||
// This provides the CRUD+List+Watch support needed for a k8s apiserver
|
||||
// The semantics and behaviors of this service are constrained by kubernetes
|
||||
// This does not understand the resource schemas, only deals with json bytes
|
||||
// Clients should not use this interface directly; it is for use in API Servers
|
||||
type ResourceStoreClient interface { |
||||
Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) |
||||
Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error) |
||||
Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*UpdateResponse, error) |
||||
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) |
||||
// The results *may* include values that should not be returned to the user
|
||||
// This will perform best-effort filtering to increase performace.
|
||||
// NOTE: storage.Interface is ultimatly responsible for the final filtering
|
||||
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) |
||||
// The results *may* include values that should not be returned to the user
|
||||
// This will perform best-effort filtering to increase performace.
|
||||
// NOTE: storage.Interface is ultimatly responsible for the final filtering
|
||||
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error) |
||||
} |
||||
|
||||
type resourceStoreClient struct { |
||||
cc grpc.ClientConnInterface |
||||
} |
||||
|
||||
func NewResourceStoreClient(cc grpc.ClientConnInterface) ResourceStoreClient { |
||||
return &resourceStoreClient{cc} |
||||
} |
||||
|
||||
func (c *resourceStoreClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) { |
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) |
||||
out := new(ReadResponse) |
||||
err := c.cc.Invoke(ctx, ResourceStore_Read_FullMethodName, in, out, cOpts...) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return out, nil |
||||
} |
||||
|
||||
func (c *resourceStoreClient) Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error) { |
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) |
||||
out := new(CreateResponse) |
||||
err := c.cc.Invoke(ctx, ResourceStore_Create_FullMethodName, in, out, cOpts...) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return out, nil |
||||
} |
||||
|
||||
func (c *resourceStoreClient) Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*UpdateResponse, error) { |
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) |
||||
out := new(UpdateResponse) |
||||
err := c.cc.Invoke(ctx, ResourceStore_Update_FullMethodName, in, out, cOpts...) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return out, nil |
||||
} |
||||
|
||||
func (c *resourceStoreClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) { |
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) |
||||
out := new(DeleteResponse) |
||||
err := c.cc.Invoke(ctx, ResourceStore_Delete_FullMethodName, in, out, cOpts...) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return out, nil |
||||
} |
||||
|
||||
func (c *resourceStoreClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) { |
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) |
||||
out := new(ListResponse) |
||||
err := c.cc.Invoke(ctx, ResourceStore_List_FullMethodName, in, out, cOpts...) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return out, nil |
||||
} |
||||
|
||||
func (c *resourceStoreClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error) { |
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) |
||||
stream, err := c.cc.NewStream(ctx, &ResourceStore_ServiceDesc.Streams[0], ResourceStore_Watch_FullMethodName, cOpts...) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
x := &resourceStoreWatchClient{ClientStream: stream} |
||||
if err := x.ClientStream.SendMsg(in); err != nil { |
||||
return nil, err |
||||
} |
||||
if err := x.ClientStream.CloseSend(); err != nil { |
||||
return nil, err |
||||
} |
||||
return x, nil |
||||
} |
||||
|
||||
type ResourceStore_WatchClient interface { |
||||
Recv() (*WatchEvent, error) |
||||
grpc.ClientStream |
||||
} |
||||
|
||||
type resourceStoreWatchClient struct { |
||||
grpc.ClientStream |
||||
} |
||||
|
||||
func (x *resourceStoreWatchClient) Recv() (*WatchEvent, error) { |
||||
m := new(WatchEvent) |
||||
if err := x.ClientStream.RecvMsg(m); err != nil { |
||||
return nil, err |
||||
} |
||||
return m, nil |
||||
} |
||||
|
||||
// ResourceStoreServer is the server API for ResourceStore service.
|
||||
// All implementations should embed UnimplementedResourceStoreServer
|
||||
// for forward compatibility
|
||||
//
|
||||
// This provides the CRUD+List+Watch support needed for a k8s apiserver
|
||||
// The semantics and behaviors of this service are constrained by kubernetes
|
||||
// This does not understand the resource schemas, only deals with json bytes
|
||||
// Clients should not use this interface directly; it is for use in API Servers
|
||||
type ResourceStoreServer interface { |
||||
Read(context.Context, *ReadRequest) (*ReadResponse, error) |
||||
Create(context.Context, *CreateRequest) (*CreateResponse, error) |
||||
Update(context.Context, *UpdateRequest) (*UpdateResponse, error) |
||||
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) |
||||
// The results *may* include values that should not be returned to the user
|
||||
// This will perform best-effort filtering to increase performace.
|
||||
// NOTE: storage.Interface is ultimatly responsible for the final filtering
|
||||
List(context.Context, *ListRequest) (*ListResponse, error) |
||||
// The results *may* include values that should not be returned to the user
|
||||
// This will perform best-effort filtering to increase performace.
|
||||
// NOTE: storage.Interface is ultimatly responsible for the final filtering
|
||||
Watch(*WatchRequest, ResourceStore_WatchServer) error |
||||
} |
||||
|
||||
// UnimplementedResourceStoreServer should be embedded to have forward compatible implementations.
|
||||
type UnimplementedResourceStoreServer struct { |
||||
} |
||||
|
||||
func (UnimplementedResourceStoreServer) Read(context.Context, *ReadRequest) (*ReadResponse, error) { |
||||
return nil, status.Errorf(codes.Unimplemented, "method Read not implemented") |
||||
} |
||||
func (UnimplementedResourceStoreServer) Create(context.Context, *CreateRequest) (*CreateResponse, error) { |
||||
return nil, status.Errorf(codes.Unimplemented, "method Create not implemented") |
||||
} |
||||
func (UnimplementedResourceStoreServer) Update(context.Context, *UpdateRequest) (*UpdateResponse, error) { |
||||
return nil, status.Errorf(codes.Unimplemented, "method Update not implemented") |
||||
} |
||||
func (UnimplementedResourceStoreServer) Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) { |
||||
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented") |
||||
} |
||||
func (UnimplementedResourceStoreServer) List(context.Context, *ListRequest) (*ListResponse, error) { |
||||
return nil, status.Errorf(codes.Unimplemented, "method List not implemented") |
||||
} |
||||
func (UnimplementedResourceStoreServer) Watch(*WatchRequest, ResourceStore_WatchServer) error { |
||||
return status.Errorf(codes.Unimplemented, "method Watch not implemented") |
||||
} |
||||
|
||||
// UnsafeResourceStoreServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to ResourceStoreServer will
|
||||
// result in compilation errors.
|
||||
type UnsafeResourceStoreServer interface { |
||||
mustEmbedUnimplementedResourceStoreServer() |
||||
} |
||||
|
||||
func RegisterResourceStoreServer(s grpc.ServiceRegistrar, srv ResourceStoreServer) { |
||||
s.RegisterService(&ResourceStore_ServiceDesc, srv) |
||||
} |
||||
|
||||
func _ResourceStore_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { |
||||
in := new(ReadRequest) |
||||
if err := dec(in); err != nil { |
||||
return nil, err |
||||
} |
||||
if interceptor == nil { |
||||
return srv.(ResourceStoreServer).Read(ctx, in) |
||||
} |
||||
info := &grpc.UnaryServerInfo{ |
||||
Server: srv, |
||||
FullMethod: ResourceStore_Read_FullMethodName, |
||||
} |
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) { |
||||
return srv.(ResourceStoreServer).Read(ctx, req.(*ReadRequest)) |
||||
} |
||||
return interceptor(ctx, in, info, handler) |
||||
} |
||||
|
||||
func _ResourceStore_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { |
||||
in := new(CreateRequest) |
||||
if err := dec(in); err != nil { |
||||
return nil, err |
||||
} |
||||
if interceptor == nil { |
||||
return srv.(ResourceStoreServer).Create(ctx, in) |
||||
} |
||||
info := &grpc.UnaryServerInfo{ |
||||
Server: srv, |
||||
FullMethod: ResourceStore_Create_FullMethodName, |
||||
} |
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) { |
||||
return srv.(ResourceStoreServer).Create(ctx, req.(*CreateRequest)) |
||||
} |
||||
return interceptor(ctx, in, info, handler) |
||||
} |
||||
|
||||
func _ResourceStore_Update_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { |
||||
in := new(UpdateRequest) |
||||
if err := dec(in); err != nil { |
||||
return nil, err |
||||
} |
||||
if interceptor == nil { |
||||
return srv.(ResourceStoreServer).Update(ctx, in) |
||||
} |
||||
info := &grpc.UnaryServerInfo{ |
||||
Server: srv, |
||||
FullMethod: ResourceStore_Update_FullMethodName, |
||||
} |
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) { |
||||
return srv.(ResourceStoreServer).Update(ctx, req.(*UpdateRequest)) |
||||
} |
||||
return interceptor(ctx, in, info, handler) |
||||
} |
||||
|
||||
func _ResourceStore_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { |
||||
in := new(DeleteRequest) |
||||
if err := dec(in); err != nil { |
||||
return nil, err |
||||
} |
||||
if interceptor == nil { |
||||
return srv.(ResourceStoreServer).Delete(ctx, in) |
||||
} |
||||
info := &grpc.UnaryServerInfo{ |
||||
Server: srv, |
||||
FullMethod: ResourceStore_Delete_FullMethodName, |
||||
} |
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) { |
||||
return srv.(ResourceStoreServer).Delete(ctx, req.(*DeleteRequest)) |
||||
} |
||||
return interceptor(ctx, in, info, handler) |
||||
} |
||||
|
||||
func _ResourceStore_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { |
||||
in := new(ListRequest) |
||||
if err := dec(in); err != nil { |
||||
return nil, err |
||||
} |
||||
if interceptor == nil { |
||||
return srv.(ResourceStoreServer).List(ctx, in) |
||||
} |
||||
info := &grpc.UnaryServerInfo{ |
||||
Server: srv, |
||||
FullMethod: ResourceStore_List_FullMethodName, |
||||
} |
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) { |
||||
return srv.(ResourceStoreServer).List(ctx, req.(*ListRequest)) |
||||
} |
||||
return interceptor(ctx, in, info, handler) |
||||
} |
||||
|
||||
func _ResourceStore_Watch_Handler(srv interface{}, stream grpc.ServerStream) error { |
||||
m := new(WatchRequest) |
||||
if err := stream.RecvMsg(m); err != nil { |
||||
return err |
||||
} |
||||
return srv.(ResourceStoreServer).Watch(m, &resourceStoreWatchServer{ServerStream: stream}) |
||||
} |
||||
|
||||
type ResourceStore_WatchServer interface { |
||||
Send(*WatchEvent) error |
||||
grpc.ServerStream |
||||
} |
||||
|
||||
type resourceStoreWatchServer struct { |
||||
grpc.ServerStream |
||||
} |
||||
|
||||
func (x *resourceStoreWatchServer) Send(m *WatchEvent) error { |
||||
return x.ServerStream.SendMsg(m) |
||||
} |
||||
|
||||
// ResourceStore_ServiceDesc is the grpc.ServiceDesc for ResourceStore service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
var ResourceStore_ServiceDesc = grpc.ServiceDesc{ |
||||
ServiceName: "resource.ResourceStore", |
||||
HandlerType: (*ResourceStoreServer)(nil), |
||||
Methods: []grpc.MethodDesc{ |
||||
{ |
||||
MethodName: "Read", |
||||
Handler: _ResourceStore_Read_Handler, |
||||
}, |
||||
{ |
||||
MethodName: "Create", |
||||
Handler: _ResourceStore_Create_Handler, |
||||
}, |
||||
{ |
||||
MethodName: "Update", |
||||
Handler: _ResourceStore_Update_Handler, |
||||
}, |
||||
{ |
||||
MethodName: "Delete", |
||||
Handler: _ResourceStore_Delete_Handler, |
||||
}, |
||||
{ |
||||
MethodName: "List", |
||||
Handler: _ResourceStore_List_Handler, |
||||
}, |
||||
}, |
||||
Streams: []grpc.StreamDesc{ |
||||
{ |
||||
StreamName: "Watch", |
||||
Handler: _ResourceStore_Watch_Handler, |
||||
ServerStreams: true, |
||||
}, |
||||
}, |
||||
Metadata: "resource.proto", |
||||
} |
||||
|
||||
const ( |
||||
Diagnostics_IsHealthy_FullMethodName = "/resource.Diagnostics/IsHealthy" |
||||
) |
||||
|
||||
// DiagnosticsClient is the client API for Diagnostics service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
//
|
||||
// Clients can use this service directly
|
||||
// NOTE: This is read only, and no read afer write guarantees
|
||||
type DiagnosticsClient interface { |
||||
// Check if the service is healthy
|
||||
IsHealthy(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) |
||||
} |
||||
|
||||
type diagnosticsClient struct { |
||||
cc grpc.ClientConnInterface |
||||
} |
||||
|
||||
func NewDiagnosticsClient(cc grpc.ClientConnInterface) DiagnosticsClient { |
||||
return &diagnosticsClient{cc} |
||||
} |
||||
|
||||
func (c *diagnosticsClient) IsHealthy(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { |
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) |
||||
out := new(HealthCheckResponse) |
||||
err := c.cc.Invoke(ctx, Diagnostics_IsHealthy_FullMethodName, in, out, cOpts...) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return out, nil |
||||
} |
||||
|
||||
// DiagnosticsServer is the server API for Diagnostics service.
|
||||
// All implementations should embed UnimplementedDiagnosticsServer
|
||||
// for forward compatibility
|
||||
//
|
||||
// Clients can use this service directly
|
||||
// NOTE: This is read only, and no read afer write guarantees
|
||||
type DiagnosticsServer interface { |
||||
// Check if the service is healthy
|
||||
IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) |
||||
} |
||||
|
||||
// UnimplementedDiagnosticsServer should be embedded to have forward compatible implementations.
|
||||
type UnimplementedDiagnosticsServer struct { |
||||
} |
||||
|
||||
func (UnimplementedDiagnosticsServer) IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) { |
||||
return nil, status.Errorf(codes.Unimplemented, "method IsHealthy not implemented") |
||||
} |
||||
|
||||
// UnsafeDiagnosticsServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to DiagnosticsServer will
|
||||
// result in compilation errors.
|
||||
type UnsafeDiagnosticsServer interface { |
||||
mustEmbedUnimplementedDiagnosticsServer() |
||||
} |
||||
|
||||
func RegisterDiagnosticsServer(s grpc.ServiceRegistrar, srv DiagnosticsServer) { |
||||
s.RegisterService(&Diagnostics_ServiceDesc, srv) |
||||
} |
||||
|
||||
func _Diagnostics_IsHealthy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { |
||||
in := new(HealthCheckRequest) |
||||
if err := dec(in); err != nil { |
||||
return nil, err |
||||
} |
||||
if interceptor == nil { |
||||
return srv.(DiagnosticsServer).IsHealthy(ctx, in) |
||||
} |
||||
info := &grpc.UnaryServerInfo{ |
||||
Server: srv, |
||||
FullMethod: Diagnostics_IsHealthy_FullMethodName, |
||||
} |
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) { |
||||
return srv.(DiagnosticsServer).IsHealthy(ctx, req.(*HealthCheckRequest)) |
||||
} |
||||
return interceptor(ctx, in, info, handler) |
||||
} |
||||
|
||||
// Diagnostics_ServiceDesc is the grpc.ServiceDesc for Diagnostics service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
var Diagnostics_ServiceDesc = grpc.ServiceDesc{ |
||||
ServiceName: "resource.Diagnostics", |
||||
HandlerType: (*DiagnosticsServer)(nil), |
||||
Methods: []grpc.MethodDesc{ |
||||
{ |
||||
MethodName: "IsHealthy", |
||||
Handler: _Diagnostics_IsHealthy_Handler, |
||||
}, |
||||
}, |
||||
Streams: []grpc.StreamDesc{}, |
||||
Metadata: "resource.proto", |
||||
} |
@ -0,0 +1,549 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
context "context" |
||||
"encoding/json" |
||||
"errors" |
||||
"fmt" |
||||
"log/slog" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/google/uuid" |
||||
"go.opentelemetry.io/otel/trace" |
||||
"go.opentelemetry.io/otel/trace/noop" |
||||
apierrors "k8s.io/apimachinery/pkg/api/errors" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/types" |
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity" |
||||
"github.com/grafana/grafana/pkg/apimachinery/utils" |
||||
) |
||||
|
||||
// Package-level errors.
|
||||
var ( |
||||
ErrNotFound = errors.New("entity not found") |
||||
ErrOptimisticLockingFailed = errors.New("optimistic locking failed") |
||||
ErrUserNotFoundInContext = errors.New("user not found in context") |
||||
ErrUnableToReadResourceJSON = errors.New("unable to read resource json") |
||||
ErrNotImplementedYet = errors.New("not implemented yet") |
||||
) |
||||
|
||||
// ResourceServer implements all services
|
||||
type ResourceServer interface { |
||||
ResourceStoreServer |
||||
DiagnosticsServer |
||||
LifecycleHooks |
||||
} |
||||
|
||||
// The StorageBackend is an internal abstraction that supports interacting with
|
||||
// the underlying raw storage medium. This interface is never exposed directly,
|
||||
// it is provided by concrete instances that actually write values.
|
||||
type StorageBackend interface { |
||||
// Write a Create/Update/Delete,
|
||||
// NOTE: the contents of WriteEvent have been validated
|
||||
// Return the revisionVersion for this event or error
|
||||
WriteEvent(context.Context, WriteEvent) (int64, error) |
||||
|
||||
// Read a value from storage optionally at an explicit version
|
||||
Read(context.Context, *ReadRequest) (*ReadResponse, error) |
||||
|
||||
// When the ResourceServer executes a List request, it will first
|
||||
// query the backend for potential results. All results will be
|
||||
// checked against the kubernetes requirements before finally returning
|
||||
// results. The list options can be used to improve performance
|
||||
// but are the the final answer.
|
||||
PrepareList(context.Context, *ListRequest) (*ListResponse, error) |
||||
|
||||
// Get all events from the store
|
||||
// For HA setups, this will be more events than the local WriteEvent above!
|
||||
WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) |
||||
} |
||||
|
||||
type ResourceServerOptions struct { |
||||
// OTel tracer
|
||||
Tracer trace.Tracer |
||||
|
||||
// Real storage backend
|
||||
Backend StorageBackend |
||||
|
||||
// Diagnostics
|
||||
Diagnostics DiagnosticsServer |
||||
|
||||
// Check if a user has access to write folders
|
||||
// When this is nil, no resources can have folders configured
|
||||
WriteAccess WriteAccessHooks |
||||
|
||||
// Callbacks for startup and shutdown
|
||||
Lifecycle LifecycleHooks |
||||
|
||||
// Get the current time in unix millis
|
||||
Now func() int64 |
||||
} |
||||
|
||||
func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { |
||||
if opts.Tracer == nil { |
||||
opts.Tracer = noop.NewTracerProvider().Tracer("resource-server") |
||||
} |
||||
|
||||
if opts.Backend == nil { |
||||
return nil, fmt.Errorf("missing Backend implementation") |
||||
} |
||||
if opts.Diagnostics == nil { |
||||
opts.Diagnostics = &noopService{} |
||||
} |
||||
if opts.Now == nil { |
||||
opts.Now = func() int64 { |
||||
return time.Now().UnixMilli() |
||||
} |
||||
} |
||||
|
||||
// Make this cancelable
|
||||
ctx, cancel := context.WithCancel(identity.WithRequester(context.Background(), |
||||
&identity.StaticRequester{ |
||||
Namespace: identity.NamespaceServiceAccount, |
||||
Login: "watcher", // admin user for watch
|
||||
UserID: 1, |
||||
IsGrafanaAdmin: true, |
||||
})) |
||||
return &server{ |
||||
tracer: opts.Tracer, |
||||
log: slog.Default().With("logger", "resource-server"), |
||||
backend: opts.Backend, |
||||
diagnostics: opts.Diagnostics, |
||||
access: opts.WriteAccess, |
||||
lifecycle: opts.Lifecycle, |
||||
now: opts.Now, |
||||
ctx: ctx, |
||||
cancel: cancel, |
||||
}, nil |
||||
} |
||||
|
||||
var _ ResourceServer = &server{} |
||||
|
||||
type server struct { |
||||
tracer trace.Tracer |
||||
log *slog.Logger |
||||
backend StorageBackend |
||||
diagnostics DiagnosticsServer |
||||
access WriteAccessHooks |
||||
lifecycle LifecycleHooks |
||||
now func() int64 |
||||
|
||||
// Background watch task -- this has permissions for everything
|
||||
ctx context.Context |
||||
cancel context.CancelFunc |
||||
broadcaster Broadcaster[*WrittenEvent] |
||||
|
||||
// init checking
|
||||
once sync.Once |
||||
initErr error |
||||
} |
||||
|
||||
// Init implements ResourceServer.
|
||||
func (s *server) Init() error { |
||||
s.once.Do(func() { |
||||
// Call lifecycle hooks
|
||||
if s.lifecycle != nil { |
||||
err := s.lifecycle.Init() |
||||
if err != nil { |
||||
s.initErr = fmt.Errorf("initialize Resource Server: %w", err) |
||||
} |
||||
} |
||||
|
||||
// Start watching for changes
|
||||
if s.initErr == nil { |
||||
s.initErr = s.initWatcher() |
||||
} |
||||
|
||||
if s.initErr != nil { |
||||
s.log.Error("error initializing resource server", "error", s.initErr) |
||||
} |
||||
}) |
||||
return s.initErr |
||||
} |
||||
|
||||
func (s *server) Stop() { |
||||
s.initErr = fmt.Errorf("service is stopping") |
||||
|
||||
if s.lifecycle != nil { |
||||
s.lifecycle.Stop() |
||||
} |
||||
|
||||
// Stops the streaming
|
||||
s.cancel() |
||||
|
||||
// mark the value as done
|
||||
s.initErr = fmt.Errorf("service is stopped") |
||||
} |
||||
|
||||
// Old value indicates an update -- otherwise a create
|
||||
func (s *server) newEventBuilder(ctx context.Context, key *ResourceKey, value, oldValue []byte) (*writeEventBuilder, error) { |
||||
event, err := newEventFromBytes(value, oldValue) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
event.Key = key |
||||
event.Requester, err = identity.GetRequester(ctx) |
||||
if err != nil { |
||||
return nil, ErrUserNotFoundInContext |
||||
} |
||||
|
||||
obj := event.Meta |
||||
if key.Namespace != obj.GetNamespace() { |
||||
return nil, apierrors.NewBadRequest("key/namespace do not match") |
||||
} |
||||
|
||||
gvk := obj.GetGroupVersionKind() |
||||
if gvk.Kind == "" { |
||||
return nil, apierrors.NewBadRequest("expecting resources with a kind in the body") |
||||
} |
||||
if gvk.Version == "" { |
||||
return nil, apierrors.NewBadRequest("expecting resources with an apiVersion") |
||||
} |
||||
if gvk.Group != "" && gvk.Group != key.Group { |
||||
return nil, apierrors.NewBadRequest( |
||||
fmt.Sprintf("group in key does not match group in the body (%s != %s)", key.Group, gvk.Group), |
||||
) |
||||
} |
||||
|
||||
// This needs to be a create function
|
||||
if key.Name == "" { |
||||
if obj.GetName() == "" { |
||||
return nil, apierrors.NewBadRequest("missing name") |
||||
} |
||||
key.Name = obj.GetName() |
||||
} else if key.Name != obj.GetName() { |
||||
return nil, apierrors.NewBadRequest( |
||||
fmt.Sprintf("key/name do not match (key: %s, name: %s)", key.Name, obj.GetName())) |
||||
} |
||||
obj.SetGenerateName("") |
||||
err = validateName(obj.GetName()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
folder := obj.GetFolder() |
||||
if folder != "" { |
||||
err = s.access.CanWriteFolder(ctx, event.Requester, folder) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} |
||||
origin, err := obj.GetOriginInfo() |
||||
if err != nil { |
||||
return nil, apierrors.NewBadRequest("invalid origin info") |
||||
} |
||||
if origin != nil { |
||||
err = s.access.CanWriteOrigin(ctx, event.Requester, origin.Name) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} |
||||
obj.SetOriginInfo(origin) |
||||
|
||||
// Ensure old values do not mutate things they should not
|
||||
if event.OldMeta != nil { |
||||
old := event.OldMeta |
||||
|
||||
obj.SetUID(old.GetUID()) |
||||
obj.SetCreatedBy(old.GetCreatedBy()) |
||||
obj.SetCreationTimestamp(old.GetCreationTimestamp()) |
||||
} |
||||
return event, nil |
||||
} |
||||
|
||||
func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateResponse, error) { |
||||
ctx, span := s.tracer.Start(ctx, "storage_server.Create") |
||||
defer span.End() |
||||
|
||||
if err := s.Init(); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
rsp := &CreateResponse{} |
||||
builder, err := s.newEventBuilder(ctx, req.Key, req.Value, nil) |
||||
if err != nil { |
||||
rsp.Status, err = errToStatus(err) |
||||
return rsp, err |
||||
} |
||||
|
||||
obj := builder.Meta |
||||
obj.SetCreatedBy(builder.Requester.GetUID().String()) |
||||
obj.SetUpdatedBy("") |
||||
obj.SetUpdatedTimestamp(nil) |
||||
obj.SetCreationTimestamp(metav1.NewTime(time.UnixMilli(s.now()))) |
||||
obj.SetUID(types.UID(uuid.New().String())) |
||||
|
||||
event, err := builder.toEvent() |
||||
if err != nil { |
||||
rsp.Status, err = errToStatus(err) |
||||
return rsp, err |
||||
} |
||||
|
||||
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event) |
||||
if err == nil { |
||||
rsp.Value = event.Value // with mutated fields
|
||||
} else { |
||||
rsp.Status, err = errToStatus(err) |
||||
} |
||||
return rsp, err |
||||
} |
||||
|
||||
// Convert golang errors to status result errors that can be returned to a client
|
||||
func errToStatus(err error) (*StatusResult, error) { |
||||
if err != nil { |
||||
apistatus, ok := err.(apierrors.APIStatus) |
||||
if ok { |
||||
s := apistatus.Status() |
||||
return &StatusResult{ |
||||
Status: s.Status, |
||||
Message: s.Message, |
||||
Reason: string(s.Reason), |
||||
Code: s.Code, |
||||
}, nil |
||||
} |
||||
|
||||
// TODO... better conversion!!!
|
||||
return &StatusResult{ |
||||
Status: "Failure", |
||||
Message: err.Error(), |
||||
Code: 500, |
||||
}, nil |
||||
} |
||||
return nil, err |
||||
} |
||||
|
||||
func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateResponse, error) { |
||||
ctx, span := s.tracer.Start(ctx, "storage_server.Update") |
||||
defer span.End() |
||||
|
||||
if err := s.Init(); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
rsp := &UpdateResponse{} |
||||
if req.ResourceVersion < 0 { |
||||
rsp.Status, _ = errToStatus(apierrors.NewBadRequest("update must include the previous version")) |
||||
return rsp, nil |
||||
} |
||||
|
||||
latest, err := s.backend.Read(ctx, &ReadRequest{ |
||||
Key: req.Key, |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if latest.Value == nil { |
||||
return nil, apierrors.NewBadRequest("current value does not exist") |
||||
} |
||||
|
||||
builder, err := s.newEventBuilder(ctx, req.Key, req.Value, latest.Value) |
||||
if err != nil { |
||||
rsp.Status, err = errToStatus(err) |
||||
return rsp, err |
||||
} |
||||
|
||||
obj := builder.Meta |
||||
obj.SetUpdatedBy(builder.Requester.GetUID().String()) |
||||
obj.SetUpdatedTimestampMillis(time.Now().UnixMilli()) |
||||
|
||||
event, err := builder.toEvent() |
||||
if err != nil { |
||||
rsp.Status, err = errToStatus(err) |
||||
return rsp, err |
||||
} |
||||
|
||||
event.Type = WatchEvent_MODIFIED |
||||
event.PreviousRV = latest.ResourceVersion |
||||
|
||||
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event) |
||||
rsp.Status, err = errToStatus(err) |
||||
if err == nil { |
||||
rsp.Value = event.Value // with mutated fields
|
||||
} else { |
||||
rsp.Status, err = errToStatus(err) |
||||
} |
||||
return rsp, err |
||||
} |
||||
|
||||
func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) { |
||||
ctx, span := s.tracer.Start(ctx, "storage_server.Delete") |
||||
defer span.End() |
||||
|
||||
if err := s.Init(); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
rsp := &DeleteResponse{} |
||||
if req.ResourceVersion < 0 { |
||||
return nil, apierrors.NewBadRequest("update must include the previous version") |
||||
} |
||||
|
||||
latest, err := s.backend.Read(ctx, &ReadRequest{ |
||||
Key: req.Key, |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if req.ResourceVersion > 0 && latest.ResourceVersion != req.ResourceVersion { |
||||
return nil, ErrOptimisticLockingFailed |
||||
} |
||||
|
||||
now := metav1.NewTime(time.UnixMilli(s.now())) |
||||
event := WriteEvent{ |
||||
Key: req.Key, |
||||
Type: WatchEvent_DELETED, |
||||
PreviousRV: latest.ResourceVersion, |
||||
} |
||||
requester, err := identity.GetRequester(ctx) |
||||
if err != nil { |
||||
return nil, apierrors.NewBadRequest("unable to get user") |
||||
} |
||||
marker := &DeletedMarker{} |
||||
err = json.Unmarshal(latest.Value, marker) |
||||
if err != nil { |
||||
return nil, apierrors.NewBadRequest( |
||||
fmt.Sprintf("unable to read previous object, %v", err)) |
||||
} |
||||
obj, err := utils.MetaAccessor(marker) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
obj.SetDeletionTimestamp(&now) |
||||
obj.SetUpdatedTimestamp(&now.Time) |
||||
obj.SetManagedFields(nil) |
||||
obj.SetFinalizers(nil) |
||||
obj.SetUpdatedBy(requester.GetUID().String()) |
||||
marker.TypeMeta = metav1.TypeMeta{ |
||||
Kind: "DeletedMarker", |
||||
APIVersion: "common.grafana.app/v0alpha1", // ?? or can we stick this in common?
|
||||
} |
||||
marker.Annotations["RestoreResourceVersion"] = fmt.Sprintf("%d", event.PreviousRV) |
||||
event.Value, err = json.Marshal(marker) |
||||
if err != nil { |
||||
return nil, apierrors.NewBadRequest( |
||||
fmt.Sprintf("unable creating deletion marker, %v", err)) |
||||
} |
||||
|
||||
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event) |
||||
rsp.Status, err = errToStatus(err) |
||||
return rsp, err |
||||
} |
||||
|
||||
func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { |
||||
if err := s.Init(); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
// if req.Key.Group == "" {
|
||||
// status, _ := errToStatus(apierrors.NewBadRequest("missing group"))
|
||||
// return &ReadResponse{Status: status}, nil
|
||||
// }
|
||||
if req.Key.Resource == "" { |
||||
status, _ := errToStatus(apierrors.NewBadRequest("missing resource")) |
||||
return &ReadResponse{Status: status}, nil |
||||
} |
||||
|
||||
rsp, err := s.backend.Read(ctx, req) |
||||
if err != nil { |
||||
if rsp == nil { |
||||
rsp = &ReadResponse{} |
||||
} |
||||
rsp.Status, err = errToStatus(err) |
||||
} |
||||
return rsp, err |
||||
} |
||||
|
||||
func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, error) { |
||||
if err := s.Init(); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
rsp, err := s.backend.PrepareList(ctx, req) |
||||
// Status???
|
||||
return rsp, err |
||||
} |
||||
|
||||
func (s *server) initWatcher() error { |
||||
var err error |
||||
s.broadcaster, err = NewBroadcaster(s.ctx, func(out chan<- *WrittenEvent) error { |
||||
events, err := s.backend.WatchWriteEvents(s.ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
go func() { |
||||
for { |
||||
// pipe all events
|
||||
v := <-events |
||||
out <- v |
||||
} |
||||
}() |
||||
return nil |
||||
}) |
||||
return err |
||||
} |
||||
|
||||
func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error { |
||||
if err := s.Init(); err != nil { |
||||
return err |
||||
} |
||||
|
||||
ctx := srv.Context() |
||||
|
||||
// Start listening -- this will buffer any changes that happen while we backfill
|
||||
stream, err := s.broadcaster.Subscribe(ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer s.broadcaster.Unsubscribe(stream) |
||||
|
||||
since := req.Since |
||||
if req.SendInitialEvents { |
||||
fmt.Printf("TODO... query\n") |
||||
// All initial events are CREATE
|
||||
|
||||
if req.AllowWatchBookmarks { |
||||
fmt.Printf("TODO... send bookmark\n") |
||||
} |
||||
} |
||||
|
||||
for { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil |
||||
|
||||
case event, ok := <-stream: |
||||
if !ok { |
||||
s.log.Debug("watch events closed") |
||||
return nil |
||||
} |
||||
|
||||
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) { |
||||
// Currently sending *every* event
|
||||
// if req.Options.Labels != nil {
|
||||
// // match *either* the old or new object
|
||||
// }
|
||||
// TODO: return values that match either the old or the new
|
||||
|
||||
srv.Send(&WatchEvent{ |
||||
Timestamp: event.Timestamp, |
||||
Type: event.Type, |
||||
Resource: &WatchEvent_Resource{ |
||||
Value: event.Value, |
||||
Version: event.ResourceVersion, |
||||
}, |
||||
// TODO... previous???
|
||||
}) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// IsHealthy implements ResourceServer.
|
||||
func (s *server) IsHealthy(ctx context.Context, req *HealthCheckRequest) (*HealthCheckResponse, error) { |
||||
if err := s.Init(); err != nil { |
||||
return nil, err |
||||
} |
||||
return s.diagnostics.IsHealthy(ctx, req) |
||||
} |
@ -0,0 +1,167 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
"context" |
||||
"encoding/json" |
||||
"fmt" |
||||
"os" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
"gocloud.dev/blob/fileblob" |
||||
"gocloud.dev/blob/memblob" |
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" |
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity" |
||||
"github.com/grafana/grafana/pkg/apimachinery/utils" |
||||
) |
||||
|
||||
func TestSimpleServer(t *testing.T) { |
||||
testUserA := &identity.StaticRequester{ |
||||
Namespace: identity.NamespaceUser, |
||||
Login: "testuser", |
||||
UserID: 123, |
||||
UserUID: "u123", |
||||
OrgRole: identity.RoleAdmin, |
||||
IsGrafanaAdmin: true, // can do anything
|
||||
} |
||||
ctx := identity.WithRequester(context.Background(), testUserA) |
||||
|
||||
bucket := memblob.OpenBucket(nil) |
||||
if false { |
||||
tmp, err := os.MkdirTemp("", "xxx-*") |
||||
require.NoError(t, err) |
||||
|
||||
bucket, err = fileblob.OpenBucket(tmp, &fileblob.Options{ |
||||
CreateDir: true, |
||||
Metadata: fileblob.MetadataDontWrite, // skip
|
||||
}) |
||||
require.NoError(t, err) |
||||
fmt.Printf("ROOT: %s\n\n", tmp) |
||||
} |
||||
store, err := NewCDKBackend(ctx, CDKBackendOptions{ |
||||
Bucket: bucket, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
server, err := NewResourceServer(ResourceServerOptions{ |
||||
Backend: store, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
t.Run("playlist happy CRUD paths", func(t *testing.T) { |
||||
raw := []byte(`{ |
||||
"apiVersion": "playlist.grafana.app/v0alpha1", |
||||
"kind": "Playlist", |
||||
"metadata": { |
||||
"name": "fdgsv37qslr0ga", |
||||
"namespace": "default", |
||||
"annotations": { |
||||
"grafana.app/originName": "elsewhere", |
||||
"grafana.app/originPath": "path/to/item", |
||||
"grafana.app/originTimestamp": "2024-02-02T00:00:00Z" |
||||
} |
||||
}, |
||||
"spec": { |
||||
"title": "hello", |
||||
"interval": "5m", |
||||
"items": [ |
||||
{ |
||||
"type": "dashboard_by_uid", |
||||
"value": "vmie2cmWz" |
||||
} |
||||
] |
||||
} |
||||
}`) |
||||
|
||||
key := &ResourceKey{ |
||||
Group: "playlist.grafana.app", |
||||
Resource: "rrrr", // can be anything :(
|
||||
Namespace: "default", |
||||
Name: "fdgsv37qslr0ga", |
||||
} |
||||
|
||||
// Should be empty when we start
|
||||
all, err := server.List(ctx, &ListRequest{Options: &ListOptions{ |
||||
Key: &ResourceKey{ |
||||
Group: key.Group, |
||||
Resource: key.Resource, |
||||
}, |
||||
}}) |
||||
require.NoError(t, err) |
||||
require.Len(t, all.Items, 0) |
||||
|
||||
created, err := server.Create(ctx, &CreateRequest{ |
||||
Value: raw, |
||||
Key: key, |
||||
}) |
||||
require.NoError(t, err) |
||||
require.Nil(t, created.Status) |
||||
require.True(t, created.ResourceVersion > 0) |
||||
|
||||
// The key does not include resource version
|
||||
found, err := server.Read(ctx, &ReadRequest{Key: key}) |
||||
require.NoError(t, err) |
||||
require.Nil(t, found.Status) |
||||
require.Equal(t, created.ResourceVersion, found.ResourceVersion) |
||||
|
||||
// Now update the value
|
||||
tmp := &unstructured.Unstructured{} |
||||
err = json.Unmarshal(created.Value, tmp) |
||||
require.NoError(t, err) |
||||
|
||||
now := time.Now().UnixMilli() |
||||
obj, err := utils.MetaAccessor(tmp) |
||||
require.NoError(t, err) |
||||
obj.SetAnnotation("test", "hello") |
||||
obj.SetUpdatedTimestampMillis(now) |
||||
obj.SetUpdatedBy(testUserA.GetUID().String()) |
||||
raw, err = json.Marshal(tmp) |
||||
require.NoError(t, err) |
||||
|
||||
updated, err := server.Update(ctx, &UpdateRequest{ |
||||
Key: key, |
||||
Value: raw, |
||||
ResourceVersion: created.ResourceVersion}) |
||||
require.NoError(t, err) |
||||
require.Nil(t, updated.Status) |
||||
require.True(t, updated.ResourceVersion > created.ResourceVersion) |
||||
|
||||
// We should still get the latest
|
||||
found, err = server.Read(ctx, &ReadRequest{Key: key}) |
||||
require.NoError(t, err) |
||||
require.Nil(t, found.Status) |
||||
require.Equal(t, updated.ResourceVersion, found.ResourceVersion) |
||||
|
||||
all, err = server.List(ctx, &ListRequest{Options: &ListOptions{ |
||||
Key: &ResourceKey{ |
||||
Group: key.Group, |
||||
Resource: key.Resource, |
||||
}, |
||||
}}) |
||||
require.NoError(t, err) |
||||
require.Len(t, all.Items, 1) |
||||
require.Equal(t, updated.ResourceVersion, all.Items[0].ResourceVersion) |
||||
|
||||
deleted, err := server.Delete(ctx, &DeleteRequest{Key: key, ResourceVersion: updated.ResourceVersion}) |
||||
require.NoError(t, err) |
||||
require.True(t, deleted.ResourceVersion > updated.ResourceVersion) |
||||
|
||||
// We should get not found status when trying to read the latest value
|
||||
found, err = server.Read(ctx, &ReadRequest{Key: key}) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, found.Status) |
||||
require.Equal(t, int32(404), found.Status.Code) |
||||
|
||||
// And the deleted value should not be in the results
|
||||
all, err = server.List(ctx, &ListRequest{Options: &ListOptions{ |
||||
Key: &ResourceKey{ |
||||
Group: key.Group, |
||||
Resource: key.Resource, |
||||
}, |
||||
}}) |
||||
require.NoError(t, err) |
||||
require.Len(t, all.Items, 0) // empty
|
||||
}) |
||||
} |
@ -0,0 +1,25 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
"fmt" |
||||
"regexp" |
||||
) |
||||
|
||||
var validNameCharPattern = `a-zA-Z0-9\-\_` |
||||
var validNamePattern = regexp.MustCompile(`^[` + validNameCharPattern + `]*$`).MatchString |
||||
|
||||
func validateName(name string) error { |
||||
if len(name) < 2 { |
||||
return fmt.Errorf("name is too short") |
||||
} |
||||
if len(name) > 64 { |
||||
return fmt.Errorf("name is too long") |
||||
} |
||||
if !validNamePattern(name) { |
||||
return fmt.Errorf("name includes invalid characters") |
||||
} |
||||
// In standard k8s, it must not start with a number
|
||||
// however that would force us to update many many many existing resources
|
||||
// so we will be slightly more lenient than standard k8s
|
||||
return nil |
||||
} |
Loading…
Reference in new issue