mirror of https://github.com/grafana/grafana
parent
12257d9dfe
commit
df72a61a52
@ -1,15 +0,0 @@ |
|||||||
This implements a ResourceServer backed by the existing dashboard SQL tables. |
|
||||||
|
|
||||||
There are a few oddities worth noting. This is not a totally accurate implementation, |
|
||||||
but it is good enough to drive the UI needs and let kubectl list work! |
|
||||||
|
|
||||||
1. The resourceVersion is the dashboard version |
|
||||||
- each resource starts at 1 and increases |
|
||||||
- there are duplicate resourceVersions! |
|
||||||
- the resourceVersion is never set on the list commands |
|
||||||
|
|
||||||
1. Results are always sorted by internal id ascending |
|
||||||
- this ensures everything is returned |
|
||||||
|
|
||||||
1. The history objects have createdTimestamp == updatedTimestamp |
|
||||||
- not real, but good enough |
|
@ -1,396 +0,0 @@ |
|||||||
package legacy |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"database/sql" |
|
||||||
"fmt" |
|
||||||
"path/filepath" |
|
||||||
"sync" |
|
||||||
"time" |
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/identity" |
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/utils" |
|
||||||
dashboardsV0 "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1" |
|
||||||
"github.com/grafana/grafana/pkg/components/simplejson" |
|
||||||
"github.com/grafana/grafana/pkg/infra/appcontext" |
|
||||||
"github.com/grafana/grafana/pkg/infra/db" |
|
||||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request" |
|
||||||
gapiutil "github.com/grafana/grafana/pkg/services/apiserver/utils" |
|
||||||
"github.com/grafana/grafana/pkg/services/dashboards" |
|
||||||
"github.com/grafana/grafana/pkg/services/provisioning" |
|
||||||
"github.com/grafana/grafana/pkg/services/sqlstore/session" |
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resource" |
|
||||||
) |
|
||||||
|
|
||||||
var ( |
|
||||||
_ DashboardAccess = (*dashboardSqlAccess)(nil) |
|
||||||
) |
|
||||||
|
|
||||||
type dashboardRow struct { |
|
||||||
// The numeric version for this dashboard
|
|
||||||
Version int64 |
|
||||||
|
|
||||||
// Dashboard resource
|
|
||||||
Dash *dashboardsV0.Dashboard |
|
||||||
|
|
||||||
// The folder UID (needed for access control checks)
|
|
||||||
FolderUID string |
|
||||||
|
|
||||||
// Size (in bytes) of the dashboard payload
|
|
||||||
Bytes int |
|
||||||
|
|
||||||
// The token we can use that will start a new connection that includes
|
|
||||||
// this same dashboard
|
|
||||||
token *continueToken |
|
||||||
} |
|
||||||
|
|
||||||
type dashboardSqlAccess struct { |
|
||||||
sql db.DB |
|
||||||
sess *session.SessionDB |
|
||||||
namespacer request.NamespaceMapper |
|
||||||
dashStore dashboards.Store |
|
||||||
provisioning provisioning.ProvisioningService |
|
||||||
|
|
||||||
// Typically one... the server wrapper
|
|
||||||
subscribers []chan *resource.WrittenEvent |
|
||||||
mutex sync.Mutex |
|
||||||
} |
|
||||||
|
|
||||||
func NewDashboardAccess(sql db.DB, |
|
||||||
namespacer request.NamespaceMapper, |
|
||||||
dashStore dashboards.Store, |
|
||||||
provisioning provisioning.ProvisioningService, |
|
||||||
) DashboardAccess { |
|
||||||
return &dashboardSqlAccess{ |
|
||||||
sql: sql, |
|
||||||
sess: sql.GetSqlxSession(), |
|
||||||
namespacer: namespacer, |
|
||||||
dashStore: dashStore, |
|
||||||
provisioning: provisioning, |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
const selector = `SELECT |
|
||||||
dashboard.org_id, dashboard.id, |
|
||||||
dashboard.uid, dashboard.folder_uid, |
|
||||||
dashboard.created,dashboard.created_by,CreatedUSER.uid as created_uid, |
|
||||||
dashboard.updated,dashboard.updated_by,UpdatedUSER.uid as updated_uid, |
|
||||||
plugin_id, |
|
||||||
dashboard_provisioning.name as origin_name, |
|
||||||
dashboard_provisioning.external_id as origin_path, |
|
||||||
dashboard_provisioning.check_sum as origin_key, |
|
||||||
dashboard_provisioning.updated as origin_ts, |
|
||||||
dashboard.version, '', dashboard.data |
|
||||||
FROM dashboard |
|
||||||
LEFT OUTER JOIN dashboard_provisioning ON dashboard.id = dashboard_provisioning.dashboard_id |
|
||||||
LEFT OUTER JOIN user AS CreatedUSER ON dashboard.created_by = CreatedUSER.id |
|
||||||
LEFT OUTER JOIN user AS UpdatedUSER ON dashboard.created_by = UpdatedUSER.id |
|
||||||
WHERE dashboard.is_folder = false` |
|
||||||
|
|
||||||
const history = `SELECT |
|
||||||
dashboard.org_id, dashboard.id, |
|
||||||
dashboard.uid, dashboard.folder_uid, |
|
||||||
dashboard_version.created,dashboard_version.created_by,CreatedUSER.uid as created_uid, |
|
||||||
dashboard_version.created,dashboard_version.created_by,CreatedUSER.uid as updated_uid, |
|
||||||
plugin_id, |
|
||||||
dashboard_provisioning.name as origin_name, |
|
||||||
dashboard_provisioning.external_id as origin_path, |
|
||||||
dashboard_provisioning.check_sum as origin_key, |
|
||||||
dashboard_provisioning.updated as origin_ts, |
|
||||||
dashboard_version.version, dashboard_version.message, dashboard_version.data |
|
||||||
FROM dashboard |
|
||||||
LEFT OUTER JOIN dashboard_provisioning ON dashboard.id = dashboard_provisioning.dashboard_id |
|
||||||
LEFT OUTER JOIN dashboard_version ON dashboard.id = dashboard_version.dashboard_id |
|
||||||
LEFT OUTER JOIN user AS CreatedUSER ON dashboard_version.created_by = CreatedUSER.id |
|
||||||
WHERE dashboard.is_folder = false` |
|
||||||
|
|
||||||
func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery) (*rowsWrapper, int, error) { |
|
||||||
if len(query.Labels) > 0 { |
|
||||||
return nil, 0, fmt.Errorf("labels not yet supported") |
|
||||||
// if query.Requirements.Folder != nil {
|
|
||||||
// args = append(args, *query.Requirements.Folder)
|
|
||||||
// sqlcmd = fmt.Sprintf("%s AND dashboard.folder_uid=$%d", sqlcmd, len(args))
|
|
||||||
// }
|
|
||||||
} |
|
||||||
|
|
||||||
var sqlcmd string |
|
||||||
args := []any{query.OrgID} |
|
||||||
|
|
||||||
limit := query.Limit |
|
||||||
if limit < 1 { |
|
||||||
limit = 15 //
|
|
||||||
} |
|
||||||
|
|
||||||
if query.FromHistory { |
|
||||||
sqlcmd = fmt.Sprintf("%s AND dashboard.org_id=$%d\n ", history, len(args)) |
|
||||||
|
|
||||||
if query.UID == "" { |
|
||||||
return nil, 0, fmt.Errorf("history query must have a UID") |
|
||||||
} |
|
||||||
|
|
||||||
args = append(args, query.UID) |
|
||||||
sqlcmd = fmt.Sprintf("%s AND dashboard.uid=$%d", sqlcmd, len(args)) |
|
||||||
|
|
||||||
if query.Version > 0 { |
|
||||||
args = append(args, query.Version) |
|
||||||
sqlcmd = fmt.Sprintf("%s AND dashboard_version.version=$%d", sqlcmd, len(args)) |
|
||||||
} else if query.MinID > 0 { |
|
||||||
args = append(args, query.MinID) |
|
||||||
sqlcmd = fmt.Sprintf("%s AND dashboard_version.version<$%d", sqlcmd, len(args)) |
|
||||||
} |
|
||||||
|
|
||||||
args = append(args, (limit + 2)) // add more so we can include a next token
|
|
||||||
sqlcmd = fmt.Sprintf("%s\n ORDER BY dashboard_version.version desc LIMIT $%d", sqlcmd, len(args)) |
|
||||||
} else { |
|
||||||
sqlcmd = fmt.Sprintf("%s AND dashboard.org_id=$%d\n ", selector, len(args)) |
|
||||||
|
|
||||||
if query.UID != "" { |
|
||||||
args = append(args, query.UID) |
|
||||||
sqlcmd = fmt.Sprintf("%s AND dashboard.uid=$%d", sqlcmd, len(args)) |
|
||||||
} else if query.MinID > 0 { |
|
||||||
args = append(args, query.MinID) |
|
||||||
sqlcmd = fmt.Sprintf("%s AND dashboard.id>=$%d", sqlcmd, len(args)) |
|
||||||
} |
|
||||||
|
|
||||||
args = append(args, (limit + 2)) // add more so we can include a next token
|
|
||||||
sqlcmd = fmt.Sprintf("%s\n ORDER BY dashboard.id asc LIMIT $%d", sqlcmd, len(args)) |
|
||||||
} |
|
||||||
|
|
||||||
fmt.Printf("%s // %v\n", sqlcmd, args) |
|
||||||
|
|
||||||
rows, err := a.doQuery(ctx, sqlcmd, args...) |
|
||||||
if err != nil { |
|
||||||
if rows != nil { |
|
||||||
_ = rows.Close() |
|
||||||
} |
|
||||||
rows = nil |
|
||||||
} |
|
||||||
return rows, limit, err |
|
||||||
} |
|
||||||
|
|
||||||
func (a *dashboardSqlAccess) doQuery(ctx context.Context, query string, args ...any) (*rowsWrapper, error) { |
|
||||||
_, err := identity.GetRequester(ctx) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
rows, err := a.sess.Query(ctx, query, args...) |
|
||||||
return &rowsWrapper{ |
|
||||||
rows: rows, |
|
||||||
a: a, |
|
||||||
// This looks up rules from the permissions on a user
|
|
||||||
canReadDashboard: func(scopes ...string) bool { |
|
||||||
return true // ???
|
|
||||||
}, |
|
||||||
// accesscontrol.Checker(user, dashboards.ActionDashboardsRead),
|
|
||||||
}, err |
|
||||||
} |
|
||||||
|
|
||||||
type rowsWrapper struct { |
|
||||||
a *dashboardSqlAccess |
|
||||||
rows *sql.Rows |
|
||||||
idx int |
|
||||||
total int64 |
|
||||||
|
|
||||||
canReadDashboard func(scopes ...string) bool |
|
||||||
} |
|
||||||
|
|
||||||
func (r *rowsWrapper) Close() error { |
|
||||||
return r.rows.Close() |
|
||||||
} |
|
||||||
|
|
||||||
func (r *rowsWrapper) Next() (*dashboardRow, error) { |
|
||||||
// breaks after first readable value
|
|
||||||
for r.rows.Next() { |
|
||||||
r.idx++ |
|
||||||
d, err := r.a.scanRow(r.rows) |
|
||||||
if d != nil { |
|
||||||
// Access control checker
|
|
||||||
scopes := []string{dashboards.ScopeDashboardsProvider.GetResourceScopeUID(d.Dash.Name)} |
|
||||||
if d.FolderUID != "" { // Copied from searchV2... not sure the logic is right
|
|
||||||
scopes = append(scopes, dashboards.ScopeFoldersProvider.GetResourceScopeUID(d.FolderUID)) |
|
||||||
} |
|
||||||
if !r.canReadDashboard(scopes...) { |
|
||||||
continue |
|
||||||
} |
|
||||||
d.token.bytes = r.total // size before next!
|
|
||||||
r.total += int64(d.Bytes) |
|
||||||
} |
|
||||||
|
|
||||||
// returns the first folder it can
|
|
||||||
return d, err |
|
||||||
} |
|
||||||
return nil, nil |
|
||||||
} |
|
||||||
|
|
||||||
func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) { |
|
||||||
dash := &dashboardsV0.Dashboard{ |
|
||||||
TypeMeta: dashboardsV0.DashboardResourceInfo.TypeMeta(), |
|
||||||
ObjectMeta: metav1.ObjectMeta{Annotations: make(map[string]string)}, |
|
||||||
} |
|
||||||
row := &dashboardRow{Dash: dash} |
|
||||||
|
|
||||||
var dashboard_id int64 |
|
||||||
var orgId int64 |
|
||||||
var folder_uid sql.NullString |
|
||||||
var updated time.Time |
|
||||||
var updatedByID int64 |
|
||||||
var updatedByUID sql.NullString |
|
||||||
|
|
||||||
var created time.Time |
|
||||||
var createdByID int64 |
|
||||||
var createdByUID sql.NullString |
|
||||||
var message sql.NullString |
|
||||||
|
|
||||||
var plugin_id string |
|
||||||
var origin_name sql.NullString |
|
||||||
var origin_path sql.NullString |
|
||||||
var origin_ts sql.NullInt64 |
|
||||||
var origin_hash sql.NullString |
|
||||||
var data []byte // the dashboard JSON
|
|
||||||
var version int64 |
|
||||||
|
|
||||||
err := rows.Scan(&orgId, &dashboard_id, &dash.Name, &folder_uid, |
|
||||||
&created, &createdByID, &createdByUID, |
|
||||||
&updated, &updatedByID, &updatedByUID, |
|
||||||
&plugin_id, |
|
||||||
&origin_name, &origin_path, &origin_hash, &origin_ts, |
|
||||||
&version, &message, &data, |
|
||||||
) |
|
||||||
|
|
||||||
row.token = &continueToken{orgId: orgId, id: dashboard_id} |
|
||||||
if err == nil { |
|
||||||
row.Version = version |
|
||||||
dash.ResourceVersion = fmt.Sprintf("%d", version) |
|
||||||
dash.Namespace = a.namespacer(orgId) |
|
||||||
dash.UID = gapiutil.CalculateClusterWideUID(dash) |
|
||||||
dash.SetCreationTimestamp(metav1.NewTime(created)) |
|
||||||
meta, err := utils.MetaAccessor(dash) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
meta.SetUpdatedTimestamp(&updated) |
|
||||||
if createdByID > 0 { |
|
||||||
meta.SetCreatedBy(identity.NewNamespaceID(identity.NamespaceUser, createdByID).String()) |
|
||||||
} else if createdByID < 0 { |
|
||||||
meta.SetCreatedBy(identity.NewNamespaceID(identity.NamespaceProvisioning, 0).String()) |
|
||||||
} |
|
||||||
if updatedByID > 0 { |
|
||||||
meta.SetCreatedBy(identity.NewNamespaceID(identity.NamespaceUser, updatedByID).String()) |
|
||||||
} else if updatedByID < 0 { |
|
||||||
meta.SetCreatedBy(identity.NewNamespaceID(identity.NamespaceProvisioning, 0).String()) |
|
||||||
} |
|
||||||
if message.Valid && message.String != "" { |
|
||||||
meta.SetMessage(message.String) |
|
||||||
} |
|
||||||
if folder_uid.Valid { |
|
||||||
meta.SetFolder(folder_uid.String) |
|
||||||
row.FolderUID = folder_uid.String |
|
||||||
} |
|
||||||
|
|
||||||
if origin_name.Valid { |
|
||||||
ts := time.Unix(origin_ts.Int64, 0) |
|
||||||
|
|
||||||
resolvedPath := a.provisioning.GetDashboardProvisionerResolvedPath(origin_name.String) |
|
||||||
originPath, err := filepath.Rel( |
|
||||||
resolvedPath, |
|
||||||
origin_path.String, |
|
||||||
) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
|
|
||||||
meta.SetOriginInfo(&utils.ResourceOriginInfo{ |
|
||||||
Name: origin_name.String, |
|
||||||
Path: originPath, |
|
||||||
Hash: origin_hash.String, |
|
||||||
Timestamp: &ts, |
|
||||||
}) |
|
||||||
} else if plugin_id != "" { |
|
||||||
meta.SetOriginInfo(&utils.ResourceOriginInfo{ |
|
||||||
Name: "plugin", |
|
||||||
Path: plugin_id, |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
row.Bytes = len(data) |
|
||||||
if row.Bytes > 0 { |
|
||||||
err = dash.Spec.UnmarshalJSON(data) |
|
||||||
if err != nil { |
|
||||||
return row, err |
|
||||||
} |
|
||||||
dash.Spec.Set("id", dashboard_id) // add it so we can get it from the body later
|
|
||||||
} |
|
||||||
} |
|
||||||
return row, err |
|
||||||
} |
|
||||||
|
|
||||||
// DeleteDashboard implements DashboardAccess.
|
|
||||||
func (a *dashboardSqlAccess) DeleteDashboard(ctx context.Context, orgId int64, uid string) (*dashboardsV0.Dashboard, bool, error) { |
|
||||||
dash, _, err := a.GetDashboard(ctx, orgId, uid) |
|
||||||
if err != nil { |
|
||||||
return nil, false, err |
|
||||||
} |
|
||||||
|
|
||||||
id := dash.Spec.GetNestedInt64("id") |
|
||||||
if id == 0 { |
|
||||||
return nil, false, fmt.Errorf("could not find id in saved body") |
|
||||||
} |
|
||||||
|
|
||||||
err = a.dashStore.DeleteDashboard(ctx, &dashboards.DeleteDashboardCommand{ |
|
||||||
OrgID: orgId, |
|
||||||
ID: id, |
|
||||||
}) |
|
||||||
if err != nil { |
|
||||||
return nil, false, err |
|
||||||
} |
|
||||||
return dash, true, nil |
|
||||||
} |
|
||||||
|
|
||||||
// SaveDashboard implements DashboardAccess.
|
|
||||||
func (a *dashboardSqlAccess) SaveDashboard(ctx context.Context, orgId int64, dash *dashboardsV0.Dashboard) (*dashboardsV0.Dashboard, bool, error) { |
|
||||||
created := false |
|
||||||
user, err := appcontext.User(ctx) |
|
||||||
if err != nil { |
|
||||||
return nil, created, err |
|
||||||
} |
|
||||||
if dash.Name != "" { |
|
||||||
dash.Spec.Set("uid", dash.Name) |
|
||||||
|
|
||||||
// Get the previous version to set the internal ID
|
|
||||||
old, _ := a.dashStore.GetDashboard(ctx, &dashboards.GetDashboardQuery{ |
|
||||||
OrgID: orgId, |
|
||||||
UID: dash.Name, |
|
||||||
}) |
|
||||||
if old != nil { |
|
||||||
dash.Spec.Set("id", old.ID) |
|
||||||
} else { |
|
||||||
dash.Spec.Remove("id") // existing of "id" makes it an update
|
|
||||||
created = true |
|
||||||
} |
|
||||||
} else { |
|
||||||
dash.Spec.Remove("id") |
|
||||||
dash.Spec.Remove("uid") |
|
||||||
} |
|
||||||
|
|
||||||
meta, err := utils.MetaAccessor(dash) |
|
||||||
if err != nil { |
|
||||||
return nil, false, err |
|
||||||
} |
|
||||||
out, err := a.dashStore.SaveDashboard(ctx, dashboards.SaveDashboardCommand{ |
|
||||||
OrgID: orgId, |
|
||||||
Dashboard: simplejson.NewFromAny(dash.Spec.UnstructuredContent()), |
|
||||||
FolderUID: meta.GetFolder(), |
|
||||||
Overwrite: true, // already passed the revisionVersion checks!
|
|
||||||
UserID: user.UserID, |
|
||||||
}) |
|
||||||
if err != nil { |
|
||||||
return nil, false, err |
|
||||||
} |
|
||||||
if out != nil { |
|
||||||
created = (out.Created.Unix() == out.Updated.Unix()) // and now?
|
|
||||||
} |
|
||||||
dash, _, err = a.GetDashboard(ctx, orgId, out.UID) |
|
||||||
return dash, created, err |
|
||||||
} |
|
@ -1,342 +0,0 @@ |
|||||||
package legacy |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"encoding/json" |
|
||||||
"fmt" |
|
||||||
"time" |
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/apimachinery/utils" |
|
||||||
dashboard "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1" |
|
||||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request" |
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resource" |
|
||||||
) |
|
||||||
|
|
||||||
func getDashboardFromEvent(event resource.WriteEvent) (*dashboard.Dashboard, error) { |
|
||||||
obj, ok := event.Object.GetRuntimeObject() |
|
||||||
if ok && obj != nil { |
|
||||||
dash, ok := obj.(*dashboard.Dashboard) |
|
||||||
if ok { |
|
||||||
return dash, nil |
|
||||||
} |
|
||||||
} |
|
||||||
dash := &dashboard.Dashboard{} |
|
||||||
err := json.Unmarshal(event.Value, dash) |
|
||||||
return dash, err |
|
||||||
} |
|
||||||
|
|
||||||
func isDashboardKey(key *resource.ResourceKey, requireName bool) error { |
|
||||||
gr := dashboard.DashboardResourceInfo.GroupResource() |
|
||||||
if key.Group != gr.Group { |
|
||||||
return fmt.Errorf("expecting dashboard group (%s != %s)", key.Group, gr.Group) |
|
||||||
} |
|
||||||
if key.Resource != gr.Resource { |
|
||||||
return fmt.Errorf("expecting dashboard resource (%s != %s)", key.Resource, gr.Resource) |
|
||||||
} |
|
||||||
if requireName && key.Name == "" { |
|
||||||
return fmt.Errorf("expecting dashboard name (uid)") |
|
||||||
} |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
func (a *dashboardSqlAccess) WriteEvent(ctx context.Context, event resource.WriteEvent) (rv int64, err error) { |
|
||||||
info, err := request.ParseNamespace(event.Key.Namespace) |
|
||||||
if err == nil { |
|
||||||
err = isDashboardKey(event.Key, true) |
|
||||||
} |
|
||||||
if err != nil { |
|
||||||
return 0, err |
|
||||||
} |
|
||||||
|
|
||||||
switch event.Type { |
|
||||||
case resource.WatchEvent_DELETED: |
|
||||||
{ |
|
||||||
_, _, err = a.DeleteDashboard(ctx, info.OrgID, event.Key.Name) |
|
||||||
//rv = ???
|
|
||||||
} |
|
||||||
// The difference depends on embedded internal ID
|
|
||||||
case resource.WatchEvent_ADDED, resource.WatchEvent_MODIFIED: |
|
||||||
{ |
|
||||||
dash, err := getDashboardFromEvent(event) |
|
||||||
if err != nil { |
|
||||||
return 0, err |
|
||||||
} |
|
||||||
|
|
||||||
after, _, err := a.SaveDashboard(ctx, info.OrgID, dash) |
|
||||||
if err != nil { |
|
||||||
return 0, err |
|
||||||
} |
|
||||||
if after != nil { |
|
||||||
meta, err := utils.MetaAccessor(after) |
|
||||||
if err != nil { |
|
||||||
return 0, err |
|
||||||
} |
|
||||||
rv, err = meta.GetResourceVersionInt64() |
|
||||||
if err != nil { |
|
||||||
return 0, err |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
default: |
|
||||||
return 0, fmt.Errorf("unsupported event type: %v", event.Type) |
|
||||||
} |
|
||||||
|
|
||||||
// Async notify all subscribers (not HA!!!)
|
|
||||||
if a.subscribers != nil { |
|
||||||
go func() { |
|
||||||
write := &resource.WrittenEvent{ |
|
||||||
WriteEvent: event, |
|
||||||
|
|
||||||
Timestamp: time.Now().UnixMilli(), |
|
||||||
ResourceVersion: rv, |
|
||||||
} |
|
||||||
for _, sub := range a.subscribers { |
|
||||||
sub <- write |
|
||||||
} |
|
||||||
}() |
|
||||||
} |
|
||||||
return rv, err |
|
||||||
} |
|
||||||
|
|
||||||
// Read implements ResourceStoreServer.
|
|
||||||
func (a *dashboardSqlAccess) GetDashboard(ctx context.Context, orgId int64, uid string) (*dashboard.Dashboard, int64, error) { |
|
||||||
rows, _, err := a.getRows(ctx, &DashboardQuery{ |
|
||||||
OrgID: orgId, |
|
||||||
UID: uid, |
|
||||||
Limit: 100, // will only be one!
|
|
||||||
}) |
|
||||||
if err != nil { |
|
||||||
return nil, 0, err |
|
||||||
} |
|
||||||
defer func() { _ = rows.Close() }() |
|
||||||
|
|
||||||
row, err := rows.Next() |
|
||||||
if err != nil || row == nil { |
|
||||||
return nil, 0, err |
|
||||||
} |
|
||||||
return row.Dash, row.Version, nil |
|
||||||
} |
|
||||||
|
|
||||||
// Read implements ResourceStoreServer.
|
|
||||||
func (a *dashboardSqlAccess) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) { |
|
||||||
info, err := request.ParseNamespace(req.Key.Namespace) |
|
||||||
if err == nil { |
|
||||||
err = isDashboardKey(req.Key, true) |
|
||||||
} |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
if req.ResourceVersion > 0 { |
|
||||||
return nil, fmt.Errorf("reading from history not yet supported") |
|
||||||
} |
|
||||||
|
|
||||||
dash, rv, err := a.GetDashboard(ctx, info.OrgID, req.Key.Name) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
|
|
||||||
value, err := json.Marshal(dash) |
|
||||||
return &resource.ReadResponse{ |
|
||||||
ResourceVersion: rv, |
|
||||||
Value: value, |
|
||||||
}, err |
|
||||||
} |
|
||||||
|
|
||||||
// List implements AppendingStore.
|
|
||||||
func (a *dashboardSqlAccess) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { |
|
||||||
opts := req.Options |
|
||||||
info, err := request.ParseNamespace(opts.Key.Namespace) |
|
||||||
if err == nil { |
|
||||||
err = isDashboardKey(opts.Key, false) |
|
||||||
} |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
|
|
||||||
token, err := readContinueToken(req.NextPageToken) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
if token.orgId > 0 && token.orgId != info.OrgID { |
|
||||||
return nil, fmt.Errorf("token and orgID mismatch") |
|
||||||
} |
|
||||||
|
|
||||||
query := &DashboardQuery{ |
|
||||||
OrgID: info.OrgID, |
|
||||||
Limit: int(req.Limit), |
|
||||||
MaxBytes: 2 * 1024 * 1024, // 2MB,
|
|
||||||
MinID: token.id, |
|
||||||
Labels: req.Options.Labels, |
|
||||||
} |
|
||||||
|
|
||||||
rows, limit, err := a.getRows(ctx, query) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
defer func() { _ = rows.Close() }() |
|
||||||
|
|
||||||
totalSize := 0 |
|
||||||
list := &resource.ListResponse{} |
|
||||||
for { |
|
||||||
row, err := rows.Next() |
|
||||||
if err != nil || row == nil { |
|
||||||
return list, err |
|
||||||
} |
|
||||||
|
|
||||||
totalSize += row.Bytes |
|
||||||
if len(list.Items) > 0 && (totalSize > query.MaxBytes || len(list.Items) >= limit) { |
|
||||||
// if query.Requirements.Folder != nil {
|
|
||||||
// row.token.folder = *query.Requirements.Folder
|
|
||||||
// }
|
|
||||||
list.NextPageToken = row.token.String() // will skip this one but start here next time
|
|
||||||
return list, err |
|
||||||
} |
|
||||||
// TODO -- make it smaller and stick the body as an annotation...
|
|
||||||
val, err := json.Marshal(row.Dash) |
|
||||||
if err != nil { |
|
||||||
return list, err |
|
||||||
} |
|
||||||
list.Items = append(list.Items, &resource.ResourceWrapper{ |
|
||||||
ResourceVersion: row.Version, |
|
||||||
Value: val, |
|
||||||
}) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// Watch implements AppendingStore.
|
|
||||||
func (a *dashboardSqlAccess) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) { |
|
||||||
stream := make(chan *resource.WrittenEvent, 10) |
|
||||||
{ |
|
||||||
a.mutex.Lock() |
|
||||||
defer a.mutex.Unlock() |
|
||||||
|
|
||||||
// Add the event stream
|
|
||||||
a.subscribers = append(a.subscribers, stream) |
|
||||||
} |
|
||||||
|
|
||||||
// Wait for context done
|
|
||||||
go func() { |
|
||||||
// Wait till the context is done
|
|
||||||
<-ctx.Done() |
|
||||||
|
|
||||||
// Then remove the subscription
|
|
||||||
a.mutex.Lock() |
|
||||||
defer a.mutex.Unlock() |
|
||||||
|
|
||||||
// Copy all streams without our listener
|
|
||||||
subs := []chan *resource.WrittenEvent{} |
|
||||||
for _, sub := range a.subscribers { |
|
||||||
if sub != stream { |
|
||||||
subs = append(subs, sub) |
|
||||||
} |
|
||||||
} |
|
||||||
a.subscribers = subs |
|
||||||
}() |
|
||||||
return stream, nil |
|
||||||
} |
|
||||||
|
|
||||||
func (a *dashboardSqlAccess) SupportsSignedURLs() bool { |
|
||||||
return false |
|
||||||
} |
|
||||||
|
|
||||||
func (a *dashboardSqlAccess) PutBlob(context.Context, *resource.PutBlobRequest) (*resource.PutBlobResponse, error) { |
|
||||||
return nil, fmt.Errorf("put blob not implemented yet") |
|
||||||
} |
|
||||||
|
|
||||||
func (a *dashboardSqlAccess) GetBlob(ctx context.Context, key *resource.ResourceKey, info *utils.BlobInfo, mustProxy bool) (*resource.GetBlobResponse, error) { |
|
||||||
ns, err := request.ParseNamespace(key.Namespace) |
|
||||||
if err == nil { |
|
||||||
err = isDashboardKey(key, true) |
|
||||||
} |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
dash, _, err := a.GetDashboard(ctx, ns.OrgID, key.Name) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
rsp := &resource.GetBlobResponse{ |
|
||||||
ContentType: "application/json", |
|
||||||
} |
|
||||||
rsp.Value, err = json.Marshal(dash.Spec) |
|
||||||
return rsp, err |
|
||||||
} |
|
||||||
|
|
||||||
func (a *dashboardSqlAccess) History(ctx context.Context, req *resource.HistoryRequest) (*resource.HistoryResponse, error) { |
|
||||||
info, err := request.ParseNamespace(req.Key.Namespace) |
|
||||||
if err == nil { |
|
||||||
err = isDashboardKey(req.Key, false) |
|
||||||
} |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
|
|
||||||
token, err := readContinueToken(req.NextPageToken) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
if token.orgId > 0 && token.orgId != info.OrgID { |
|
||||||
return nil, fmt.Errorf("token and orgID mismatch") |
|
||||||
} |
|
||||||
|
|
||||||
query := &DashboardQuery{ |
|
||||||
OrgID: info.OrgID, |
|
||||||
Limit: int(req.Limit), |
|
||||||
MaxBytes: 2 * 1024 * 1024, // 2MB,
|
|
||||||
MinID: token.id, |
|
||||||
FromHistory: true, |
|
||||||
UID: req.Key.Name, |
|
||||||
} |
|
||||||
|
|
||||||
rows, limit, err := a.getRows(ctx, query) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
defer func() { _ = rows.Close() }() |
|
||||||
|
|
||||||
totalSize := 0 |
|
||||||
list := &resource.HistoryResponse{} |
|
||||||
for { |
|
||||||
row, err := rows.Next() |
|
||||||
if err != nil || row == nil { |
|
||||||
return list, err |
|
||||||
} |
|
||||||
|
|
||||||
totalSize += row.Bytes |
|
||||||
if len(list.Items) > 0 && (totalSize > query.MaxBytes || len(list.Items) >= limit) { |
|
||||||
// if query.Requirements.Folder != nil {
|
|
||||||
// row.token.folder = *query.Requirements.Folder
|
|
||||||
// }
|
|
||||||
row.token.id = row.Version // Use the version as the increment
|
|
||||||
list.NextPageToken = row.token.String() // will skip this one but start here next time
|
|
||||||
return list, err |
|
||||||
} |
|
||||||
|
|
||||||
partial := &metav1.PartialObjectMetadata{ |
|
||||||
ObjectMeta: row.Dash.ObjectMeta, |
|
||||||
} |
|
||||||
partial.UID = "" // it is not useful/helpful/accurate and just confusing now
|
|
||||||
|
|
||||||
val, err := json.Marshal(partial) |
|
||||||
if err != nil { |
|
||||||
return list, err |
|
||||||
} |
|
||||||
full, err := json.Marshal(row.Dash.Spec) |
|
||||||
if err != nil { |
|
||||||
return list, err |
|
||||||
} |
|
||||||
list.Items = append(list.Items, &resource.ResourceMeta{ |
|
||||||
ResourceVersion: row.Version, |
|
||||||
PartialObjectMeta: val, |
|
||||||
Size: int32(len(full)), |
|
||||||
Hash: "??", // hash the full?
|
|
||||||
}) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// Used for efficient provisioning
|
|
||||||
func (a *dashboardSqlAccess) Origin(context.Context, *resource.OriginRequest) (*resource.OriginResponse, error) { |
|
||||||
return nil, fmt.Errorf("not yet (origin)") |
|
||||||
} |
|
@ -1,67 +0,0 @@ |
|||||||
package legacy |
|
||||||
|
|
||||||
import ( |
|
||||||
"fmt" |
|
||||||
"strconv" |
|
||||||
"strings" |
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/util" |
|
||||||
) |
|
||||||
|
|
||||||
type continueToken struct { |
|
||||||
orgId int64 |
|
||||||
id int64 // the internal id (sort by!)
|
|
||||||
folder string // from the query
|
|
||||||
bytes int64 // information, not a query
|
|
||||||
} |
|
||||||
|
|
||||||
func readContinueToken(next string) (continueToken, error) { |
|
||||||
var err error |
|
||||||
token := continueToken{} |
|
||||||
if next == "" { |
|
||||||
return token, nil |
|
||||||
} |
|
||||||
parts := strings.Split(next, "/") |
|
||||||
if len(parts) < 3 { |
|
||||||
return token, fmt.Errorf("invalid continue token (too few parts)") |
|
||||||
} |
|
||||||
sub := strings.Split(parts[0], ":") |
|
||||||
if sub[0] != "org" { |
|
||||||
return token, fmt.Errorf("expected org in first slug") |
|
||||||
} |
|
||||||
token.orgId, err = strconv.ParseInt(sub[1], 10, 64) |
|
||||||
if err != nil { |
|
||||||
return token, fmt.Errorf("error parsing orgid") |
|
||||||
} |
|
||||||
|
|
||||||
sub = strings.Split(parts[1], ":") |
|
||||||
if sub[0] != "start" { |
|
||||||
return token, fmt.Errorf("expected internal ID in second slug") |
|
||||||
} |
|
||||||
token.id, err = strconv.ParseInt(sub[1], 10, 64) |
|
||||||
if err != nil { |
|
||||||
return token, fmt.Errorf("error parsing updated") |
|
||||||
} |
|
||||||
|
|
||||||
sub = strings.Split(parts[2], ":") |
|
||||||
if sub[0] != "folder" { |
|
||||||
return token, fmt.Errorf("expected folder UID in third slug") |
|
||||||
} |
|
||||||
token.folder = sub[1] |
|
||||||
|
|
||||||
// // Check if the folder filter is the same from the previous query
|
|
||||||
// if q.Requirements.Folder == nil {
|
|
||||||
// if token.folder != "" {
|
|
||||||
// return token, fmt.Errorf("invalid token, the folder must match previous query")
|
|
||||||
// }
|
|
||||||
// } else if token.folder != *q.Requirements.Folder {
|
|
||||||
// return token, fmt.Errorf("invalid token, the folder must match previous query")
|
|
||||||
// }
|
|
||||||
|
|
||||||
return token, err |
|
||||||
} |
|
||||||
|
|
||||||
func (r *continueToken) String() string { |
|
||||||
return fmt.Sprintf("org:%d/start:%d/folder:%s/%s", |
|
||||||
r.orgId, r.id, r.folder, util.ByteCountSI(r.bytes)) |
|
||||||
} |
|
@ -1,34 +0,0 @@ |
|||||||
package legacy |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
|
|
||||||
dashboardsV0 "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1" |
|
||||||
"github.com/grafana/grafana/pkg/storage/unified/resource" |
|
||||||
) |
|
||||||
|
|
||||||
// This does not check if you have permissions!
|
|
||||||
|
|
||||||
type DashboardQuery struct { |
|
||||||
OrgID int64 |
|
||||||
UID string // to select a single dashboard
|
|
||||||
Limit int |
|
||||||
MaxBytes int |
|
||||||
MinID int64 // from continue token
|
|
||||||
|
|
||||||
FromHistory bool |
|
||||||
Version int64 |
|
||||||
|
|
||||||
// The label requirements
|
|
||||||
Labels []*resource.Requirement |
|
||||||
} |
|
||||||
|
|
||||||
type DashboardAccess interface { |
|
||||||
resource.StorageBackend |
|
||||||
resource.BlobStore |
|
||||||
resource.ResourceIndexServer |
|
||||||
|
|
||||||
GetDashboard(ctx context.Context, orgId int64, uid string) (*dashboardsV0.Dashboard, int64, error) |
|
||||||
SaveDashboard(ctx context.Context, orgId int64, dash *dashboardsV0.Dashboard) (*dashboardsV0.Dashboard, bool, error) |
|
||||||
DeleteDashboard(ctx context.Context, orgId int64, uid string) (*dashboardsV0.Dashboard, bool, error) |
|
||||||
} |
|
Loading…
Reference in new issue