not rest.Storage

pull/89891/head
Ryan McKinley 1 year ago
parent cfc192a2b5
commit f68390bebf
  1. 351
      pkg/registry/apis/dashboard/legacy_storage.go
  2. 11
      pkg/registry/apis/dashboard/register.go

@ -2,169 +2,314 @@ package dashboard
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"strings" "sync"
"time" "time"
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
common "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1" common "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1"
"github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1" "github.com/grafana/grafana/pkg/apimachinery/utils"
dashboard "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/access" "github.com/grafana/grafana/pkg/registry/apis/dashboard/access"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request" "github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/apiserver/storage/entity" "github.com/grafana/grafana/pkg/storage/unified/resource"
) )
var ( var (
_ rest.Storage = (*dashboardStorage)(nil) _ resource.AppendingStore = (*dashboardStorage)(nil)
_ rest.Scoper = (*dashboardStorage)(nil) _ resource.BlobStore = (*dashboardStorage)(nil)
_ rest.SingularNameProvider = (*dashboardStorage)(nil)
_ rest.Getter = (*dashboardStorage)(nil)
_ rest.Lister = (*dashboardStorage)(nil)
_ rest.Creater = (*dashboardStorage)(nil)
_ rest.Updater = (*dashboardStorage)(nil)
_ rest.GracefulDeleter = (*dashboardStorage)(nil)
) )
type dashboardStorage struct { type dashboardStorage struct {
resource common.ResourceInfo resource common.ResourceInfo
access access.DashboardAccess access access.DashboardAccess
tableConverter rest.TableConvertor tableConverter rest.TableConvertor
}
func (s *dashboardStorage) New() runtime.Object { // Typically one... the server wrapper
return s.resource.NewFunc() subscribers []chan *resource.WrittenEvent
mutex sync.Mutex
} }
func (s *dashboardStorage) Destroy() {} // func (s *dashboardStorage) Create(ctx context.Context,
// obj runtime.Object,
// createValidation rest.ValidateObjectFunc,
// options *metav1.CreateOptions,
// ) (runtime.Object, error) {
// info, err := request.NamespaceInfoFrom(ctx, true)
// if err != nil {
// return nil, err
// }
func (s *dashboardStorage) NamespaceScoped() bool { // p, ok := obj.(*v0alpha1.Dashboard)
return true // if !ok {
} // return nil, fmt.Errorf("expected dashboard?")
// }
// // HACK to simplify unique name testing from kubectl
// t := p.Spec.GetNestedString("title")
// if strings.Contains(t, "${NOW}") {
// t = strings.ReplaceAll(t, "${NOW}", fmt.Sprintf("%d", time.Now().Unix()))
// p.Spec.Set("title", t)
// }
// dash, _, err := s.access.SaveDashboard(ctx, info.OrgID, p)
// return dash, err
// }
// func (s *dashboardStorage) Update(ctx context.Context,
// name string,
// objInfo rest.UpdatedObjectInfo,
// createValidation rest.ValidateObjectFunc,
// updateValidation rest.ValidateObjectUpdateFunc,
// forceAllowCreate bool,
// options *metav1.UpdateOptions,
// ) (runtime.Object, bool, error) {
// info, err := request.NamespaceInfoFrom(ctx, true)
// if err != nil {
// return nil, false, err
// }
// created := false
// old, err := s.Get(ctx, name, nil)
// if err != nil {
// return old, created, err
// }
// obj, err := objInfo.UpdatedObject(ctx, old)
// if err != nil {
// return old, created, err
// }
// p, ok := obj.(*v0alpha1.Dashboard)
// if !ok {
// return nil, created, fmt.Errorf("expected dashboard after update")
// }
// _, created, err = s.access.SaveDashboard(ctx, info.OrgID, p)
// if err == nil {
// r, err := s.Get(ctx, name, nil)
// return r, created, err
// }
// return nil, created, err
// }
// // GracefulDeleter
// func (s *dashboardStorage) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
// info, err := request.NamespaceInfoFrom(ctx, true)
// if err != nil {
// return nil, false, err
// }
// return s.access.DeleteDashboard(ctx, info.OrgID, name)
// }
// func (s *dashboardStorage) ListXX(ctx context.Context, options *internalversion.ListOptions) (runtime.Object, error) {
// orgId, err := request.OrgIDForList(ctx)
// if err != nil {
// return nil, err
// }
// // fmt.Printf("LIST: %s\n", options.Continue)
// // translate grafana.app/* label selectors into field requirements
// requirements, newSelector, err := entity.ReadLabelSelectors(options.LabelSelector)
// if err != nil {
// return nil, err
// }
// query := &access.DashboardQuery{
// OrgID: orgId,
// Limit: int(options.Limit),
// MaxBytes: 2 * 1024 * 1024, // 2MB,
// ContinueToken: options.Continue,
// Requirements: requirements,
// Labels: newSelector,
// }
func (s *dashboardStorage) GetSingularName() string { // return s.access.GetDashboards(ctx, query)
return s.resource.GetSingularName() // }
func (s *dashboardStorage) SupportsSignedURLs() bool {
return false
} }
func (s *dashboardStorage) NewList() runtime.Object { func (s *dashboardStorage) PutBlob(context.Context, *resource.PutBlobRequest) (*resource.PutBlobResponse, error) {
return s.resource.NewListFunc() return nil, fmt.Errorf("not implemented yet")
} }
func (s *dashboardStorage) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { func (s *dashboardStorage) GetBlob(ctx context.Context, resource *resource.ResourceKey, info *utils.BlobInfo, mustProxy bool) (*resource.GetBlobResponse, error) {
return s.tableConverter.ConvertToTable(ctx, object, tableOptions) return nil, fmt.Errorf("not implemented yet")
} }
func (s *dashboardStorage) Create(ctx context.Context, func getDashbaord(event resource.WriteEvent) (*dashboard.Dashboard, error) {
obj runtime.Object, obj, ok := event.Object.GetRuntimeObject()
createValidation rest.ValidateObjectFunc, if ok && obj != nil {
options *metav1.CreateOptions, dash, ok := obj.(*dashboard.Dashboard)
) (runtime.Object, error) { if ok {
info, err := request.NamespaceInfoFrom(ctx, true) return dash, nil
if err != nil { }
return nil, err
} }
dash := &dashboard.Dashboard{}
err := json.Unmarshal(event.Value, dash)
return dash, err
}
p, ok := obj.(*v0alpha1.Dashboard) func isDashboardKey(key *resource.ResourceKey, requireName bool) error {
if !ok { gr := dashboard.DashboardResourceInfo.GroupResource()
return nil, fmt.Errorf("expected dashboard?") if key.Group != gr.Group {
return fmt.Errorf("expecting dashboard group")
} }
if key.Resource != gr.Resource {
// HACK to simplify unique name testing from kubectl return fmt.Errorf("expecting dashboard resource")
t := p.Spec.GetNestedString("title")
if strings.Contains(t, "${NOW}") {
t = strings.ReplaceAll(t, "${NOW}", fmt.Sprintf("%d", time.Now().Unix()))
p.Spec.Set("title", t)
} }
if requireName && key.Name == "" {
dash, _, err := s.access.SaveDashboard(ctx, info.OrgID, p) return fmt.Errorf("expecting dashboard name (uid)")
return dash, err }
return nil
} }
func (s *dashboardStorage) Update(ctx context.Context, func (s *dashboardStorage) WriteEvent(ctx context.Context, event resource.WriteEvent) (rv int64, err error) {
name string, info, err := request.ParseNamespace(event.Key.Namespace)
objInfo rest.UpdatedObjectInfo, if err == nil {
createValidation rest.ValidateObjectFunc, err = isDashboardKey(event.Key, true)
updateValidation rest.ValidateObjectUpdateFunc,
forceAllowCreate bool,
options *metav1.UpdateOptions,
) (runtime.Object, bool, error) {
info, err := request.NamespaceInfoFrom(ctx, true)
if err != nil {
return nil, false, err
} }
created := false
old, err := s.Get(ctx, name, nil)
if err != nil { if err != nil {
return old, created, err return 0, err
} }
obj, err := objInfo.UpdatedObject(ctx, old) switch event.Type {
if err != nil { case resource.WatchEvent_DELETED:
return old, created, err {
} _, _, err = s.access.DeleteDashboard(ctx, info.OrgID, event.Key.Name)
p, ok := obj.(*v0alpha1.Dashboard) rv = event.EventID
if !ok { }
return nil, created, fmt.Errorf("expected dashboard after update") // The difference depends on embedded internal ID
case resource.WatchEvent_ADDED, resource.WatchEvent_MODIFIED:
{
dash, err := getDashbaord(event)
if err != nil {
return 0, err
}
after, _, err := s.access.SaveDashboard(ctx, info.OrgID, dash)
if err != nil {
return 0, err
}
if after != nil {
meta, err := utils.MetaAccessor(after)
if err != nil {
return 0, err
}
rv, err = meta.GetResourceVersionInt64()
}
}
default:
return 0, fmt.Errorf("unsupported event type: %v", event.Type)
} }
_, created, err = s.access.SaveDashboard(ctx, info.OrgID, p) // Async notify all subscribers (not HA!!!)
if err == nil { if s.subscribers != nil {
r, err := s.Get(ctx, name, nil) go func() {
return r, created, err write := &resource.WrittenEvent{
WriteEvent: event,
Timestamp: time.Now().UnixMilli(),
ResourceVersion: rv,
}
for _, sub := range s.subscribers {
sub <- write
}
}()
} }
return nil, created, err return rv, err
} }
// GracefulDeleter // Read implements ResourceStoreServer.
func (s *dashboardStorage) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { func (s *dashboardStorage) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) {
info, err := request.NamespaceInfoFrom(ctx, true) info, err := request.ParseNamespace(req.Key.Namespace)
if err == nil {
err = isDashboardKey(req.Key, true)
}
if err != nil { if err != nil {
return nil, false, err return nil, err
} }
if req.ResourceVersion > 0 {
return s.access.DeleteDashboard(ctx, info.OrgID, name) return nil, fmt.Errorf("reading from history not yet supported")
} }
dash, err := s.access.GetDashboard(ctx, info.OrgID, req.Key.Name)
func (s *dashboardStorage) List(ctx context.Context, options *internalversion.ListOptions) (runtime.Object, error) {
orgId, err := request.OrgIDForList(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
meta, err := utils.MetaAccessor(dash)
if err != nil {
return nil, err
}
rv, err := meta.GetResourceVersionInt64()
if err != nil {
return nil, err
}
value, err := json.Marshal(dash)
return &resource.ReadResponse{
ResourceVersion: rv,
Value: value,
}, err
}
// fmt.Printf("LIST: %s\n", options.Continue) // List implements AppendingStore.
func (s *dashboardStorage) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
// translate grafana.app/* label selectors into field requirements opts := req.Options
requirements, newSelector, err := entity.ReadLabelSelectors(options.LabelSelector) info, err := request.ParseNamespace(opts.Key.Namespace)
if err == nil {
err = isDashboardKey(opts.Key, false)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
query := &access.DashboardQuery{ query := &access.DashboardQuery{
OrgID: orgId, OrgID: info.OrgID,
Limit: int(options.Limit), Limit: int(req.Limit),
MaxBytes: 2 * 1024 * 1024, // 2MB, MaxBytes: 2 * 1024 * 1024, // 2MB,
ContinueToken: options.Continue, ContinueToken: req.NextPageToken,
Requirements: requirements, // Requirements: requirements,
Labels: newSelector, // Labels: newSelector,
} }
fmt.Printf("%+v\n", query)
return s.access.GetDashboards(ctx, query) // return s.access.GetDashboards(ctx, query)
return nil, fmt.Errorf("todo")
} }
func (s *dashboardStorage) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { // Watch implements AppendingStore.
info, err := request.NamespaceInfoFrom(ctx, true) func (s *dashboardStorage) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
if err != nil { stream := make(chan *resource.WrittenEvent, 10)
return nil, err {
s.mutex.Lock()
defer s.mutex.Unlock()
// Add the event stream
s.subscribers = append(s.subscribers, stream)
} }
return s.access.GetDashboard(ctx, info.OrgID, name) // Wait for context done
} go func() {
// Wait till the context is done
<-ctx.Done()
// Then remove the subscription
s.mutex.Lock()
defer s.mutex.Unlock()
// GracefulDeleter // Copy all streams without our listener
func (s *dashboardStorage) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *internalversion.ListOptions) (runtime.Object, error) { subs := []chan *resource.WrittenEvent{}
return nil, fmt.Errorf("DeleteCollection for dashboards not implemented") for _, sub := range s.subscribers {
if sub != stream {
subs = append(subs, sub)
}
}
s.subscribers = subs
}()
return stream, nil
} }

@ -1,6 +1,8 @@
package dashboard package dashboard
import ( import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@ -133,9 +135,10 @@ func (b *DashboardsAPIBuilder) GetAPIGroupInfo(
access: b.access, access: b.access,
tableConverter: store.TableConvertor, tableConverter: store.TableConvertor,
} }
fmt.Printf("%v\n", legacyStore)
storage := map[string]rest.Storage{} storage := map[string]rest.Storage{}
storage[resourceInfo.StoragePath()] = legacyStore //storage[resourceInfo.StoragePath()] = legacyStore
storage[resourceInfo.StoragePath("dto")] = &DTOConnector{ storage[resourceInfo.StoragePath("dto")] = &DTOConnector{
builder: b, builder: b,
} }
@ -149,7 +152,11 @@ func (b *DashboardsAPIBuilder) GetAPIGroupInfo(
if err := store.CompleteWithOptions(options); err != nil { if err := store.CompleteWithOptions(options); err != nil {
return nil, err return nil, err
} }
storage[resourceInfo.StoragePath()] = grafanarest.NewDualWriter(grafanarest.Mode1, legacyStore, store, reg) storage[resourceInfo.StoragePath()] = grafanarest.NewDualWriter(
grafanarest.Mode1,
store, //legacyStore,
store,
reg)
} }
// Summary // Summary

Loading…
Cancel
Save