mirror of https://github.com/grafana/grafana
Provisioning: Merge watch support into live (#102618)
parent
77c5e0eeb2
commit
1a00801e6a
@ -0,0 +1,214 @@ |
||||
package features |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"strings" |
||||
"sync" |
||||
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
"k8s.io/apimachinery/pkg/watch" |
||||
"k8s.io/client-go/dynamic" |
||||
|
||||
provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1" |
||||
|
||||
"github.com/grafana/authlib/types" |
||||
"github.com/grafana/grafana-app-sdk/logging" |
||||
"github.com/grafana/grafana-plugin-sdk-go/backend" |
||||
"github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter" |
||||
"github.com/grafana/grafana-plugin-sdk-go/live" |
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity" |
||||
"github.com/grafana/grafana/pkg/services/apiserver" |
||||
"github.com/grafana/grafana/pkg/services/live/model" |
||||
) |
||||
|
||||
// WatchRunner will start a watch task and broadcast results
|
||||
type WatchRunner struct { |
||||
publisher model.ChannelPublisher |
||||
configProvider apiserver.RestConfigProvider |
||||
|
||||
watchingMu sync.Mutex |
||||
watching map[string]*watcher |
||||
} |
||||
|
||||
func NewWatchRunner(publisher model.ChannelPublisher, configProvider apiserver.RestConfigProvider) *WatchRunner { |
||||
return &WatchRunner{ |
||||
publisher: publisher, |
||||
configProvider: configProvider, |
||||
watching: make(map[string]*watcher), |
||||
} |
||||
} |
||||
|
||||
func (b *WatchRunner) GetHandlerForPath(_ string) (model.ChannelHandler, error) { |
||||
return b, nil // all dashboards share the same handler
|
||||
} |
||||
|
||||
// Valid paths look like: {version}/{resource}[={name}]/{user.uid}
|
||||
// * v0alpha1/dashboards/u12345
|
||||
// * v0alpha1/dashboards=ABCD/u12345
|
||||
func (b *WatchRunner) OnSubscribe(ctx context.Context, u identity.Requester, e model.SubscribeEvent) (model.SubscribeReply, backend.SubscribeStreamStatus, error) { |
||||
// To make sure we do not share resources across users, in clude the UID in the path
|
||||
userID := u.GetIdentifier() |
||||
if userID == "" { |
||||
return model.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, fmt.Errorf("missing user identity") |
||||
} |
||||
if !strings.HasSuffix(e.Path, userID) { |
||||
return model.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, fmt.Errorf("path must end with user uid (%s)", userID) |
||||
} |
||||
|
||||
// While testing with provisioning repositories, we will limit this to admin only
|
||||
if !u.HasRole(identity.RoleAdmin) { |
||||
return model.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, fmt.Errorf("only admin users for now") |
||||
} |
||||
|
||||
b.watchingMu.Lock() |
||||
defer b.watchingMu.Unlock() |
||||
|
||||
current, ok := b.watching[e.Channel] |
||||
if ok && !current.done { |
||||
return model.SubscribeReply{ |
||||
JoinLeave: false, |
||||
Presence: false, |
||||
Recover: false, |
||||
}, backend.SubscribeStreamStatusOK, nil |
||||
} |
||||
|
||||
// Try to start a watcher for this request
|
||||
gvr, name, err := parseWatchRequest(e.Channel, userID) |
||||
if err != nil { |
||||
return model.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, err |
||||
} |
||||
|
||||
// Test this with only provisiong support -- then we can evaluate a broader rollout
|
||||
if gvr.Group != provisioning.GROUP { |
||||
return model.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, |
||||
fmt.Errorf("watching provisioned resources is OK allowed (for now)") |
||||
} |
||||
|
||||
requester := types.WithAuthInfo(context.Background(), u) |
||||
cfg, err := b.configProvider.GetRestConfig(requester) |
||||
if err != nil { |
||||
return model.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, err |
||||
} |
||||
uclient, err := dynamic.NewForConfig(cfg) |
||||
if err != nil { |
||||
return model.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, err |
||||
} |
||||
client := uclient.Resource(gvr).Namespace(u.GetNamespace()) |
||||
|
||||
opts := v1.ListOptions{} |
||||
if len(name) > 1 { |
||||
opts.FieldSelector = "metadata.name=" + name |
||||
} |
||||
watch, err := client.Watch(requester, opts) |
||||
if err != nil { |
||||
return model.SubscribeReply{}, backend.SubscribeStreamStatusNotFound, err |
||||
} |
||||
|
||||
current = &watcher{ |
||||
orgId: u.GetOrgID(), |
||||
channel: e.Channel, |
||||
publisher: b.publisher, |
||||
watch: watch, |
||||
} |
||||
|
||||
b.watching[e.Channel] = current |
||||
go current.run(ctx) |
||||
|
||||
return model.SubscribeReply{ |
||||
JoinLeave: false, // need unsubscribe envents
|
||||
Presence: false, |
||||
Recover: false, |
||||
}, backend.SubscribeStreamStatusOK, nil |
||||
} |
||||
|
||||
func parseWatchRequest(channel string, user string) (gvr schema.GroupVersionResource, name string, err error) { |
||||
addr, err := live.ParseChannel(channel) |
||||
if err != nil { |
||||
return gvr, "", err |
||||
} |
||||
|
||||
parts := strings.Split(addr.Path, "/") |
||||
if len(parts) != 3 { |
||||
return gvr, "", fmt.Errorf("expecting path: {version}/{resource}={name}/{user}") |
||||
} |
||||
if parts[2] != user { |
||||
return gvr, "", fmt.Errorf("expecting user suffix: %s", user) |
||||
} |
||||
|
||||
resource := strings.Split(parts[1], "=") |
||||
gvr = schema.GroupVersionResource{ |
||||
Group: addr.Namespace, |
||||
Version: parts[0], |
||||
Resource: resource[0], |
||||
} |
||||
if len(resource) > 1 { |
||||
name = resource[1] |
||||
} |
||||
return gvr, name, nil |
||||
} |
||||
|
||||
// OnPublish is called when a client wants to broadcast on the websocket
|
||||
func (b *WatchRunner) OnPublish(_ context.Context, u identity.Requester, e model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error) { |
||||
return model.PublishReply{}, backend.PublishStreamStatusNotFound, fmt.Errorf("watch does not support publish") |
||||
} |
||||
|
||||
type watcher struct { |
||||
orgId int64 |
||||
channel string |
||||
publisher model.ChannelPublisher |
||||
done bool |
||||
watch watch.Interface |
||||
} |
||||
|
||||
func (b *watcher) run(ctx context.Context) { |
||||
logger := logging.FromContext(ctx).With("channel", b.channel) |
||||
|
||||
ch := b.watch.ResultChan() |
||||
for { |
||||
select { |
||||
// This is sent when there are no longer any subscriptions
|
||||
case <-ctx.Done(): |
||||
logger.Info("context done", "channel", b.channel) |
||||
b.watch.Stop() |
||||
b.done = true |
||||
return |
||||
|
||||
// Each watch event
|
||||
case event, ok := <-ch: |
||||
if !ok { |
||||
logger.Info("watch stream broken", "channel", b.channel) |
||||
b.watch.Stop() |
||||
b.done = true // will force reconnect from the frontend
|
||||
return |
||||
} |
||||
|
||||
cfg := jsoniter.ConfigCompatibleWithStandardLibrary |
||||
stream := cfg.BorrowStream(nil) |
||||
defer cfg.ReturnStream(stream) |
||||
|
||||
// regular json.Marshal() uses upper case
|
||||
stream.WriteObjectStart() |
||||
stream.WriteObjectField("type") |
||||
stream.WriteString(string(event.Type)) |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("object") |
||||
stream.WriteVal(event.Object) |
||||
stream.WriteObjectEnd() |
||||
|
||||
buf := stream.Buffer() |
||||
data := make([]byte, len(buf)) |
||||
copy(data, buf) |
||||
|
||||
err := b.publisher(b.orgId, b.channel, data) |
||||
if err != nil { |
||||
logger.Error("publish error", "channel", b.channel, "err", err) |
||||
b.watch.Stop() |
||||
b.done = true // will force reconnect from the frontend
|
||||
continue |
||||
} |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,12 @@ |
||||
import { discoveryResources } from './discovery'; |
||||
|
||||
const discoverySnapshot = require('./snapshots/discovery-snapshot.json'); |
||||
|
||||
describe('simple typescript tests', () => { |
||||
it('simple', async () => { |
||||
const watchable = discoveryResources(discoverySnapshot) |
||||
.filter((v) => v.verbs.includes('watch')) |
||||
.map((v) => v.resource); |
||||
expect(watchable).toEqual(['user-storage', 'dashboards', 'dashboards', 'dashboards']); |
||||
}); |
||||
}); |
||||
@ -0,0 +1,84 @@ |
||||
import { lastValueFrom, map } from 'rxjs'; |
||||
|
||||
import { FetchResponse, getBackendSrv } from '@grafana/runtime'; |
||||
|
||||
import { GroupVersionKind, ListMeta } from './types'; |
||||
|
||||
export type GroupDiscoveryResource = { |
||||
resource: string; |
||||
responseKind: GroupVersionKind; |
||||
scope: 'Namespaced' | 'Cluster'; |
||||
singularResource: string; |
||||
verbs: string[]; |
||||
subresources?: GroupDiscoverySubresource[]; |
||||
}; |
||||
|
||||
export type GroupDiscoverySubresource = { |
||||
subresource: string; |
||||
responseKind: GroupVersionKind; |
||||
verbs: string[]; |
||||
}; |
||||
|
||||
export type GroupDiscoveryVersion = { |
||||
version: string; |
||||
freshness: 'Current' | string; |
||||
resources: GroupDiscoveryResource[]; |
||||
}; |
||||
|
||||
export type GroupDiscoveryItem = { |
||||
metadata: { |
||||
name: string; |
||||
}; |
||||
versions: GroupDiscoveryVersion[]; |
||||
}; |
||||
|
||||
export type APIGroupDiscoveryList = { |
||||
metadata: ListMeta; |
||||
items: GroupDiscoveryItem[]; |
||||
}; |
||||
|
||||
export async function getAPIGroupDiscoveryList(): Promise<APIGroupDiscoveryList> { |
||||
return await lastValueFrom( |
||||
getBackendSrv() |
||||
.fetch<APIGroupDiscoveryList>({ |
||||
method: 'GET', |
||||
url: '/apis', |
||||
headers: { |
||||
Accept: |
||||
'application/json;g=apidiscovery.k8s.io;v=v2;as=APIGroupDiscoveryList,application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList,application/json', |
||||
}, |
||||
}) |
||||
.pipe( |
||||
map((response: FetchResponse<APIGroupDiscoveryList>) => { |
||||
// Fill in the group+version before returning
|
||||
for (let api of response.data.items) { |
||||
for (let version of api.versions) { |
||||
for (let resource of version.resources) { |
||||
resource.responseKind.group = api.metadata.name; |
||||
resource.responseKind.version = version.version; |
||||
if (resource.subresources) { |
||||
for (let sub of resource.subresources) { |
||||
sub.responseKind.group = api.metadata.name; |
||||
sub.responseKind.version = version.version; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
return response.data; |
||||
}) |
||||
) |
||||
); |
||||
} |
||||
|
||||
export function discoveryResources(apis: APIGroupDiscoveryList): GroupDiscoveryResource[] { |
||||
const resources: GroupDiscoveryResource[] = []; |
||||
for (let api of apis.items) { |
||||
for (let version of api.versions) { |
||||
for (let resource of version.resources) { |
||||
resources.push(resource); |
||||
} |
||||
} |
||||
} |
||||
return resources; |
||||
} |
||||
@ -0,0 +1,295 @@ |
||||
{ |
||||
"kind": "APIGroupDiscoveryList", |
||||
"apiVersion": "apidiscovery.k8s.io/v2", |
||||
"metadata": {}, |
||||
"items": [ |
||||
{ |
||||
"metadata": { "name": "userstorage.grafana.app", "creationTimestamp": null }, |
||||
"versions": [ |
||||
{ |
||||
"version": "v0alpha1", |
||||
"resources": [ |
||||
{ |
||||
"resource": "user-storage", |
||||
"responseKind": { "group": "", "version": "", "kind": "UserStorage" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "user-storage", |
||||
"verbs": ["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"] |
||||
} |
||||
], |
||||
"freshness": "Current" |
||||
} |
||||
] |
||||
}, |
||||
{ |
||||
"metadata": { "name": "notifications.alerting.grafana.app", "creationTimestamp": null }, |
||||
"versions": [ |
||||
{ |
||||
"version": "v0alpha1", |
||||
"resources": [ |
||||
{ |
||||
"resource": "receivers", |
||||
"responseKind": { "group": "", "version": "", "kind": "Receiver" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "receiver", |
||||
"verbs": ["create", "delete", "deletecollection", "get", "list", "patch", "update"] |
||||
}, |
||||
{ |
||||
"resource": "routingtrees", |
||||
"responseKind": { "group": "", "version": "", "kind": "RoutingTree" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "routingtree", |
||||
"verbs": ["create", "delete", "deletecollection", "get", "list", "patch", "update"] |
||||
}, |
||||
{ |
||||
"resource": "templategroups", |
||||
"responseKind": { "group": "", "version": "", "kind": "TemplateGroup" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "templategroup", |
||||
"verbs": ["create", "delete", "deletecollection", "get", "list", "patch", "update"] |
||||
}, |
||||
{ |
||||
"resource": "timeintervals", |
||||
"responseKind": { "group": "", "version": "", "kind": "TimeInterval" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "timeinterval", |
||||
"verbs": ["create", "delete", "deletecollection", "get", "list", "patch", "update"] |
||||
} |
||||
], |
||||
"freshness": "Current" |
||||
} |
||||
] |
||||
}, |
||||
{ |
||||
"metadata": { "name": "iam.grafana.app", "creationTimestamp": null }, |
||||
"versions": [ |
||||
{ |
||||
"version": "v0alpha1", |
||||
"resources": [ |
||||
{ |
||||
"resource": "serviceaccounts", |
||||
"responseKind": { "group": "", "version": "", "kind": "ServiceAccount" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "serviceaccount", |
||||
"verbs": ["get", "list"], |
||||
"subresources": [ |
||||
{ |
||||
"subresource": "tokens", |
||||
"responseKind": { "group": "", "version": "", "kind": "UserTeamList" }, |
||||
"verbs": ["get"] |
||||
} |
||||
] |
||||
}, |
||||
{ |
||||
"resource": "ssosettings", |
||||
"responseKind": { "group": "", "version": "", "kind": "SSOSetting" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "ssosetting", |
||||
"verbs": ["delete", "get", "list", "patch", "update"] |
||||
}, |
||||
{ |
||||
"resource": "teambindings", |
||||
"responseKind": { "group": "", "version": "", "kind": "TeamBinding" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "teambinding", |
||||
"verbs": ["get", "list"] |
||||
}, |
||||
{ |
||||
"resource": "teams", |
||||
"responseKind": { "group": "", "version": "", "kind": "Team" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "team", |
||||
"verbs": ["get", "list"], |
||||
"subresources": [ |
||||
{ |
||||
"subresource": "members", |
||||
"responseKind": { "group": "", "version": "", "kind": "TeamMemberList" }, |
||||
"verbs": ["get"] |
||||
} |
||||
] |
||||
}, |
||||
{ |
||||
"resource": "users", |
||||
"responseKind": { "group": "", "version": "", "kind": "User" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "user", |
||||
"verbs": ["get", "list"], |
||||
"subresources": [ |
||||
{ |
||||
"subresource": "teams", |
||||
"responseKind": { "group": "", "version": "", "kind": "UserTeamList" }, |
||||
"verbs": ["get"] |
||||
} |
||||
] |
||||
} |
||||
], |
||||
"freshness": "Current" |
||||
} |
||||
] |
||||
}, |
||||
{ |
||||
"metadata": { "name": "folder.grafana.app", "creationTimestamp": null }, |
||||
"versions": [ |
||||
{ |
||||
"version": "v0alpha1", |
||||
"resources": [ |
||||
{ |
||||
"resource": "folders", |
||||
"responseKind": { "group": "", "version": "", "kind": "Folder" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "folder", |
||||
"verbs": ["create", "delete", "deletecollection", "get", "list", "patch", "update"], |
||||
"subresources": [ |
||||
{ |
||||
"subresource": "access", |
||||
"responseKind": { "group": "", "version": "", "kind": "FolderAccessInfo" }, |
||||
"verbs": ["get"] |
||||
}, |
||||
{ |
||||
"subresource": "counts", |
||||
"responseKind": { "group": "", "version": "", "kind": "DescendantCounts" }, |
||||
"verbs": ["get"] |
||||
}, |
||||
{ |
||||
"subresource": "parents", |
||||
"responseKind": { "group": "", "version": "", "kind": "FolderInfoList" }, |
||||
"verbs": ["get"] |
||||
} |
||||
] |
||||
} |
||||
], |
||||
"freshness": "Current" |
||||
} |
||||
] |
||||
}, |
||||
{ |
||||
"metadata": { "name": "featuretoggle.grafana.app", "creationTimestamp": null }, |
||||
"versions": [ |
||||
{ |
||||
"version": "v0alpha1", |
||||
"resources": [ |
||||
{ |
||||
"resource": "features", |
||||
"responseKind": { "group": "", "version": "", "kind": "Feature" }, |
||||
"scope": "Cluster", |
||||
"singularResource": "feature", |
||||
"verbs": ["get", "list"] |
||||
}, |
||||
{ |
||||
"resource": "featuretoggles", |
||||
"responseKind": { "group": "", "version": "", "kind": "FeatureToggles" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "featuretoggle", |
||||
"verbs": ["get", "list"] |
||||
} |
||||
], |
||||
"freshness": "Current" |
||||
} |
||||
] |
||||
}, |
||||
{ |
||||
"metadata": { "name": "dashboard.grafana.app", "creationTimestamp": null }, |
||||
"versions": [ |
||||
{ |
||||
"version": "v0alpha1", |
||||
"resources": [ |
||||
{ |
||||
"resource": "dashboards", |
||||
"responseKind": { "group": "", "version": "", "kind": "Dashboard" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "dashboard", |
||||
"verbs": ["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"], |
||||
"subresources": [ |
||||
{ |
||||
"subresource": "dto", |
||||
"responseKind": { "group": "", "version": "", "kind": "DashboardWithAccessInfo" }, |
||||
"verbs": ["get"] |
||||
} |
||||
] |
||||
}, |
||||
{ |
||||
"resource": "librarypanels", |
||||
"responseKind": { "group": "", "version": "", "kind": "LibraryPanel" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "librarypanel", |
||||
"verbs": ["get", "list"] |
||||
} |
||||
], |
||||
"freshness": "Current" |
||||
}, |
||||
{ |
||||
"version": "v1alpha1", |
||||
"resources": [ |
||||
{ |
||||
"resource": "dashboards", |
||||
"responseKind": { "group": "", "version": "", "kind": "Dashboard" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "dashboard", |
||||
"verbs": ["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"], |
||||
"subresources": [ |
||||
{ |
||||
"subresource": "dto", |
||||
"responseKind": { "group": "", "version": "", "kind": "DashboardWithAccessInfo" }, |
||||
"verbs": ["get"] |
||||
} |
||||
] |
||||
}, |
||||
{ |
||||
"resource": "librarypanels", |
||||
"responseKind": { "group": "", "version": "", "kind": "LibraryPanel" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "librarypanel", |
||||
"verbs": ["get", "list"] |
||||
} |
||||
], |
||||
"freshness": "Current" |
||||
}, |
||||
{ |
||||
"version": "v2alpha1", |
||||
"resources": [ |
||||
{ |
||||
"resource": "dashboards", |
||||
"responseKind": { "group": "", "version": "", "kind": "Dashboard" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "dashboard", |
||||
"verbs": ["create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"], |
||||
"subresources": [ |
||||
{ |
||||
"subresource": "dto", |
||||
"responseKind": { "group": "", "version": "", "kind": "DashboardWithAccessInfo" }, |
||||
"verbs": ["get"] |
||||
} |
||||
] |
||||
}, |
||||
{ |
||||
"resource": "librarypanels", |
||||
"responseKind": { "group": "", "version": "", "kind": "LibraryPanel" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "librarypanel", |
||||
"verbs": ["get", "list"] |
||||
} |
||||
], |
||||
"freshness": "Current" |
||||
} |
||||
] |
||||
}, |
||||
{ |
||||
"metadata": { "name": "playlist.grafana.app", "creationTimestamp": null }, |
||||
"versions": [ |
||||
{ |
||||
"version": "v0alpha1", |
||||
"resources": [ |
||||
{ |
||||
"resource": "playlists", |
||||
"responseKind": { "group": "", "version": "", "kind": "Playlist" }, |
||||
"scope": "Namespaced", |
||||
"singularResource": "playlist", |
||||
"verbs": ["create", "delete", "deletecollection", "get", "list", "patch", "update"] |
||||
} |
||||
], |
||||
"freshness": "Current" |
||||
} |
||||
] |
||||
} |
||||
] |
||||
} |
||||
Loading…
Reference in new issue