Storage: include SQL implementation (#58018)

pull/58279/head
Ryan McKinley 3 years ago committed by GitHub
parent 978f1119d7
commit eb1cc80941
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      pkg/server/wire.go
  2. 65
      pkg/services/sqlstore/migrations/object_store_mig.go
  3. 1
      pkg/services/sqlstore/migrator/column.go
  4. 6
      pkg/services/sqlstore/migrator/mysql_dialect.go
  5. 1
      pkg/services/sqlstore/sqlstore.go
  6. 78
      pkg/services/store/object/sqlstash/querybuilder.go
  7. 731
      pkg/services/store/object/sqlstash/sql_storage_server.go
  8. 96
      pkg/services/store/object/sqlstash/summary_handler.go
  9. 27
      pkg/services/store/object/sqlstash/utils.go

@ -126,8 +126,8 @@ import (
"github.com/grafana/grafana/pkg/services/star/starimpl"
"github.com/grafana/grafana/pkg/services/store"
"github.com/grafana/grafana/pkg/services/store/kind"
objectdummyserver "github.com/grafana/grafana/pkg/services/store/object/dummy"
"github.com/grafana/grafana/pkg/services/store/object/httpobjectstore"
"github.com/grafana/grafana/pkg/services/store/object/sqlstash"
"github.com/grafana/grafana/pkg/services/store/resolver"
"github.com/grafana/grafana/pkg/services/store/sanitizer"
"github.com/grafana/grafana/pkg/services/tag"
@ -360,8 +360,8 @@ var wireBasicSet = wire.NewSet(
grpcserver.ProvideHealthService,
grpcserver.ProvideReflectionService,
interceptors.ProvideAuthenticator,
kind.ProvideService, // The registry known kinds
objectdummyserver.ProvideDummyObjectServer,
kind.ProvideService, // The registry of known kinds
sqlstash.ProvideSQLObjectServer,
resolver.ProvideObjectReferenceResolver,
httpobjectstore.ProvideHTTPObjectStore,
teamimpl.ProvideService,

@ -8,36 +8,47 @@ import (
"github.com/grafana/grafana/pkg/setting"
)
// getKeyColumn returns the standard column definition used for object
// store key/path columns: a non-null varchar(1024), optionally flagged
// as the table's primary key.
func getKeyColumn(name string, isPrimaryKey bool) *migrator.Column {
	return &migrator.Column{
		Name:         name,
		Type:         migrator.DB_NVarchar,
		Length:       1024,
		Nullable:     false,
		IsPrimaryKey: isPrimaryKey,
		IsLatin:      true, // only used in MySQL
	}
}
func addObjectStorageMigrations(mg *migrator.Migrator) {
tables := []migrator.Table{}
tables = append(tables, migrator.Table{
Name: "object",
Columns: []*migrator.Column{
// Object key contains everything required to make it unique across all instances
// orgId+scope+kind+uid
{Name: "key", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false, IsPrimaryKey: true},
// Object path contains everything required to make it unique across all instances
// orgId + scope + kind + uid
getKeyColumn("path", true),
// This is an optimization for listing everything at the same level in the object store
{Name: "parent_folder_key", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false},
getKeyColumn("parent_folder_path", false),
// The object type
{Name: "kind", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
// The raw object body (any byte array)
{Name: "body", Type: migrator.DB_Blob, Nullable: false},
{Name: "body", Type: migrator.DB_LongBlob, Nullable: false},
{Name: "size", Type: migrator.DB_BigInt, Nullable: false},
{Name: "etag", Type: migrator.DB_NVarchar, Length: 32, Nullable: false}, // md5(body)
{Name: "etag", Type: migrator.DB_NVarchar, Length: 32, Nullable: false, IsLatin: true}, // md5(body)
{Name: "version", Type: migrator.DB_NVarchar, Length: 128, Nullable: false},
// Who changed what when -- We should avoid JOINs with other tables in the database
{Name: "updated", Type: migrator.DB_DateTime, Nullable: false},
{Name: "created", Type: migrator.DB_DateTime, Nullable: false},
{Name: "updated_at", Type: migrator.DB_BigInt, Nullable: false},
{Name: "created_at", Type: migrator.DB_BigInt, Nullable: false},
{Name: "updated_by", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "created_by", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
// For objects that are synchronized from an external source (ie provisioning or git)
{Name: "sync_src", Type: migrator.DB_Text, Nullable: true},
{Name: "sync_time", Type: migrator.DB_DateTime, Nullable: true},
{Name: "sync_time", Type: migrator.DB_BigInt, Nullable: true},
// Summary data (always extracted from the `body` column)
{Name: "name", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
@ -46,22 +57,21 @@ func addObjectStorageMigrations(mg *migrator.Migrator) {
{Name: "fields", Type: migrator.DB_Text, Nullable: true}, // JSON object
{Name: "errors", Type: migrator.DB_Text, Nullable: true}, // JSON object
},
PrimaryKeys: []string{"key"},
Indices: []*migrator.Index{
{Cols: []string{"parent_folder_key"}}, // list in folder
{Cols: []string{"kind"}}, // filter by type
{Cols: []string{"parent_folder_path"}}, // list in folder
{Cols: []string{"kind"}}, // filter by type
},
})
tables = append(tables, migrator.Table{
Name: "object_labels",
Columns: []*migrator.Column{
{Name: "key", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false},
getKeyColumn("path", false),
{Name: "label", Type: migrator.DB_NVarchar, Length: 191, Nullable: false},
{Name: "value", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false},
},
Indices: []*migrator.Index{
{Cols: []string{"key", "label"}, Type: migrator.UniqueIndex},
{Cols: []string{"path", "label"}, Type: migrator.UniqueIndex},
},
})
@ -69,7 +79,7 @@ func addObjectStorageMigrations(mg *migrator.Migrator) {
Name: "object_ref",
Columns: []*migrator.Column{
// Source:
{Name: "key", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false},
getKeyColumn("path", false),
// Address (defined in the body, not resolved, may be invalid and change)
{Name: "kind", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
@ -78,12 +88,12 @@ func addObjectStorageMigrations(mg *migrator.Migrator) {
// Runtime calcs (will depend on the system state)
{Name: "resolved_ok", Type: migrator.DB_Bool, Nullable: false},
{Name: "resolved_to", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false},
getKeyColumn("resolved_to", false),
{Name: "resolved_warning", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
{Name: "resolved_time", Type: migrator.DB_DateTime, Nullable: false}, // resolution cache timestamp
},
Indices: []*migrator.Index{
{Cols: []string{"key"}, Type: migrator.IndexType},
{Cols: []string{"path"}, Type: migrator.IndexType},
{Cols: []string{"kind"}, Type: migrator.IndexType},
{Cols: []string{"resolved_to"}, Type: migrator.IndexType},
},
@ -92,23 +102,23 @@ func addObjectStorageMigrations(mg *migrator.Migrator) {
tables = append(tables, migrator.Table{
Name: "object_history",
Columns: []*migrator.Column{
{Name: "key", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false},
getKeyColumn("path", false),
{Name: "version", Type: migrator.DB_NVarchar, Length: 128, Nullable: false},
// Raw bytes
{Name: "body", Type: migrator.DB_Blob, Nullable: false},
{Name: "body", Type: migrator.DB_LongBlob, Nullable: false},
{Name: "size", Type: migrator.DB_BigInt, Nullable: false},
{Name: "etag", Type: migrator.DB_NVarchar, Length: 32, Nullable: false}, // md5(body)
{Name: "etag", Type: migrator.DB_NVarchar, Length: 32, Nullable: false, IsLatin: true}, // md5(body)
// Who changed what when
{Name: "updated", Type: migrator.DB_DateTime, Nullable: false},
{Name: "updated_at", Type: migrator.DB_BigInt, Nullable: false},
{Name: "updated_by", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
// Commit message
{Name: "message", Type: migrator.DB_Text, Nullable: false}, // defaults to empty string
},
Indices: []*migrator.Index{
{Cols: []string{"key", "version"}, Type: migrator.UniqueIndex},
{Cols: []string{"path", "version"}, Type: migrator.UniqueIndex},
{Cols: []string{"updated_by"}, Type: migrator.IndexType},
},
})
@ -124,19 +134,16 @@ func addObjectStorageMigrations(mg *migrator.Migrator) {
// Migration cleanups: given that this is a complex setup
// that requires a lot of testing before we are ready to push out of dev
// this script lets us easy wipe previous changes and initialize clean tables
suffix := " (v0)" // change this when we want to wipe and reset the object tables
suffix := " (v2)" // change this when we want to wipe and reset the object tables
mg.AddMigration("ObjectStore init: cleanup"+suffix, migrator.NewRawSQLMigration(strings.TrimSpace(`
DELETE FROM migration_log WHERE migration_id LIKE 'ObjectStore init%';
DROP table if exists "object";
DROP table if exists "object_ref";
DROP table if exists "object_history";
DROP table if exists "object_labels";
DROP table if exists "object_alias";
DROP table if exists "object_access";
`)))
// Initialize all tables
for t := range tables {
mg.AddMigration("ObjectStore init: drop "+tables[t].Name+suffix, migrator.NewRawSQLMigration(
fmt.Sprintf("DROP TABLE IF EXISTS %s", tables[t].Name),
))
mg.AddMigration("ObjectStore init: table "+tables[t].Name+suffix, migrator.NewAddTableMigration(tables[t]))
for i := range tables[t].Indices {
mg.AddMigration(fmt.Sprintf("ObjectStore init: index %s[%d]"+suffix, tables[t].Name, i), migrator.NewAddIndexMigration(tables[t], tables[t].Indices[i]))

@ -11,6 +11,7 @@ type Column struct {
Nullable bool
IsPrimaryKey bool
IsAutoIncrement bool
IsLatin bool
Default string
}

@ -91,7 +91,11 @@ func (db *MySQLDialect) SQLType(c *Column) string {
switch c.Type {
case DB_Char, DB_Varchar, DB_NVarchar, DB_TinyText, DB_Text, DB_MediumText, DB_LongText:
res += " CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
if c.IsLatin {
res += " CHARACTER SET latin1 COLLATE latin1_bin"
} else {
res += " CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
}
}
return res

@ -483,6 +483,7 @@ var featuresEnabledDuringTests = []string{
featuremgmt.FlagDashboardPreviews,
featuremgmt.FlagDashboardComments,
featuremgmt.FlagPanelTitleSearch,
featuremgmt.FlagObjectStore,
}
// InitTestDBWithMigration initializes the test DB given custom migrations.

@ -0,0 +1,78 @@
package sqlstash
import "strings"
// selectQuery incrementally builds a parameterized SQL SELECT statement.
// Callers add constraints with the addWhere* helpers, then call toQuery
// to render the final SQL text and its bound argument list.
type selectQuery struct {
	fields   []string      // column expressions for the SELECT clause
	from     string        // table name for the FROM clause (e.g. "object")
	limit    int           // max rows; <1 means "use the default" (set in toQuery)
	oneExtra bool          // fetch limit+1 rows so a next page can be detected
	where    []string      // WHERE fragments, AND-ed together
	args     []interface{} // bound values matching the '?' placeholders
}

// addWhere appends an exact-match (field=?) constraint.
func (q *selectQuery) addWhere(f string, val string) {
	q.args = append(q.args, val)
	q.where = append(q.where, f+"=?")
}

// addWhereIn appends an IN (...) constraint. A single value degrades to
// a plain equality check and an empty list adds nothing.
func (q *selectQuery) addWhereIn(f string, vals []string) {
	switch n := len(vals); {
	case n == 1:
		q.addWhere(f, vals[0])
	case n > 1:
		var sb strings.Builder
		sb.WriteString(f)
		sb.WriteString(" IN (")
		for i, v := range vals {
			if i > 0 {
				sb.WriteString(",")
			}
			sb.WriteString("?")
			q.args = append(q.args, v)
		}
		sb.WriteString(") ")
		q.where = append(q.where, sb.String())
	}
}

// addWherePrefix appends a LIKE 'v%' prefix constraint.
func (q *selectQuery) addWherePrefix(f string, v string) {
	q.args = append(q.args, v+"%")
	q.where = append(q.where, f+" LIKE ?")
}

// toQuery renders the accumulated statement, returning the SQL text and
// the argument list. When a limit applies, q.limit is normalized to the
// default (20) as a side effect so callers can compare result counts
// against it.
func (q *selectQuery) toQuery() (string, []interface{}) {
	var sb strings.Builder
	sb.WriteString("SELECT ")
	sb.WriteString(strings.Join(q.fields, ","))
	sb.WriteString(" FROM ")
	sb.WriteString(q.from)

	if len(q.where) > 0 {
		sb.WriteString(" WHERE ")
		sb.WriteString(strings.Join(q.where, " AND "))
	}

	args := q.args
	if q.limit > 0 || q.oneExtra {
		if q.limit < 1 {
			q.limit = 20 // normalized default, visible to the caller
		}
		limit := q.limit
		if q.oneExtra {
			limit++ // one extra row signals that a next page exists
		}
		sb.WriteString(" LIMIT ?")
		args = append(args, limit)
	}
	return sb.String(), args
}

@ -0,0 +1,731 @@
package sqlstash
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/grpcserver"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/services/store"
"github.com/grafana/grafana/pkg/services/store/kind"
"github.com/grafana/grafana/pkg/services/store/kind/folder"
"github.com/grafana/grafana/pkg/services/store/object"
"github.com/grafana/grafana/pkg/services/store/resolver"
"github.com/grafana/grafana/pkg/services/store/router"
"github.com/grafana/grafana/pkg/setting"
)
// ProvideSQLObjectServer builds the SQL-backed object store server,
// registers it with the shared gRPC server, and returns it for wire/DI
// consumers.
func ProvideSQLObjectServer(db db.DB, cfg *setting.Cfg, grpcServerProvider grpcserver.Provider, kinds kind.KindRegistry, resolver resolver.ObjectReferenceResolver) object.ObjectStoreServer {
	objectServer := &sqlObjectServer{
		sess:     db.GetSqlxSession(),
		log:      log.New("sql-object-server"),
		kinds:    kinds,
		resolver: resolver,
		router:   router.NewObjectStoreRouter(kinds),
	}
	object.RegisterObjectStoreServer(grpcServerProvider.GetServer(), objectServer)
	return objectServer
}
// sqlObjectServer implements object.ObjectStoreServer on top of a SQL
// database accessed through the sqlstore session API.
type sqlObjectServer struct {
	log      log.Logger
	sess     *session.SessionDB
	kinds    kind.KindRegistry                // registry of known kinds (provides summary builders)
	resolver resolver.ObjectReferenceResolver // resolves references found in object bodies
	router   router.ObjectStoreRouter         // maps GRN <-> storage key/path
}
// getReadSelect builds the "SELECT ... FROM object WHERE " prefix for a
// read, including the body/summary columns only when the request asks
// for them. The column order here must match the scan-target order in
// rowToReadObjectResponse.
func getReadSelect(r *object.ReadObjectRequest) string {
	cols := make([]string, 0, 17)
	cols = append(cols,
		"path", "kind", "version",
		"size", "etag", "errors", // errors are always returned
		"created_at", "created_by",
		"updated_at", "updated_by",
		"sync_src", "sync_time")

	if r.WithBody {
		cols = append(cols, "body")
	}
	if r.WithSummary {
		cols = append(cols, "name", "description", "labels", "fields")
	}
	return "SELECT " + strings.Join(cols, ",") + " FROM object WHERE "
}
// rowToReadObjectResponse scans a single row produced by getReadSelect
// into a ReadObjectResponse. The scan-target list must stay in sync with
// the column order built in getReadSelect (body/summary are conditional).
func (s *sqlObjectServer) rowToReadObjectResponse(ctx context.Context, rows *sql.Rows, r *object.ReadObjectRequest) (*object.ReadObjectResponse, error) {
	path := "" // string (extract UID?)
	var syncSrc sql.NullString
	// NOTE(review): sync_time is migrated to a bigint column; confirm all
	// drivers scan that into sql.NullTime correctly
	var syncTime sql.NullTime
	raw := &object.RawObject{
		GRN: &object.GRN{},
	}
	summaryjson := &summarySupport{}
	args := []interface{}{
		&path, &raw.GRN.Kind, &raw.Version,
		&raw.Size, &raw.ETag, &summaryjson.errors,
		&raw.Created, &raw.CreatedBy,
		&raw.Updated, &raw.UpdatedBy,
		&syncSrc, &syncTime,
	}
	if r.WithBody {
		args = append(args, &raw.Body)
	}
	if r.WithSummary {
		args = append(args, &summaryjson.name, &summaryjson.description, &summaryjson.labels, &summaryjson.fields)
	}

	err := rows.Scan(args...)
	if err != nil {
		return nil, err
	}

	// Only attach sync info when at least one of the columns was non-null
	if syncSrc.Valid || syncTime.Valid {
		raw.Sync = &object.RawObjectSyncInfo{
			Source: syncSrc.String,
			Time:   syncTime.Time.UnixMilli(),
		}
	}

	// Get the GRN from key. TODO? save each part as a column?
	info, _ := s.router.RouteFromKey(ctx, path)
	if info.GRN != nil {
		raw.GRN = info.GRN
	}

	rsp := &object.ReadObjectResponse{
		Object: raw,
	}

	// Summary JSON is returned either when explicitly requested or when
	// the stored row carries extraction errors the caller should see
	if r.WithSummary || summaryjson.errors != nil {
		summary, err := summaryjson.toObjectSummary()
		if err != nil {
			return nil, err
		}
		js, err := json.Marshal(summary)
		if err != nil {
			return nil, err
		}
		rsp.SummaryJson = js
	}
	return rsp, nil
}
// getObjectKey validates the request GRN against the calling user and
// resolves it to a storage route (key/path).
//
// NOTE: when the GRN's TenantId is zero it is filled in from the user's
// org, mutating the caller's GRN in place.
func (s *sqlObjectServer) getObjectKey(ctx context.Context, grn *object.GRN) (router.ResourceRouteInfo, error) {
	if grn == nil {
		return router.ResourceRouteInfo{}, fmt.Errorf("missing grn")
	}
	user := store.UserFromContext(ctx)
	if user == nil {
		return router.ResourceRouteInfo{}, fmt.Errorf("can not find user in context")
	}
	if user.OrgID != grn.TenantId {
		if grn.TenantId > 0 {
			// an explicit tenant was requested that is not the caller's org
			return router.ResourceRouteInfo{}, fmt.Errorf("invalid user (wrong tenant id)")
		}
		grn.TenantId = user.OrgID
	}
	return s.router.Route(ctx, grn)
}
// Read returns a single object by GRN. A request carrying an explicit
// Version is served from the history table instead of the main table.
// A missing object yields an empty response, not an error.
func (s *sqlObjectServer) Read(ctx context.Context, r *object.ReadObjectRequest) (*object.ReadObjectResponse, error) {
	if r.Version != "" {
		return s.readFromHistory(ctx, r)
	}
	route, err := s.getObjectKey(ctx, r.GRN)
	if err != nil {
		return nil, err
	}

	args := []interface{}{route.Key}
	where := "path=?"

	rows, err := s.sess.Query(ctx, getReadSelect(r)+where, args...)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	// Not found: empty response rather than an error
	if !rows.Next() {
		return &object.ReadObjectResponse{}, nil
	}
	return s.rowToReadObjectResponse(ctx, rows, r)
}
// readFromHistory serves a versioned read from the object_history table.
// History rows store only the raw body, so the summary (when requested)
// is rebuilt on the fly by the kind's summary builder.
func (s *sqlObjectServer) readFromHistory(ctx context.Context, r *object.ReadObjectRequest) (*object.ReadObjectResponse, error) {
	route, err := s.getObjectKey(ctx, r.GRN)
	if err != nil {
		return nil, err
	}

	fields := []string{
		"body", "size", "etag",
		"updated_at", "updated_by",
	}

	rows, err := s.sess.Query(ctx,
		"SELECT "+strings.Join(fields, ",")+
			" FROM object_history WHERE path=? AND version=?", route.Key, r.Version)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	// Version or key not found
	if !rows.Next() {
		return &object.ReadObjectResponse{}, nil
	}

	raw := &object.RawObject{
		GRN: r.GRN,
	}
	rsp := &object.ReadObjectResponse{
		Object: raw,
	}
	err = rows.Scan(&raw.Body, &raw.Size, &raw.ETag, &raw.Updated, &raw.UpdatedBy)
	if err != nil {
		return nil, err
	}

	// For versioned files, the created+updated are the same
	raw.Created = raw.Updated
	raw.CreatedBy = raw.UpdatedBy
	raw.Version = r.Version // from the query

	// Dynamically create the summary
	if r.WithSummary {
		builder := s.kinds.GetSummaryBuilder(r.GRN.Kind)
		if builder != nil {
			val, out, err := builder(ctx, r.GRN.UID, raw.Body)
			// NOTE(review): a builder error is silently ignored here
			// (best-effort summary) -- confirm this is intended
			if err == nil {
				raw.Body = out // cleaned up
				rsp.SummaryJson, err = json.Marshal(val)
				if err != nil {
					return nil, err
				}
			}
		}
	}

	// Clear the body if not requested
	if !r.WithBody {
		rsp.Object.Body = nil
	}
	return rsp, err
}
// BatchRead reads several objects with a single query. All requests in
// the batch must ask for the same projection (body/summary), and
// versioned reads are not yet supported. Results are not guaranteed to
// be in request order.
func (s *sqlObjectServer) BatchRead(ctx context.Context, b *object.BatchReadObjectRequest) (*object.BatchReadObjectResponse, error) {
	if len(b.Batch) < 1 {
		return nil, fmt.Errorf("missing queries") // fixed typo: was "querires"
	}

	first := b.Batch[0]
	args := []interface{}{}
	constraints := []string{}

	for _, r := range b.Batch {
		if r.WithBody != first.WithBody || r.WithSummary != first.WithSummary {
			return nil, fmt.Errorf("requests must want the same things")
		}

		route, err := s.getObjectKey(ctx, r.GRN)
		if err != nil {
			return nil, err
		}
		args = append(args, route.Key)

		if r.Version != "" {
			return nil, fmt.Errorf("version not supported for batch read (yet?)")
		}
		constraints = append(constraints, "path=?")
	}

	// All requests share the same projection, so the first one drives the SELECT
	query := getReadSelect(first) + strings.Join(constraints, " OR ")
	rows, err := s.sess.Query(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	// TODO? make sure the results are in order?
	rsp := &object.BatchReadObjectResponse{}
	for rows.Next() {
		r, err := s.rowToReadObjectResponse(ctx, rows, first)
		if err != nil {
			return nil, err
		}
		rsp.Results = append(rsp.Results, r)
	}
	return rsp, nil
}
// Write creates or updates an object inside a single transaction:
// it snapshots the previous row for optimistic locking, appends a
// history row, rewrites the label/reference side tables, and finally
// inserts or updates the main object row. Unchanged bodies (same etag)
// short-circuit with status UNCHANGED.
func (s *sqlObjectServer) Write(ctx context.Context, r *object.WriteObjectRequest) (*object.WriteObjectResponse, error) {
	route, err := s.getObjectKey(ctx, r.GRN)
	if err != nil {
		return nil, err
	}
	grn := route.GRN
	if grn == nil {
		return nil, fmt.Errorf("invalid grn")
	}

	modifier := store.UserFromContext(ctx)
	if modifier == nil {
		return nil, fmt.Errorf("can not find user in context")
	}

	// Build the summary and the (possibly cleaned-up) body before opening
	// the transaction
	summary, body, err := s.prepare(ctx, r)
	if err != nil {
		return nil, err
	}

	etag := createContentsHash(body)
	path := route.Key
	rsp := &object.WriteObjectResponse{
		GRN:    grn,
		Status: object.WriteObjectResponse_CREATED, // Will be changed if not true
	}

	// Make sure all parent folders exist
	if grn.Scope == models.ObjectStoreScopeDrive {
		err = s.ensureFolders(ctx, grn)
		if err != nil {
			return nil, err
		}
	}

	err = s.sess.WithTransaction(ctx, func(tx *session.SessionTx) error {
		isUpdate := false
		versionInfo, err := s.selectForUpdate(ctx, tx, path)
		if err != nil {
			return err
		}

		// Same object: nothing to write
		if versionInfo.ETag == etag {
			rsp.Object = versionInfo
			rsp.Status = object.WriteObjectResponse_UNCHANGED
			return nil
		}

		// Optimistic locking
		if r.PreviousVersion != "" {
			if r.PreviousVersion != versionInfo.Version {
				return fmt.Errorf("optimistic lock failed")
			}
		}

		// Set the comment on this write
		timestamp := time.Now().UnixMilli()
		versionInfo.Comment = r.Comment
		if versionInfo.Version == "" {
			versionInfo.Version = "1"
		} else {
			// Increment the version; non-numeric versions restart from the
			// current timestamp so the sequence stays monotonic
			i, _ := strconv.ParseInt(versionInfo.Version, 0, 64)
			if i < 1 {
				i = timestamp
			}
			versionInfo.Version = fmt.Sprintf("%d", i+1)
			isUpdate = true
		}

		if isUpdate {
			// Clear the labels+refs (they are re-inserted below)
			if _, err := tx.Exec(ctx, "DELETE FROM object_labels WHERE path=?", path); err != nil {
				return err
			}
			if _, err := tx.Exec(ctx, "DELETE FROM object_ref WHERE path=?", path); err != nil {
				return err
			}
		}

		// 1. Add the `object_history` values
		versionInfo.Size = int64(len(body))
		versionInfo.ETag = etag
		versionInfo.Updated = timestamp
		versionInfo.UpdatedBy = store.GetUserIDString(modifier)
		_, err = tx.Exec(ctx, `INSERT INTO object_history (`+
			"path, version, message, "+
			"size, body, etag, "+
			"updated_at, updated_by) "+
			"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
			path, versionInfo.Version, versionInfo.Comment,
			versionInfo.Size, body, versionInfo.ETag,
			timestamp, versionInfo.UpdatedBy,
		)
		if err != nil {
			return err
		}

		// 2. Add the labels rows
		for k, v := range summary.model.Labels {
			_, err = tx.Exec(ctx,
				`INSERT INTO object_labels `+
					"(path, label, value) "+
					`VALUES (?, ?, ?)`,
				path, k, v,
			)
			if err != nil {
				return err
			}
		}

		// 3. Add the references rows (resolved against current system state)
		for _, ref := range summary.model.References {
			resolved, err := s.resolver.Resolve(ctx, ref)
			if err != nil {
				return err
			}
			_, err = tx.Exec(ctx, `INSERT INTO object_ref (`+
				"path, kind, type, uid, "+
				"resolved_ok, resolved_to, resolved_warning, resolved_time) "+
				`VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
				path, ref.Kind, ref.Type, ref.UID,
				resolved.OK, resolved.Key, resolved.Warning, resolved.Timestamp,
			)
			if err != nil {
				return err
			}
		}

		// 5. Add/update the main `object` table
		// NOTE(review): step "4" is missing from this numbering
		rsp.Object = versionInfo
		if isUpdate {
			rsp.Status = object.WriteObjectResponse_UPDATED
			_, err = tx.Exec(ctx, "UPDATE object SET "+
				"body=?, size=?, etag=?, version=?, "+
				"updated_at=?, updated_by=?,"+
				"name=?, description=?,"+
				"labels=?, fields=?, errors=? "+
				"WHERE path=?",
				body, versionInfo.Size, etag, versionInfo.Version,
				timestamp, versionInfo.UpdatedBy,
				summary.model.Name, summary.model.Description,
				summary.labels, summary.fields, summary.errors,
				path,
			)
			return err
		}

		// Insert the new row
		_, err = tx.Exec(ctx, "INSERT INTO object ("+
			"path, parent_folder_path, kind, size, body, etag, version,"+
			"updated_at, updated_by, created_at, created_by,"+
			"name, description,"+
			"labels, fields, errors) "+
			"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
			path, getParentFolderPath(grn.Kind, path), grn.Kind, versionInfo.Size, body, etag, versionInfo.Version,
			timestamp, versionInfo.UpdatedBy, timestamp, versionInfo.UpdatedBy, // created + updated are the same
			summary.model.Name, summary.model.Description,
			summary.labels, summary.fields, summary.errors,
		)
		return err
	})

	rsp.SummaryJson = summary.marshaled
	if err != nil {
		rsp.Status = object.WriteObjectResponse_ERROR
	}
	return rsp, err
}
// selectForUpdate reads the current version info (etag, version,
// updated_at, size) for a path inside the given transaction. A missing
// row returns a zero-valued ObjectVersionInfo, which Write treats as
// "create".
//
// TODO: MySQL/PostgreSQL can lock the row with a trailing " FOR UPDATE".
func (s *sqlObjectServer) selectForUpdate(ctx context.Context, tx *session.SessionTx, path string) (*object.ObjectVersionInfo, error) {
	q := "SELECT etag,version,updated_at,size FROM object WHERE path=?"

	rows, err := tx.Query(ctx, q, path)
	if err != nil {
		return nil, err
	}

	current := &object.ObjectVersionInfo{}
	if rows.Next() {
		err = rows.Scan(&current.ETag, &current.Version, &current.Updated, &current.Size)
	}
	// Always close, even when Scan failed (the original leaked rows on
	// the error path); a close error is only surfaced when Scan succeeded
	if cerr := rows.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		return nil, err
	}
	return current, nil
}
// prepare runs the kind-specific summary builder over the request body,
// returning the flattened summary columns plus the (possibly cleaned-up)
// body bytes. Kinds without a registered builder are rejected.
func (s *sqlObjectServer) prepare(ctx context.Context, r *object.WriteObjectRequest) (*summarySupport, []byte, error) {
	builder := s.kinds.GetSummaryBuilder(r.GRN.Kind)
	if builder == nil {
		return nil, nil, fmt.Errorf("unsupported kind")
	}

	summary, body, err := builder(ctx, r.GRN.UID, r.Body)
	if err != nil {
		return nil, nil, err
	}

	summaryjson, err := newSummarySupport(summary)
	if err != nil {
		return nil, nil, err
	}
	return summaryjson, body, nil
}
// Delete removes an object and its side-table rows (history, labels,
// refs) in one transaction. OK is true when the main row existed.
func (s *sqlObjectServer) Delete(ctx context.Context, r *object.DeleteObjectRequest) (*object.DeleteObjectResponse, error) {
	route, err := s.getObjectKey(ctx, r.GRN)
	if err != nil {
		return nil, err
	}
	path := route.Key

	rsp := &object.DeleteObjectResponse{}
	err = s.sess.WithTransaction(ctx, func(tx *session.SessionTx) error {
		results, err := tx.Exec(ctx, "DELETE FROM object WHERE path=?", path)
		if err != nil {
			return err
		}
		rows, err := results.RowsAffected()
		if err != nil {
			return err
		}
		if rows > 0 {
			rsp.OK = true
		}

		// TODO: keep history? would need current version bump, and the "write" would have to get from history
		// Best-effort cleanup of side tables; errors deliberately ignored
		_, _ = tx.Exec(ctx, "DELETE FROM object_history WHERE path=?", path)
		_, _ = tx.Exec(ctx, "DELETE FROM object_labels WHERE path=?", path)
		_, _ = tx.Exec(ctx, "DELETE FROM object_ref WHERE path=?", path)
		return nil
	})
	return rsp, err
}
// History lists up to 100 versions of an object, newest first, from the
// object_history table. Paging via NextPageToken is not implemented yet.
func (s *sqlObjectServer) History(ctx context.Context, r *object.ObjectHistoryRequest) (*object.ObjectHistoryResponse, error) {
	route, err := s.getObjectKey(ctx, r.GRN)
	if err != nil {
		return nil, err
	}
	path := route.Key

	page := ""
	args := []interface{}{path}
	if r.NextPageToken != "" {
		// args = append(args, r.NextPageToken) // TODO, need to get time from the version
		// page = "AND updated <= ?"
		return nil, fmt.Errorf("next page not supported yet")
	}

	query := "SELECT version,size,etag,updated_at,updated_by,message \n" +
		" FROM object_history \n" +
		" WHERE path=? " + page + "\n" +
		" ORDER BY updated_at DESC LIMIT 100"

	rows, err := s.sess.Query(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	rsp := &object.ObjectHistoryResponse{
		GRN: route.GRN,
	}
	for rows.Next() {
		v := &object.ObjectVersionInfo{}
		err := rows.Scan(&v.Version, &v.Size, &v.ETag, &v.Updated, &v.UpdatedBy, &v.Comment)
		if err != nil {
			return nil, err
		}
		rsp.Versions = append(rsp.Versions, v)
	}
	return rsp, err
}
// Search lists objects matching the request filters, scoped to the
// calling user's org. Paging tokens, sorting, and label filters are not
// implemented yet; one extra row is requested so a NextPageToken can be
// returned when more results exist.
func (s *sqlObjectServer) Search(ctx context.Context, r *object.ObjectSearchRequest) (*object.ObjectSearchResponse, error) {
	user := store.UserFromContext(ctx)
	if user == nil {
		// guard added: user.OrgID is dereferenced below
		return nil, fmt.Errorf("can not find user in context")
	}
	if r.NextPageToken != "" || len(r.Sort) > 0 || len(r.Labels) > 0 {
		return nil, fmt.Errorf("not yet supported")
	}

	fields := []string{
		"path", "kind", "version", "errors", // errors are always returned
		"updated_at", "updated_by",
		"name", "description", // basic summary
	}
	if r.WithBody {
		fields = append(fields, "body")
	}
	if r.WithLabels {
		fields = append(fields, "labels")
	}
	if r.WithFields {
		fields = append(fields, "fields")
	}

	selectQuery := selectQuery{
		fields:   fields,
		from:     "object", // the table
		args:     []interface{}{},
		limit:    int(r.Limit),
		oneExtra: true, // request one more than the limit (and show next token if it exists)
	}
	if len(r.Kind) > 0 {
		selectQuery.addWhereIn("kind", r.Kind)
	}

	// Locked to a folder or prefix
	if r.Folder != "" {
		if strings.HasSuffix(r.Folder, "/") {
			return nil, fmt.Errorf("folder should not end with slash")
		}
		if strings.HasSuffix(r.Folder, "*") {
			// wildcard: match everything under the prefix
			keyPrefix := fmt.Sprintf("%d/%s", user.OrgID, strings.ReplaceAll(r.Folder, "*", ""))
			selectQuery.addWherePrefix("path", keyPrefix)
		} else {
			// exact folder: match direct children only
			keyPrefix := fmt.Sprintf("%d/%s", user.OrgID, r.Folder)
			selectQuery.addWhere("parent_folder_path", keyPrefix)
		}
	} else {
		// no folder: everything within the user's org
		keyPrefix := fmt.Sprintf("%d/", user.OrgID)
		selectQuery.addWherePrefix("path", keyPrefix)
	}

	// (removed leftover debug fmt.Printf statements that dumped every
	// query and its arguments to stdout)
	query, args := selectQuery.toQuery()
	rows, err := s.sess.Query(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer func() { _ = rows.Close() }()

	key := ""
	rsp := &object.ObjectSearchResponse{}
	for rows.Next() {
		result := &object.ObjectSearchResult{
			GRN: &object.GRN{},
		}
		summaryjson := summarySupport{}

		// Scan targets must match the field list built above
		args := []interface{}{
			&key, &result.GRN.Kind, &result.Version, &summaryjson.errors,
			&result.Updated, &result.UpdatedBy,
			&result.Name, &summaryjson.description,
		}
		if r.WithBody {
			args = append(args, &result.Body)
		}
		if r.WithLabels {
			args = append(args, &summaryjson.labels)
		}
		if r.WithFields {
			args = append(args, &summaryjson.fields)
		}

		err = rows.Scan(args...)
		if err != nil {
			return rsp, err
		}

		info, err := s.router.RouteFromKey(ctx, key)
		if err != nil {
			return rsp, err
		}
		result.GRN = info.GRN

		// found one more than requested: more results exist
		if len(rsp.Results) >= selectQuery.limit {
			// TODO? should this encode start+offset?
			rsp.NextPageToken = key
			break
		}

		if summaryjson.description != nil {
			result.Description = *summaryjson.description
		}
		if summaryjson.labels != nil {
			b := []byte(*summaryjson.labels)
			err = json.Unmarshal(b, &result.Labels)
			if err != nil {
				return rsp, err
			}
		}
		if summaryjson.fields != nil {
			result.FieldsJson = []byte(*summaryjson.fields)
		}
		if summaryjson.errors != nil {
			result.ErrorJson = []byte(*summaryjson.errors)
		}
		rsp.Results = append(rsp.Results, result)
	}
	return rsp, err
}
// ensureFolders creates any missing parent folders for the given GRN's
// UID path, creating them root-first so each folder's own parent exists
// before it is written.
func (s *sqlObjectServer) ensureFolders(ctx context.Context, objectgrn *object.GRN) error {
	uid := objectgrn.UID
	idx := strings.LastIndex(uid, "/")
	var missing []*object.GRN

	// Walk up the path and collect the folders that do not exist yet
	for idx > 0 {
		parent := uid[:idx]
		grn := &object.GRN{
			TenantId: objectgrn.TenantId,
			Scope:    objectgrn.Scope,
			Kind:     models.StandardKindFolder,
			UID:      parent,
		}
		fr, err := s.router.Route(ctx, grn)
		if err != nil {
			return err
		}

		// Not super efficient, but maybe it is OK?
		results := []int64{}
		err = s.sess.Select(ctx, &results, "SELECT 1 from object WHERE path=?", fr.Key)
		if err != nil {
			return err
		}
		if len(results) == 0 {
			// prepend so the outermost missing folder is created first
			missing = append([]*object.GRN{grn}, missing...)
		}
		idx = strings.LastIndex(parent, "/")
	}

	// Walk through each missing element and create it
	for _, grn := range missing {
		f := &folder.Model{
			Name: store.GuessNameFromUID(grn.UID),
		}
		// was a raw fmt.Printf to stdout; use the server's logger instead
		s.log.Info("creating folder", "uid", grn.UID)
		body, err := json.Marshal(f)
		if err != nil {
			return err
		}
		_, err = s.Write(ctx, &object.WriteObjectRequest{
			GRN:  grn,
			Body: body,
		})
		if err != nil {
			return err
		}
	}
	return nil
}

@ -0,0 +1,96 @@
package sqlstash
import (
"encoding/json"
"github.com/grafana/grafana/pkg/models"
)
// summarySupport carries an ObjectSummary in the flattened form used by
// the object table columns: nullable strings for optional values and
// JSON blobs for labels/fields/errors.
type summarySupport struct {
	model       *models.ObjectSummary
	name        string
	description *string // null or empty
	labels      *string // JSON-encoded labels map, null when empty
	fields      *string // JSON-encoded fields map, null when empty
	errors      *string // should not allow saving with this!
	marshaled   []byte  // the full summary as JSON
}
// newSummarySupport flattens an ObjectSummary into the string/JSON
// column values stored alongside an object row. A nil summary yields an
// empty (but non-nil) summarySupport with no error.
func newSummarySupport(summary *models.ObjectSummary) (*summarySupport, error) {
	s := &summarySupport{model: summary}
	if summary == nil {
		return s, nil
	}

	var err error
	if s.marshaled, err = json.Marshal(summary); err != nil {
		return s, err
	}
	s.name = summary.Name
	if summary.Description != "" {
		s.description = &summary.Description
	}

	// asJSON marshals v and returns it as a nullable string column value
	asJSON := func(v interface{}) (*string, error) {
		js, err := json.Marshal(v)
		if err != nil {
			return nil, err
		}
		str := string(js)
		return &str, nil
	}

	if len(summary.Labels) > 0 {
		if s.labels, err = asJSON(summary.Labels); err != nil {
			return s, err
		}
	}
	if len(summary.Fields) > 0 {
		if s.fields, err = asJSON(summary.Fields); err != nil {
			return s, err
		}
	}
	if summary.Error != nil {
		if s.errors, err = asJSON(summary.Error); err != nil {
			return s, err
		}
	}
	return s, nil
}
// toObjectSummary reconstructs an ObjectSummary from the flattened
// column values, decoding the optional JSON blobs back into their
// structured fields.
func (s summarySupport) toObjectSummary() (*models.ObjectSummary, error) {
	summary := &models.ObjectSummary{Name: s.name}
	if s.description != nil {
		summary.Description = *s.description
	}

	// decode unmarshals a nullable JSON column into dst; nil means unset
	decode := func(src *string, dst interface{}) error {
		if src == nil {
			return nil
		}
		return json.Unmarshal([]byte(*src), dst)
	}

	if err := decode(s.labels, &summary.Labels); err != nil {
		return summary, err
	}
	if err := decode(s.fields, &summary.Fields); err != nil {
		return summary, err
	}
	if err := decode(s.errors, &summary.Error); err != nil {
		return summary, err
	}
	return summary, nil
}

@ -0,0 +1,27 @@
package sqlstash
import (
"crypto/md5"
"encoding/hex"
"strings"
"github.com/grafana/grafana/pkg/models"
)
// createContentsHash returns the lowercase hex MD5 digest of contents.
// It is used as a content etag for change detection, not for security.
func createContentsHash(contents []byte) string {
	sum := md5.Sum(contents)
	return hex.EncodeToString(sum[:])
}
// getParentFolderPath returns the storage path of the folder containing
// key, or "" when the key has no folder component. Folder objects
// resolve one directory further up, since a folder's path IS its own
// directory.
func getParentFolderPath(kind string, key string) string {
	idx := strings.LastIndex(key, "/")
	if idx < 0 {
		return "" // ?
	}

	// folder should have a parent up one directory
	if kind == models.StandardKindFolder {
		idx = strings.LastIndex(key[:idx], "/")
		if idx < 0 {
			// fixed: original sliced key[:-1] here and panicked for a
			// top-level folder key with a single "/" segment
			return ""
		}
	}
	return key[:idx]
}
Loading…
Cancel
Save