K8s: Expose testdata connection as an api-server (dev mode only) (#79726)

pull/79615/head^2
Ryan McKinley 1 year ago committed by GitHub
parent 5589c0a8f2
commit 53411eeaa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      hack/update-codegen.sh
  2. 5
      pkg/apis/datasource/v0alpha1/doc.go
  3. 57
      pkg/apis/datasource/v0alpha1/types.go
  4. 100
      pkg/apis/datasource/v0alpha1/unstructured.go
  5. 100
      pkg/apis/datasource/v0alpha1/zz_generated.deepcopy.go
  6. 19
      pkg/apis/datasource/v0alpha1/zz_generated.defaults.go
  7. 2683
      pkg/apis/datasource/v0alpha1/zz_generated.openapi.go
  8. 2
      pkg/apis/example/v0alpha1/doc.go
  9. 24
      pkg/apis/types.go
  10. 2
      pkg/registry/apis/apis.go
  11. 8
      pkg/registry/apis/datasource/README.md
  12. 82
      pkg/registry/apis/datasource/authorizer.go
  13. 95
      pkg/registry/apis/datasource/connections.go
  14. 104
      pkg/registry/apis/datasource/query.go
  15. 33
      pkg/registry/apis/datasource/query_test.go
  16. 262
      pkg/registry/apis/datasource/register.go
  17. 71
      pkg/registry/apis/datasource/sub_health.go
  18. 51
      pkg/registry/apis/datasource/sub_proxy.go
  19. 103
      pkg/registry/apis/datasource/sub_query.go
  20. 79
      pkg/registry/apis/datasource/sub_resource.go
  21. 25
      pkg/registry/apis/datasource/utils.go
  22. 32
      pkg/registry/apis/datasource/utils_test.go
  23. 3
      pkg/registry/apis/wireset.go
  24. 145
      pkg/tests/apis/datasource/testdata_test.go

@ -30,7 +30,7 @@ CLIENTSET_PKG_NAME=clientset \
"${CODEGEN_PKG}/generate-groups.sh" "all" \ "${CODEGEN_PKG}/generate-groups.sh" "all" \
github.com/grafana/grafana/pkg/generated \ github.com/grafana/grafana/pkg/generated \
github.com/grafana/grafana/pkg/apis \ github.com/grafana/grafana/pkg/apis \
"folders:v0alpha1" \ "datasource:v0alpha1" \
--output-base "${OUTDIR}" \ --output-base "${OUTDIR}" \
--go-header-file "${SCRIPT_ROOT}/hack/boilerplate.go.txt" --go-header-file "${SCRIPT_ROOT}/hack/boilerplate.go.txt"
@ -40,6 +40,6 @@ CLIENTSET_PKG_NAME=clientset \
github.com/grafana/grafana/pkg/generated \ github.com/grafana/grafana/pkg/generated \
github.com/grafana/grafana/pkg/apis \ github.com/grafana/grafana/pkg/apis \
github.com/grafana/grafana/pkg/apis \ github.com/grafana/grafana/pkg/apis \
"folders:v0alpha1" \ "datasource:v0alpha1" \
--output-base "${OUTDIR}" \ --output-base "${OUTDIR}" \
--go-header-file "${SCRIPT_ROOT}/hack/boilerplate.go.txt" --go-header-file "${SCRIPT_ROOT}/hack/boilerplate.go.txt"

@ -0,0 +1,5 @@
// +k8s:deepcopy-gen=package
// +k8s:openapi-gen=true
// +groupName=datasources.grafana.com
package v0alpha1

@ -0,0 +1,57 @@
package v0alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
"github.com/grafana/grafana/pkg/apis"
)
const (
GROUP = "*.datasource.grafana.app"
VERSION = "v0alpha1"
)
var GenericConnectionResourceInfo = apis.NewResourceInfo(GROUP, VERSION,
"connections", "connection", "DataSourceConnection",
func() runtime.Object { return &DataSourceConnection{} },
func() runtime.Object { return &DataSourceConnectionList{} },
)
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type DataSourceConnection struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
// The display name
Title string `json:"title"`
// Optional description for the data source (does not exist yet)
Description string `json:"description,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type DataSourceConnectionList struct {
metav1.TypeMeta `json:",inline"`
// +optional
metav1.ListMeta `json:"metadata,omitempty"`
Items []DataSourceConnection `json:"items,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type HealthCheckResult struct {
metav1.TypeMeta `json:",inline"`
// The string description
Status string `json:"status,omitempty"`
// Explicit status code
Code int `json:"code,omitempty"`
// Optional description for the data source
Message string `json:"message,omitempty"`
// Spec depends on the the plugin
Details *Unstructured `json:"details,omitempty"`
}

@ -0,0 +1,100 @@
package v0alpha1
import (
"encoding/json"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
runtime "k8s.io/apimachinery/pkg/runtime"
)
// Unstructured allows objects that do not have Golang structs registered to be manipulated
// generically.
type Unstructured struct {
// Object is a JSON compatible map with string, float, int, bool, []interface{}, or
// map[string]interface{}
// children.
Object map[string]interface{}
}
func (u *Unstructured) UnstructuredContent() map[string]interface{} {
if u.Object == nil {
return make(map[string]interface{})
}
return u.Object
}
func (u *Unstructured) SetUnstructuredContent(content map[string]interface{}) {
u.Object = content
}
// MarshalJSON ensures that the unstructured object produces proper
// JSON when passed to Go's standard JSON library.
func (u *Unstructured) MarshalJSON() ([]byte, error) {
return json.Marshal(u.Object)
}
// UnmarshalJSON ensures that the unstructured object properly decodes
// JSON when passed to Go's standard JSON library.
func (u *Unstructured) UnmarshalJSON(b []byte) error {
return json.Unmarshal(b, &u.Object)
}
func (u *Unstructured) DeepCopy() *Unstructured {
if u == nil {
return nil
}
out := new(Unstructured)
*out = *u
out.Object = runtime.DeepCopyJSON(u.Object)
return out
}
func (u *Unstructured) DeepCopyInto(out *Unstructured) {
clone := u.DeepCopy()
*out = *clone
}
func (u *Unstructured) Set(field string, value interface{}) {
if u.Object == nil {
u.Object = make(map[string]interface{})
}
_ = unstructured.SetNestedField(u.Object, value, field)
}
func (u *Unstructured) Remove(fields ...string) {
if u.Object == nil {
u.Object = make(map[string]interface{})
}
unstructured.RemoveNestedField(u.Object, fields...)
}
func (u *Unstructured) SetNestedField(value interface{}, fields ...string) {
if u.Object == nil {
u.Object = make(map[string]interface{})
}
_ = unstructured.SetNestedField(u.Object, value, fields...)
}
func (u *Unstructured) GetNestedString(fields ...string) string {
val, found, err := unstructured.NestedString(u.Object, fields...)
if !found || err != nil {
return ""
}
return val
}
func (u *Unstructured) GetNestedStringSlice(fields ...string) []string {
val, found, err := unstructured.NestedStringSlice(u.Object, fields...)
if !found || err != nil {
return nil
}
return val
}
func (u *Unstructured) GetNestedInt64(fields ...string) int64 {
val, found, err := unstructured.NestedInt64(u.Object, fields...)
if !found || err != nil {
return 0
}
return val
}

@ -0,0 +1,100 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
// SPDX-License-Identifier: AGPL-3.0-only
// Code generated by deepcopy-gen. DO NOT EDIT.
package v0alpha1
import (
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataSourceConnection) DeepCopyInto(out *DataSourceConnection) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataSourceConnection.
func (in *DataSourceConnection) DeepCopy() *DataSourceConnection {
if in == nil {
return nil
}
out := new(DataSourceConnection)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *DataSourceConnection) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataSourceConnectionList) DeepCopyInto(out *DataSourceConnectionList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]DataSourceConnection, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataSourceConnectionList.
func (in *DataSourceConnectionList) DeepCopy() *DataSourceConnectionList {
if in == nil {
return nil
}
out := new(DataSourceConnectionList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *DataSourceConnectionList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *HealthCheckResult) DeepCopyInto(out *HealthCheckResult) {
*out = *in
out.TypeMeta = in.TypeMeta
if in.Details != nil {
in, out := &in.Details, &out.Details
*out = (*in).DeepCopy()
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HealthCheckResult.
func (in *HealthCheckResult) DeepCopy() *HealthCheckResult {
if in == nil {
return nil
}
out := new(HealthCheckResult)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *HealthCheckResult) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}

@ -0,0 +1,19 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
// SPDX-License-Identifier: AGPL-3.0-only
// Code generated by defaulter-gen. DO NOT EDIT.
package v0alpha1
import (
runtime "k8s.io/apimachinery/pkg/runtime"
)
// RegisterDefaults adds defaulters functions to the given scheme.
// Public to allow building arbitrary schemes.
// All generated defaulters are covering - they call all nested defaulters.
func RegisterDefaults(scheme *runtime.Scheme) error {
return nil
}

File diff suppressed because it is too large Load Diff

@ -1,6 +1,6 @@
// +k8s:deepcopy-gen=package // +k8s:deepcopy-gen=package
// +k8s:openapi-gen=true // +k8s:openapi-gen=true
// +groupName=example.grafana.com // +groupName=example.grafana.app
// The testing api is a dependency free service that we can use to experiment with // The testing api is a dependency free service that we can use to experiment with
// api aggregation across multiple deployment models. Specifically: // api aggregation across multiple deployment models. Specifically:

@ -13,6 +13,7 @@ type ResourceInfo struct {
version string version string
resourceName string resourceName string
singularName string singularName string
shortName string
kind string kind string
newObj func() runtime.Object newObj func() runtime.Object
newList func() runtime.Object newList func() runtime.Object
@ -20,13 +21,34 @@ type ResourceInfo struct {
func NewResourceInfo(group, version, resourceName, singularName, kind string, func NewResourceInfo(group, version, resourceName, singularName, kind string,
newObj func() runtime.Object, newList func() runtime.Object) ResourceInfo { newObj func() runtime.Object, newList func() runtime.Object) ResourceInfo {
return ResourceInfo{group, version, resourceName, singularName, kind, newObj, newList} shortName := "" // an optional alias helpful in kubectl eg ("sa" for serviceaccounts)
return ResourceInfo{group, version, resourceName, singularName, shortName, kind, newObj, newList}
}
func (info *ResourceInfo) WithGroupAndShortName(group string, shortName string) ResourceInfo {
return ResourceInfo{
group: group,
version: info.version,
resourceName: info.resourceName,
singularName: info.singularName,
kind: info.kind,
shortName: shortName,
newObj: info.newObj,
newList: info.newList,
}
} }
func (info *ResourceInfo) GetSingularName() string { func (info *ResourceInfo) GetSingularName() string {
return info.singularName return info.singularName
} }
func (info *ResourceInfo) GetShortNames() []string {
if info.shortName == "" {
return []string{}
}
return []string{info.shortName}
}
// TypeMeta returns k8s type // TypeMeta returns k8s type
func (info *ResourceInfo) TypeMeta() metav1.TypeMeta { func (info *ResourceInfo) TypeMeta() metav1.TypeMeta {
return metav1.TypeMeta{ return metav1.TypeMeta{

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/registry/apis/datasource"
"github.com/grafana/grafana/pkg/registry/apis/example" "github.com/grafana/grafana/pkg/registry/apis/example"
"github.com/grafana/grafana/pkg/registry/apis/folders" "github.com/grafana/grafana/pkg/registry/apis/folders"
"github.com/grafana/grafana/pkg/registry/apis/playlist" "github.com/grafana/grafana/pkg/registry/apis/playlist"
@ -20,6 +21,7 @@ type Service struct{}
func ProvideRegistryServiceSink( func ProvideRegistryServiceSink(
_ *playlist.PlaylistAPIBuilder, _ *playlist.PlaylistAPIBuilder,
_ *example.TestingAPIBuilder, _ *example.TestingAPIBuilder,
_ *datasource.DataSourceAPIBuilder,
_ *folders.FolderAPIBuilder, _ *folders.FolderAPIBuilder,
) *Service { ) *Service {
return &Service{} return &Service{}

@ -0,0 +1,8 @@
Experimental!
This is exploring how to expose any datasource as a k8s aggregated API server.
Unlike the other services, this will register other plugins as:
> {plugin}.datasource.grafana.app

@ -0,0 +1,82 @@
package datasource
import (
"context"
"fmt"
"k8s.io/apiserver/pkg/authorization/authorizer"
"github.com/grafana/grafana/pkg/infra/appcontext"
ac "github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/datasources"
)
func (b *DataSourceAPIBuilder) GetAuthorizer() authorizer.Authorizer {
return authorizer.AuthorizerFunc(
func(ctx context.Context, attr authorizer.Attributes) (authorized authorizer.Decision, reason string, err error) {
if !attr.IsResourceRequest() {
return authorizer.DecisionNoOpinion, "", nil
}
user, err := appcontext.User(ctx)
if err != nil {
return authorizer.DecisionDeny, "valid user is required", err
}
uidScope := datasources.ScopeProvider.GetResourceScopeUID(attr.GetName())
// Must have query access to see a connection
if attr.GetResource() == b.connectionResourceInfo.GroupResource().Resource {
scopes := []string{}
if attr.GetName() != "" {
scopes = []string{uidScope}
}
ok, err := b.accessControl.Evaluate(ctx, user, ac.EvalPermission(datasources.ActionQuery, scopes...))
if !ok || err != nil {
return authorizer.DecisionDeny, "unable to query", err
}
if attr.GetSubresource() == "proxy" {
return authorizer.DecisionDeny, "TODO: map the plugin settings to access rules", err
}
return authorizer.DecisionAllow, "", nil
}
// Must have query access to see a connection
action := "" // invalid
switch attr.GetVerb() {
case "list":
ok, err := b.accessControl.Evaluate(ctx, user,
ac.EvalPermission(datasources.ActionRead)) // Can see any datasource values
if !ok || err != nil {
return authorizer.DecisionDeny, "unable to read", err
}
return authorizer.DecisionAllow, "", nil
case "get":
action = datasources.ActionRead
case "create":
action = datasources.ActionWrite
case "post":
fallthrough
case "update":
fallthrough
case "patch":
fallthrough
case "put":
action = datasources.ActionWrite
case "delete":
action = datasources.ActionDelete
default:
//b.log.Info("unknown verb", "verb", attr.GetVerb())
return authorizer.DecisionDeny, "unsupported verb", nil // Unknown verb
}
ok, err := b.accessControl.Evaluate(ctx, user,
ac.EvalPermission(action, uidScope))
if !ok || err != nil {
return authorizer.DecisionDeny, fmt.Sprintf("unable to %s", action), nil
}
return authorizer.DecisionAllow, "", nil
})
}

@ -0,0 +1,95 @@
package datasource
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/grafana/pkg/apis"
"github.com/grafana/grafana/pkg/apis/datasource/v0alpha1"
"github.com/grafana/grafana/pkg/kinds"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/grafana-apiserver/utils"
)
var (
_ rest.Scoper = (*connectionAccess)(nil)
_ rest.SingularNameProvider = (*connectionAccess)(nil)
_ rest.Getter = (*connectionAccess)(nil)
_ rest.Lister = (*connectionAccess)(nil)
_ rest.Storage = (*connectionAccess)(nil)
)
type connectionAccess struct {
resourceInfo apis.ResourceInfo
tableConverter rest.TableConvertor
builder *DataSourceAPIBuilder
}
func (s *connectionAccess) New() runtime.Object {
return s.resourceInfo.NewFunc()
}
func (s *connectionAccess) Destroy() {}
func (s *connectionAccess) NamespaceScoped() bool {
return true
}
func (s *connectionAccess) GetSingularName() string {
return s.resourceInfo.GetSingularName()
}
func (s *connectionAccess) ShortNames() []string {
return s.resourceInfo.GetShortNames()
}
func (s *connectionAccess) NewList() runtime.Object {
return s.resourceInfo.NewListFunc()
}
func (s *connectionAccess) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
return s.tableConverter.ConvertToTable(ctx, object, tableOptions)
}
func (s *connectionAccess) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
ds, err := s.builder.getDataSource(ctx, name)
if err != nil {
return nil, err
}
return s.asConnection(ds), nil
}
func (s *connectionAccess) List(ctx context.Context, options *internalversion.ListOptions) (runtime.Object, error) {
result := &v0alpha1.DataSourceConnectionList{
Items: []v0alpha1.DataSourceConnection{},
}
vals, err := s.builder.getDataSources(ctx)
if err == nil {
for _, ds := range vals {
result.Items = append(result.Items, *s.asConnection(ds))
}
}
return result, err
}
func (s *connectionAccess) asConnection(ds *datasources.DataSource) *v0alpha1.DataSourceConnection {
v := &v0alpha1.DataSourceConnection{
TypeMeta: s.resourceInfo.TypeMeta(),
ObjectMeta: metav1.ObjectMeta{
Name: ds.UID,
Namespace: s.builder.namespacer(ds.OrgID),
CreationTimestamp: metav1.NewTime(ds.Created),
ResourceVersion: fmt.Sprintf("%d", ds.Updated.UnixMilli()),
},
Title: ds.Name,
}
v.UID = utils.CalculateClusterWideUID(v) // indicates if the value changed on the server
meta := kinds.MetaAccessor(v)
meta.SetUpdatedTimestamp(&ds.Updated)
return v
}

@ -0,0 +1,104 @@
package datasource
import (
"encoding/json"
"fmt"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/tsdb/legacydata"
)
// Copied from: https://github.com/grafana/grafana/blob/main/pkg/api/dtos/models.go#L62
type rawMetricRequest struct {
// From Start time in epoch timestamps in milliseconds or relative using Grafana time units.
// required: true
// example: now-1h
From string `json:"from"`
// To End time in epoch timestamps in milliseconds or relative using Grafana time units.
// required: true
// example: now
To string `json:"to"`
// queries.refId – Specifies an identifier of the query. Is optional and default to “A”.
// queries.datasourceId – Specifies the data source to be queried. Each query in the request must have an unique datasourceId.
// queries.maxDataPoints - Species maximum amount of data points that dashboard panel can render. Is optional and default to 100.
// queries.intervalMs - Specifies the time interval in milliseconds of time series. Is optional and defaults to 1000.
// required: true
// example: [ { "refId": "A", "intervalMs": 86400000, "maxDataPoints": 1092, "datasource":{ "uid":"PD8C576611E62080A" }, "rawSql": "SELECT 1 as valueOne, 2 as valueTwo", "format": "table" } ]
Queries []rawDataQuery `json:"queries"`
// required: false
Debug bool `json:"debug"`
}
type rawDataQuery = map[string]interface{}
func readQueries(in []byte) ([]backend.DataQuery, error) {
reqDTO := &rawMetricRequest{}
err := json.Unmarshal(in, &reqDTO)
if err != nil {
return nil, err
}
if len(reqDTO.Queries) == 0 {
return nil, fmt.Errorf("expected queries")
}
tr := legacydata.NewDataTimeRange(reqDTO.From, reqDTO.To)
backendTr := backend.TimeRange{
From: tr.MustGetFrom(),
To: tr.MustGetTo(),
}
queries := make([]backend.DataQuery, 0)
for _, query := range reqDTO.Queries {
dataQuery := backend.DataQuery{
TimeRange: backendTr,
}
v, ok := query["refId"]
if ok {
dataQuery.RefID, ok = v.(string)
if !ok {
return nil, fmt.Errorf("expeted string refId")
}
}
v, ok = query["queryType"]
if ok {
dataQuery.QueryType, ok = v.(string)
if !ok {
return nil, fmt.Errorf("expeted string queryType")
}
}
v, ok = query["maxDataPoints"]
if ok {
vInt, ok := v.(float64)
if !ok {
return nil, fmt.Errorf("expected float64 maxDataPoints")
}
dataQuery.MaxDataPoints = int64(vInt)
}
v, ok = query["intervalMs"]
if ok {
vInt, ok := v.(float64)
if !ok {
return nil, fmt.Errorf("expected float64 intervalMs")
}
dataQuery.Interval = time.Duration(vInt)
}
dataQuery.JSON, err = json.Marshal(query)
if err != nil {
return nil, err
}
queries = append(queries, dataQuery)
}
return queries, nil
}

@ -0,0 +1,33 @@
package datasource
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestParseQueriesIntoQueryDataRequest(t *testing.T) {
request := []byte(`{
"queries": [
{
"refId": "A",
"datasource": {
"type": "grafana-googlesheets-datasource",
"uid": "b1808c48-9fc9-4045-82d7-081781f8a553"
},
"cacheDurationSeconds": 300,
"spreadsheet": "spreadsheetID",
"range": "",
"datasourceId": 4,
"intervalMs": 30000,
"maxDataPoints": 794
}
],
"from": "1692624667389",
"to": "1692646267389"
}`)
parsedDataQuery, err := readQueries(request)
require.NoError(t, err)
require.Equal(t, len(parsedDataQuery), 1)
}

@ -0,0 +1,262 @@
package datasource
import (
"context"
"fmt"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
common "k8s.io/kube-openapi/pkg/common"
"k8s.io/utils/strings/slices"
"github.com/grafana/grafana/pkg/apis"
"github.com/grafana/grafana/pkg/apis/datasource/v0alpha1"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
grafanaapiserver "github.com/grafana/grafana/pkg/services/grafana-apiserver"
"github.com/grafana/grafana/pkg/services/grafana-apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/grafana-apiserver/utils"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
"github.com/grafana/grafana/pkg/setting"
)
var _ grafanaapiserver.APIGroupBuilder = (*DataSourceAPIBuilder)(nil)
// This is used just so wire has something unique to return
type DataSourceAPIBuilder struct {
connectionResourceInfo apis.ResourceInfo
plugin pluginstore.Plugin
client plugins.Client
dsService datasources.DataSourceService
dsCache datasources.CacheService
accessControl accesscontrol.AccessControl
namespacer request.NamespaceMapper
}
func RegisterAPIService(
cfg *setting.Cfg,
features featuremgmt.FeatureToggles,
apiregistration grafanaapiserver.APIRegistrar,
pluginClient plugins.Client,
pluginStore pluginstore.Store,
dsService datasources.DataSourceService,
dsCache datasources.CacheService,
accessControl accesscontrol.AccessControl,
) (*DataSourceAPIBuilder, error) {
// This requires devmode!
if !features.IsEnabledGlobally(featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs) {
return nil, nil // skip registration unless opting into experimental apis
}
var err error
var builder *DataSourceAPIBuilder
all := pluginStore.Plugins(context.Background(), plugins.TypeDataSource)
ids := []string{
"grafana-testdata-datasource",
}
namespacer := request.GetNamespaceMapper(cfg)
for _, ds := range all {
if !slices.Contains(ids, ds.ID) {
continue // skip this one
}
builder, err = NewDataSourceAPIBuilder(ds, pluginClient, dsService, dsCache, accessControl, namespacer)
if err != nil {
return nil, err
}
apiregistration.RegisterAPI(builder)
}
return builder, nil // only used for wire
}
func NewDataSourceAPIBuilder(
plugin pluginstore.Plugin,
client plugins.Client,
dsService datasources.DataSourceService,
dsCache datasources.CacheService,
accessControl accesscontrol.AccessControl,
namespacer request.NamespaceMapper) (*DataSourceAPIBuilder, error) {
group, err := getDatasourceGroupNameFromPluginID(plugin.ID)
if err != nil {
return nil, err
}
return &DataSourceAPIBuilder{
connectionResourceInfo: v0alpha1.GenericConnectionResourceInfo.WithGroupAndShortName(group, plugin.ID+"-connection"),
plugin: plugin,
client: client,
dsService: dsService,
dsCache: dsCache,
accessControl: accessControl,
namespacer: namespacer,
}, nil
}
func (b *DataSourceAPIBuilder) GetGroupVersion() schema.GroupVersion {
return b.connectionResourceInfo.GroupVersion()
}
func addKnownTypes(scheme *runtime.Scheme, gv schema.GroupVersion) {
scheme.AddKnownTypes(gv,
&v0alpha1.DataSourceConnection{},
&v0alpha1.DataSourceConnectionList{},
&v0alpha1.HealthCheckResult{},
&unstructured.Unstructured{},
// Added for subresource stubs
&metav1.Status{},
)
}
func (b *DataSourceAPIBuilder) InstallSchema(scheme *runtime.Scheme) error {
gv := b.connectionResourceInfo.GroupVersion()
addKnownTypes(scheme, gv)
// Link this version to the internal representation.
// This is used for server-side-apply (PATCH), and avoids the error:
// "no kind is registered for the type"
addKnownTypes(scheme, schema.GroupVersion{
Group: gv.Group,
Version: runtime.APIVersionInternal,
})
// If multiple versions exist, then register conversions from zz_generated.conversion.go
// if err := playlist.RegisterConversions(scheme); err != nil {
// return err
// }
metav1.AddToGroupVersion(scheme, gv)
return scheme.SetVersionPriority(gv)
}
func (b *DataSourceAPIBuilder) GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory, // pointer?
optsGetter generic.RESTOptionsGetter,
) (*genericapiserver.APIGroupInfo, error) {
storage := map[string]rest.Storage{}
conn := b.connectionResourceInfo
storage[conn.StoragePath()] = &connectionAccess{
builder: b,
resourceInfo: conn,
tableConverter: utils.NewTableConverter(
conn.GroupResource(),
[]metav1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name"},
{Name: "Title", Type: "string", Format: "string", Description: "The datasource title"},
{Name: "APIVersion", Type: "string", Format: "string", Description: "API Version"},
{Name: "Created At", Type: "date"},
},
func(obj any) ([]interface{}, error) {
m, ok := obj.(*v0alpha1.DataSourceConnection)
if !ok {
return nil, fmt.Errorf("expected connection")
}
return []interface{}{
m.Name,
m.Title,
m.APIVersion,
m.CreationTimestamp.UTC().Format(time.RFC3339),
}, nil
},
),
}
storage[conn.StoragePath("query")] = &subQueryREST{builder: b}
storage[conn.StoragePath("health")] = &subHealthREST{builder: b}
// TODO! only setup this endpoint if it is implemented
storage[conn.StoragePath("resource")] = &subResourceREST{builder: b}
// Frontend proxy
if len(b.plugin.Routes) > 0 {
storage[conn.StoragePath("proxy")] = &subProxyREST{builder: b}
}
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(
conn.GroupResource().Group, scheme,
metav1.ParameterCodec, codecs)
apiGroupInfo.VersionedResourcesStorageMap[conn.GroupVersion().Version] = storage
return &apiGroupInfo, nil
}
func (b *DataSourceAPIBuilder) GetOpenAPIDefinitions() common.GetOpenAPIDefinitions {
return v0alpha1.GetOpenAPIDefinitions
}
// Register additional routes with the server
func (b *DataSourceAPIBuilder) GetAPIRoutes() *grafanaapiserver.APIRoutes {
return nil
}
func (b *DataSourceAPIBuilder) getDataSourcePluginContext(ctx context.Context, name string) (*backend.PluginContext, error) {
info, err := request.NamespaceInfoFrom(ctx, true)
if err != nil {
return nil, err
}
user, err := appcontext.User(ctx)
if err != nil {
return nil, err
}
ds, err := b.dsCache.GetDatasourceByUID(ctx, name, user, false)
if err != nil {
return nil, err
}
settings := backend.DataSourceInstanceSettings{}
settings.ID = ds.ID
settings.UID = ds.UID
settings.Name = ds.Name
settings.URL = ds.URL
settings.Updated = ds.Updated
settings.User = ds.User
settings.JSONData, err = ds.JsonData.ToDB()
if err != nil {
return nil, err
}
settings.DecryptedSecureJSONData, err = b.dsService.DecryptedValues(ctx, ds)
if err != nil {
return nil, err
}
return &backend.PluginContext{
OrgID: info.OrgID,
PluginID: b.plugin.ID,
PluginVersion: b.plugin.Info.Version,
User: &backend.User{},
AppInstanceSettings: &backend.AppInstanceSettings{},
DataSourceInstanceSettings: &settings,
}, nil
}
func (b *DataSourceAPIBuilder) getDataSource(ctx context.Context, name string) (*datasources.DataSource, error) {
user, err := appcontext.User(ctx)
if err != nil {
return nil, err
}
return b.dsCache.GetDatasourceByUID(ctx, name, user, false)
}
func (b *DataSourceAPIBuilder) getDataSources(ctx context.Context) ([]*datasources.DataSource, error) {
orgId, err := request.OrgIDForList(ctx)
if err != nil {
return nil, err
}
return b.dsService.GetDataSourcesByType(ctx, &datasources.GetDataSourcesByTypeQuery{
OrgID: orgId,
Type: b.plugin.ID,
})
}

@ -0,0 +1,71 @@
package datasource
import (
"context"
"encoding/json"
"net/http"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/grafana/pkg/apis/datasource/v0alpha1"
)
type subHealthREST struct {
builder *DataSourceAPIBuilder
}
var _ = rest.Connecter(&subHealthREST{})
func (r *subHealthREST) New() runtime.Object {
return &v0alpha1.HealthCheckResult{}
}
func (r *subHealthREST) Destroy() {
}
func (r *subHealthREST) ConnectMethods() []string {
return []string{"GET"}
}
func (r *subHealthREST) NewConnectOptions() (runtime.Object, bool, string) {
return nil, false, ""
}
func (r *subHealthREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
pluginCtx, err := r.builder.getDataSourcePluginContext(ctx, name)
if err != nil {
responder.Error(err)
return
}
healthResponse, err := r.builder.client.CheckHealth(ctx, &backend.CheckHealthRequest{
PluginContext: *pluginCtx,
})
if err != nil {
responder.Error(err)
return
}
rsp := &v0alpha1.HealthCheckResult{}
rsp.Code = int(healthResponse.Status)
rsp.Status = healthResponse.Status.String()
rsp.Message = healthResponse.Message
if len(healthResponse.JSONDetails) > 0 {
err = json.Unmarshal(healthResponse.JSONDetails, &rsp.Details)
if err != nil {
responder.Error(err)
return
}
}
statusCode := http.StatusOK
if healthResponse.Status != backend.HealthStatusOk {
statusCode = http.StatusBadRequest
}
responder.Object(statusCode, rsp)
}), nil
}

@ -0,0 +1,51 @@
package datasource
import (
"context"
"fmt"
"net/http"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
)
type subProxyREST struct {
builder *DataSourceAPIBuilder
}
var _ = rest.Connecter(&subProxyREST{})
func (r *subProxyREST) New() runtime.Object {
return &metav1.Status{}
}
func (r *subProxyREST) Destroy() {}
func (r *subProxyREST) ConnectMethods() []string {
unique := map[string]bool{}
methods := []string{}
for _, r := range r.builder.plugin.Routes {
if unique[r.Method] {
continue
}
unique[r.Method] = true
methods = append(methods, r.Method)
}
return methods
}
func (r *subProxyREST) NewConnectOptions() (runtime.Object, bool, string) {
return nil, true, ""
}
func (r *subProxyREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
pluginCtx, err := r.builder.getDataSourcePluginContext(ctx, name)
if err != nil {
return nil, err
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
responder.Error(fmt.Errorf("TODO, proxy: " + pluginCtx.PluginID))
}), nil
}

@ -0,0 +1,103 @@
package datasource
import (
"context"
"encoding/json"
"io"
"net/http"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
)
type subQueryREST struct {
builder *DataSourceAPIBuilder
}
var _ = rest.Connecter(&subQueryREST{})
func (r *subQueryREST) New() runtime.Object {
return &metav1.Status{}
}
func (r *subQueryREST) Destroy() {
}
func (r *subQueryREST) ConnectMethods() []string {
return []string{"POST", "GET"}
}
func (r *subQueryREST) NewConnectOptions() (runtime.Object, bool, string) {
return nil, false, ""
}
func (r *subQueryREST) readQueries(req *http.Request) ([]backend.DataQuery, error) {
// Simple URL to JSON mapping
if req.Method == http.MethodGet {
body := make(map[string]any, 0)
for k, v := range req.URL.Query() {
switch len(v) {
case 0:
body[k] = true
case 1:
body[k] = v[0] // TODO, convert numbers
default:
body[k] = v // TODO, convert numbers
}
}
var err error
dq := backend.DataQuery{
RefID: "A",
TimeRange: backend.TimeRange{
From: time.Now().Add(-1 * time.Hour), // last hour
To: time.Now(),
},
MaxDataPoints: 1000,
Interval: time.Second * 10,
}
dq.JSON, err = json.Marshal(body)
return []backend.DataQuery{dq}, err
}
body, err := io.ReadAll(req.Body)
if err != nil {
return nil, err
}
return readQueries(body)
}
func (r *subQueryREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
pluginCtx, err := r.builder.getDataSourcePluginContext(ctx, name)
if err != nil {
return nil, err
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
queries, err := r.readQueries(req)
if err != nil {
responder.Error(err)
return
}
queryResponse, err := r.builder.client.QueryData(ctx, &backend.QueryDataRequest{
PluginContext: *pluginCtx,
Queries: queries,
// Headers: // from context
})
if err != nil {
return
}
jsonRsp, err := json.Marshal(queryResponse)
if err != nil {
responder.Error(err)
return
}
w.WriteHeader(200)
_, _ = w.Write(jsonRsp)
}), nil
}

@ -0,0 +1,79 @@
package datasource
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/grafana/pkg/plugins/httpresponsesender"
)
type subResourceREST struct {
builder *DataSourceAPIBuilder
}
var _ = rest.Connecter(&subResourceREST{})
func (r *subResourceREST) New() runtime.Object {
return &metav1.Status{}
}
func (r *subResourceREST) Destroy() {
}
func (r *subResourceREST) ConnectMethods() []string {
// All for now??? ideally we have a schema for resource and limit this
return []string{
http.MethodGet,
http.MethodHead,
http.MethodPost,
http.MethodPut,
http.MethodPatch,
http.MethodDelete,
http.MethodOptions,
}
}
func (r *subResourceREST) NewConnectOptions() (runtime.Object, bool, string) {
return nil, true, ""
}
func (r *subResourceREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
pluginCtx, err := r.builder.getDataSourcePluginContext(ctx, name)
if err != nil {
return nil, err
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
body, err := io.ReadAll(req.Body)
if err != nil {
responder.Error(err)
return
}
idx := strings.LastIndex(req.URL.Path, "/resource")
if idx < 0 {
responder.Error(fmt.Errorf("expected resource path")) // 400?
return
}
path := req.URL.Path[idx+len("/resource"):]
err = r.builder.client.CallResource(ctx, &backend.CallResourceRequest{
PluginContext: *pluginCtx,
Path: path,
Method: req.Method,
Body: body,
}, httpresponsesender.New(w))
if err != nil {
responder.Error(err)
}
}), nil
}

@ -0,0 +1,25 @@
package datasource
import (
"fmt"
"strings"
)
func getDatasourceGroupNameFromPluginID(pluginId string) (string, error) {
if pluginId == "" {
return "", fmt.Errorf("bad pluginID (empty)")
}
parts := strings.Split(pluginId, "-")
if len(parts) == 1 {
return fmt.Sprintf("%s.datasource.grafana.app", parts[0]), nil
}
last := parts[len(parts)-1]
if last != "datasource" {
return "", fmt.Errorf("bad pluginID (%s)", pluginId)
}
if parts[0] == "grafana" {
parts = parts[1:] // strip the first value
}
return fmt.Sprintf("%s.datasource.grafana.app", strings.Join(parts[:len(parts)-1], "-")), nil
}

@ -0,0 +1,32 @@
package datasource
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestUtils(t *testing.T) {
// multiple flavors of the same idea
require.Equal(t, "tempo.datasource.grafana.app", getIDIgnoreError("tempo"))
require.Equal(t, "tempo.datasource.grafana.app", getIDIgnoreError("grafana-tempo-datasource"))
require.Equal(t, "tempo.datasource.grafana.app", getIDIgnoreError("tempo-datasource"))
// Multiple dashes in the name
require.Equal(t, "org-name.datasource.grafana.app", getIDIgnoreError("org-name-datasource"))
require.Equal(t, "org-name-more.datasource.grafana.app", getIDIgnoreError("org-name-more-datasource"))
require.Equal(t, "org-name-more-more.datasource.grafana.app", getIDIgnoreError("org-name-more-more-datasource"))
require.Error(t, getErrorIgnoreValue("graph-panel"))
require.Error(t, getErrorIgnoreValue("anything-notdatasource"))
}
func getIDIgnoreError(id string) string {
v, _ := getDatasourceGroupNameFromPluginID(id)
return v
}
func getErrorIgnoreValue(id string) error {
_, err := getDatasourceGroupNameFromPluginID(id)
return err
}

@ -3,6 +3,7 @@ package apiregistry
import ( import (
"github.com/google/wire" "github.com/google/wire"
"github.com/grafana/grafana/pkg/registry/apis/datasource"
"github.com/grafana/grafana/pkg/registry/apis/example" "github.com/grafana/grafana/pkg/registry/apis/example"
"github.com/grafana/grafana/pkg/registry/apis/folders" "github.com/grafana/grafana/pkg/registry/apis/folders"
"github.com/grafana/grafana/pkg/registry/apis/playlist" "github.com/grafana/grafana/pkg/registry/apis/playlist"
@ -12,8 +13,8 @@ var WireSet = wire.NewSet(
ProvideRegistryServiceSink, // dummy background service that forces registration ProvideRegistryServiceSink, // dummy background service that forces registration
// Each must be added here *and* in the ServiceSink above // Each must be added here *and* in the ServiceSink above
// playlistV0.RegisterAPIService,
playlist.RegisterAPIService, playlist.RegisterAPIService,
example.RegisterAPIService, example.RegisterAPIService,
datasource.RegisterAPIService,
folders.RegisterAPIService, folders.RegisterAPIService,
) )

@ -0,0 +1,145 @@
package dashboards
import (
"context"
"testing"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/tests/apis"
"github.com/grafana/grafana/pkg/tests/testinfra"
)
func TestTestDatasource(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
helper := apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{
AppModeProduction: false, // dev mode required for datasource connections
DisableAnonymous: true,
EnableFeatureToggles: []string{
featuremgmt.FlagGrafanaAPIServer,
featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs, // Required to start the example service
},
})
// Create a single datasource
ds := helper.CreateDS(&datasources.AddDataSourceCommand{
Name: "test",
Type: datasources.DS_TESTDATA,
UID: "test",
OrgID: int64(1),
})
require.Equal(t, "test", ds.UID)
t.Run("Check discovery client", func(t *testing.T) {
disco := helper.GetGroupVersionInfoJSON("testdata.datasource.grafana.app")
// fmt.Printf("%s", string(disco))
require.JSONEq(t, `[
{
"freshness": "Current",
"resources": [
{
"resource": "connections",
"responseKind": {
"group": "",
"kind": "DataSourceConnection",
"version": ""
},
"scope": "Namespaced",
"shortNames": [
"grafana-testdata-datasource-connection"
],
"singularResource": "connection",
"subresources": [
{
"responseKind": {
"group": "",
"kind": "HealthCheckResult",
"version": ""
},
"subresource": "health",
"verbs": [
"get"
]
},
{
"responseKind": {
"group": "",
"kind": "Status",
"version": ""
},
"subresource": "query",
"verbs": [
"create",
"get"
]
},
{
"responseKind": {
"group": "",
"kind": "Status",
"version": ""
},
"subresource": "resource",
"verbs": [
"create",
"delete",
"get",
"patch",
"update"
]
}
],
"verbs": [
"get",
"list"
]
}
],
"version": "v0alpha1"
}
]`, disco)
})
t.Run("Call subresources", func(t *testing.T) {
client := helper.Org1.Admin.Client.Resource(schema.GroupVersionResource{
Group: "testdata.datasource.grafana.app",
Version: "v0alpha1",
Resource: "connections",
}).Namespace("default")
ctx := context.Background()
list, err := client.List(ctx, metav1.ListOptions{})
require.NoError(t, err)
require.Len(t, list.Items, 1, "expected a single connection")
require.Equal(t, "test", list.Items[0].GetName(), "with the test uid")
rsp, err := client.Get(ctx, "test", metav1.GetOptions{}, "health")
require.NoError(t, err)
body, err := rsp.MarshalJSON()
require.NoError(t, err)
//fmt.Printf("GOT: %v\n", string(body))
require.JSONEq(t, `{
"apiVersion": "testdata.datasource.grafana.app/v0alpha1",
"code": 1,
"kind": "HealthCheckResult",
"message": "Data source is working",
"status": "OK"
}
`, string(body))
// Test connecting to non-JSON marshaled data
raw := apis.DoRequest[any](helper, apis.RequestParams{
User: helper.Org1.Admin,
Method: "GET",
Path: "/apis/testdata.datasource.grafana.app/v0alpha1/namespaces/default/connections/test/resource",
}, nil)
require.Equal(t, `Hello world from test datasource!`, string(raw.Body))
})
}
Loading…
Cancel
Save