|
|
@ -23,11 +23,12 @@ import ( |
|
|
|
"k8s.io/apimachinery/pkg/runtime/schema" |
|
|
|
"k8s.io/apimachinery/pkg/runtime/schema" |
|
|
|
"k8s.io/apimachinery/pkg/selection" |
|
|
|
"k8s.io/apimachinery/pkg/selection" |
|
|
|
"k8s.io/apimachinery/pkg/watch" |
|
|
|
"k8s.io/apimachinery/pkg/watch" |
|
|
|
"k8s.io/apiserver/pkg/endpoints/request" |
|
|
|
|
|
|
|
"k8s.io/apiserver/pkg/storage" |
|
|
|
"k8s.io/apiserver/pkg/storage" |
|
|
|
"k8s.io/apiserver/pkg/storage/storagebackend" |
|
|
|
"k8s.io/apiserver/pkg/storage/storagebackend" |
|
|
|
"k8s.io/apiserver/pkg/storage/storagebackend/factory" |
|
|
|
"k8s.io/apiserver/pkg/storage/storagebackend/factory" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic" |
|
|
|
|
|
|
|
|
|
|
|
"github.com/grafana/grafana/pkg/apimachinery/utils" |
|
|
|
"github.com/grafana/grafana/pkg/apimachinery/utils" |
|
|
|
"github.com/grafana/grafana/pkg/storage/unified/resource" |
|
|
|
"github.com/grafana/grafana/pkg/storage/unified/resource" |
|
|
|
) |
|
|
|
) |
|
|
@ -84,31 +85,30 @@ func errorWrap(status *resource.StatusResult) error { |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func getKey(ctx context.Context) (*resource.ResourceKey, error) { |
|
|
|
func getKey(val string) (*resource.ResourceKey, error) { |
|
|
|
requestInfo, ok := request.RequestInfoFrom(ctx) |
|
|
|
k, err := grafanaregistry.ParseKey(val) |
|
|
|
if !ok { |
|
|
|
if err != nil { |
|
|
|
return nil, apierrors.NewInternalError(fmt.Errorf("could not get request info")) |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
|
|
|
|
key := &resource.ResourceKey{ |
|
|
|
|
|
|
|
Group: requestInfo.APIGroup, |
|
|
|
|
|
|
|
Resource: requestInfo.Resource, |
|
|
|
|
|
|
|
Namespace: requestInfo.Namespace, |
|
|
|
|
|
|
|
Name: requestInfo.Name, |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
if key.Group == "" { |
|
|
|
if k.Group == "" { |
|
|
|
return nil, apierrors.NewInternalError(fmt.Errorf("missing group in request")) |
|
|
|
return nil, apierrors.NewInternalError(fmt.Errorf("missing group in request")) |
|
|
|
} |
|
|
|
} |
|
|
|
if key.Resource == "" { |
|
|
|
if k.Resource == "" { |
|
|
|
return nil, apierrors.NewInternalError(fmt.Errorf("missing resource in request")) |
|
|
|
return nil, apierrors.NewInternalError(fmt.Errorf("missing resource in request")) |
|
|
|
} |
|
|
|
} |
|
|
|
return key, nil |
|
|
|
return &resource.ResourceKey{ |
|
|
|
|
|
|
|
Namespace: k.Namespace, |
|
|
|
|
|
|
|
Group: k.Group, |
|
|
|
|
|
|
|
Resource: k.Resource, |
|
|
|
|
|
|
|
Name: k.Name, |
|
|
|
|
|
|
|
}, err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
|
|
|
|
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
|
|
|
|
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
|
|
|
|
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
|
|
|
|
// set to the read value from database.
|
|
|
|
// set to the read value from database.
|
|
|
|
func (s *Storage) Create(ctx context.Context, _ string, obj runtime.Object, out runtime.Object, ttl uint64) error { |
|
|
|
func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error { |
|
|
|
key, err := getKey(ctx) |
|
|
|
k, err := getKey(key) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
@ -125,7 +125,7 @@ func (s *Storage) Create(ctx context.Context, _ string, obj runtime.Object, out |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
cmd := &resource.CreateRequest{ |
|
|
|
cmd := &resource.CreateRequest{ |
|
|
|
Key: key, |
|
|
|
Key: k, |
|
|
|
Value: buf.Bytes(), |
|
|
|
Value: buf.Bytes(), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -162,8 +162,8 @@ func (s *Storage) Create(ctx context.Context, _ string, obj runtime.Object, out |
|
|
|
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
|
|
|
|
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
|
|
|
|
// current version of the object to avoid read operation from storage to get it.
|
|
|
|
// current version of the object to avoid read operation from storage to get it.
|
|
|
|
// However, the implementations have to retry in case suggestion is stale.
|
|
|
|
// However, the implementations have to retry in case suggestion is stale.
|
|
|
|
func (s *Storage) Delete(ctx context.Context, _ string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { |
|
|
|
func (s *Storage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { |
|
|
|
key, err := getKey(ctx) |
|
|
|
k, err := getKey(key) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
@ -172,7 +172,7 @@ func (s *Storage) Delete(ctx context.Context, _ string, out runtime.Object, prec |
|
|
|
return fmt.Errorf("not supported (validate deletion)") |
|
|
|
return fmt.Errorf("not supported (validate deletion)") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
cmd := &resource.DeleteRequest{Key: key} |
|
|
|
cmd := &resource.DeleteRequest{Key: k} |
|
|
|
if preconditions != nil { |
|
|
|
if preconditions != nil { |
|
|
|
if preconditions.ResourceVersion != nil { |
|
|
|
if preconditions.ResourceVersion != nil { |
|
|
|
cmd.ResourceVersion, err = strconv.ParseInt(*preconditions.ResourceVersion, 10, 64) |
|
|
|
cmd.ResourceVersion, err = strconv.ParseInt(*preconditions.ResourceVersion, 10, 64) |
|
|
@ -203,8 +203,8 @@ func (s *Storage) Delete(ctx context.Context, _ string, out runtime.Object, prec |
|
|
|
// (e.g. reconnecting without missing any updates).
|
|
|
|
// (e.g. reconnecting without missing any updates).
|
|
|
|
// If resource version is "0", this interface will get current object at given key
|
|
|
|
// If resource version is "0", this interface will get current object at given key
|
|
|
|
// and send it in an "ADDED" event, before watch starts.
|
|
|
|
// and send it in an "ADDED" event, before watch starts.
|
|
|
|
func (s *Storage) Watch(ctx context.Context, _ string, opts storage.ListOptions) (watch.Interface, error) { |
|
|
|
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { |
|
|
|
listopts, _, err := toListRequest(ctx, opts) |
|
|
|
listopts, _, err := toListRequest(key, opts) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
} |
|
|
@ -247,10 +247,10 @@ func (s *Storage) Watch(ctx context.Context, _ string, opts storage.ListOptions) |
|
|
|
// Treats empty responses and nil response nodes exactly like a not found error.
|
|
|
|
// Treats empty responses and nil response nodes exactly like a not found error.
|
|
|
|
// The returned contents may be delayed, but it is guaranteed that they will
|
|
|
|
// The returned contents may be delayed, but it is guaranteed that they will
|
|
|
|
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
|
|
|
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
|
|
|
func (s *Storage) Get(ctx context.Context, _ string, opts storage.GetOptions, objPtr runtime.Object) error { |
|
|
|
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { |
|
|
|
var err error |
|
|
|
var err error |
|
|
|
req := &resource.ReadRequest{} |
|
|
|
req := &resource.ReadRequest{} |
|
|
|
req.Key, err = getKey(ctx) |
|
|
|
req.Key, err = getKey(key) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
@ -283,16 +283,16 @@ func (s *Storage) Get(ctx context.Context, _ string, opts storage.GetOptions, ob |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func toListRequest(ctx context.Context, opts storage.ListOptions) (*resource.ListRequest, storage.SelectionPredicate, error) { |
|
|
|
func toListRequest(key string, opts storage.ListOptions) (*resource.ListRequest, storage.SelectionPredicate, error) { |
|
|
|
predicate := opts.Predicate |
|
|
|
predicate := opts.Predicate |
|
|
|
key, err := getKey(ctx) |
|
|
|
k, err := getKey(key) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return nil, predicate, err |
|
|
|
return nil, predicate, err |
|
|
|
} |
|
|
|
} |
|
|
|
req := &resource.ListRequest{ |
|
|
|
req := &resource.ListRequest{ |
|
|
|
Limit: opts.Predicate.Limit, |
|
|
|
Limit: opts.Predicate.Limit, |
|
|
|
Options: &resource.ListOptions{ |
|
|
|
Options: &resource.ListOptions{ |
|
|
|
Key: key, |
|
|
|
Key: k, |
|
|
|
}, |
|
|
|
}, |
|
|
|
NextPageToken: predicate.Continue, |
|
|
|
NextPageToken: predicate.Continue, |
|
|
|
} |
|
|
|
} |
|
|
@ -364,8 +364,8 @@ func toListRequest(ctx context.Context, opts storage.ListOptions) (*resource.Lis |
|
|
|
// is true, 'key' is used as a prefix.
|
|
|
|
// is true, 'key' is used as a prefix.
|
|
|
|
// The returned contents may be delayed, but it is guaranteed that they will
|
|
|
|
// The returned contents may be delayed, but it is guaranteed that they will
|
|
|
|
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
|
|
|
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
|
|
|
func (s *Storage) GetList(ctx context.Context, _ string, opts storage.ListOptions, listObj runtime.Object) error { |
|
|
|
func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { |
|
|
|
req, predicate, err := toListRequest(ctx, opts) |
|
|
|
req, predicate, err := toListRequest(key, opts) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
@ -440,20 +440,20 @@ func (s *Storage) GetList(ctx context.Context, _ string, opts storage.ListOption |
|
|
|
// However, the implementations have to retry in case suggestion is stale.
|
|
|
|
// However, the implementations have to retry in case suggestion is stale.
|
|
|
|
func (s *Storage) GuaranteedUpdate( |
|
|
|
func (s *Storage) GuaranteedUpdate( |
|
|
|
ctx context.Context, |
|
|
|
ctx context.Context, |
|
|
|
_ string, |
|
|
|
key string, |
|
|
|
destination runtime.Object, |
|
|
|
destination runtime.Object, |
|
|
|
ignoreNotFound bool, |
|
|
|
ignoreNotFound bool, |
|
|
|
preconditions *storage.Preconditions, |
|
|
|
preconditions *storage.Preconditions, |
|
|
|
tryUpdate storage.UpdateFunc, |
|
|
|
tryUpdate storage.UpdateFunc, |
|
|
|
cachedExistingObject runtime.Object, |
|
|
|
cachedExistingObject runtime.Object, |
|
|
|
) error { |
|
|
|
) error { |
|
|
|
key, err := getKey(ctx) |
|
|
|
k, err := getKey(key) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Get the current version
|
|
|
|
// Get the current version
|
|
|
|
err = s.Get(ctx, "<ignored>", storage.GetOptions{}, destination) |
|
|
|
err = s.Get(ctx, key, storage.GetOptions{}, destination) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
if ignoreNotFound && apierrors.IsNotFound(err) { |
|
|
|
if ignoreNotFound && apierrors.IsNotFound(err) { |
|
|
|
// destination is already set to zero value
|
|
|
|
// destination is already set to zero value
|
|
|
@ -499,7 +499,7 @@ func (s *Storage) GuaranteedUpdate( |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return apierrors.NewInternalError( |
|
|
|
return apierrors.NewInternalError( |
|
|
|
fmt.Errorf("could not successfully update object. key=%s, err=%s", key.String(), err.Error()), |
|
|
|
fmt.Errorf("could not successfully update object. key=%s, err=%s", k.String(), err.Error()), |
|
|
|
) |
|
|
|
) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -509,8 +509,7 @@ func (s *Storage) GuaranteedUpdate( |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
req := &resource.UpdateRequest{Key: key, Value: buf.Bytes()} |
|
|
|
req := &resource.UpdateRequest{Key: k, Value: buf.Bytes()} |
|
|
|
// TODO... message
|
|
|
|
|
|
|
|
rsp, err := s.store.Update(ctx, req) |
|
|
|
rsp, err := s.store.Update(ctx, req) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
return err |
|
|
|