The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/services/store/entity/sqlstash/queries.go

611 lines
16 KiB

package sqlstash
import (
"context"
"database/sql"
"embed"
"encoding/json"
"errors"
"fmt"
"strings"
"text/template"
"time"
"google.golang.org/protobuf/proto"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
)
// Templates setup.
var (
//go:embed data/*.sql
sqlTemplatesFS embed.FS
// all templates
helpers = template.FuncMap{
"listSep": helperListSep,
"join": helperJoin,
}
sqlTemplates = template.Must(template.New("sql").Funcs(helpers).ParseFS(sqlTemplatesFS, `data/*.sql`))
)
func mustTemplate(filename string) *template.Template {
if t := sqlTemplates.Lookup(filename); t != nil {
return t
}
panic(fmt.Sprintf("template file not found: %s", filename))
}
// Templates.
var (
sqlEntityDelete = mustTemplate("entity_delete.sql")
sqlEntityInsert = mustTemplate("entity_insert.sql")
sqlEntityListFolderElements = mustTemplate("entity_list_folder_elements.sql")
sqlEntityRead = mustTemplate("entity_read.sql")
sqlEntityUpdate = mustTemplate("entity_update.sql")
sqlEntityFolderInsert = mustTemplate("entity_folder_insert.sql")
sqlEntityLabelsDelete = mustTemplate("entity_labels_delete.sql")
sqlEntityLabelsInsert = mustTemplate("entity_labels_insert.sql")
sqlKindVersionGet = mustTemplate("kind_version_get.sql")
sqlKindVersionInc = mustTemplate("kind_version_inc.sql")
sqlKindVersionInsert = mustTemplate("kind_version_insert.sql")
sqlKindVersionLock = mustTemplate("kind_version_lock.sql")
)
// TxOptions.
var (
ReadCommitted = &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
}
ReadCommittedRO = &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
ReadOnly: true,
}
)
// SQLError is an error returned by the database, which includes additionally
// debugging information about what was sent to the database.
type SQLError struct {
Err error
CallType string // either Query, QueryRow or Exec
TemplateName string
Query string
RawQuery string
ScanDest []any
// potentially regulated information is not exported and only directly
// available for local testing and local debugging purposes, making sure it
// is never marshaled to JSON or any other serialization.
arguments []any
}
func (e SQLError) Unwrap() error {
return e.Err
}
func (e SQLError) Error() string {
return fmt.Sprintf("%s: %s with %d input arguments and %d output "+
"destination arguments: %v", e.TemplateName, e.CallType,
len(e.arguments), len(e.ScanDest), e.Err)
}
// entity_folder table requests.
type sqlEntityFolderInsertRequest struct {
*sqltemplate.SQLTemplate
Items []*sqlEntityFolderInsertRequestItem
}
func (r sqlEntityFolderInsertRequest) Validate() error {
return nil // TODO
}
type sqlEntityFolderInsertRequestItem struct {
GUID string
Namespace string
UID string
SlugPath string
JS string
Depth int32
Left int32
Right int32
Detached bool
}
// entity_labels table requests.
type sqlEntityLabelsInsertRequest struct {
*sqltemplate.SQLTemplate
GUID string
Labels map[string]string
}
func (r sqlEntityLabelsInsertRequest) Validate() error {
return nil // TODO
}
type sqlEntityLabelsDeleteRequest struct {
*sqltemplate.SQLTemplate
GUID string
KeepLabels []string
}
func (r sqlEntityLabelsDeleteRequest) Validate() error {
return nil // TODO
}
// entity_kind table requests.
type returnsKindVersion struct {
ResourceVersion int64
CreatedAt int64
UpdatedAt int64
}
func (r *returnsKindVersion) Results() (*returnsKindVersion, error) {
return r, nil
}
type sqlKindVersionGetRequest struct {
*sqltemplate.SQLTemplate
Group string
Resource string
*returnsKindVersion
}
func (r sqlKindVersionGetRequest) Validate() error {
return nil // TODO
}
type sqlKindVersionLockRequest struct {
*sqltemplate.SQLTemplate
Group string
Resource string
*returnsKindVersion
}
func (r sqlKindVersionLockRequest) Validate() error {
return nil // TODO
}
type sqlKindVersionIncRequest struct {
*sqltemplate.SQLTemplate
Group string
Resource string
ResourceVersion int64
UpdatedAt int64
}
func (r sqlKindVersionIncRequest) Validate() error {
return nil // TODO
}
type sqlKindVersionInsertRequest struct {
*sqltemplate.SQLTemplate
Group string
Resource string
CreatedAt int64
UpdatedAt int64
}
func (r sqlKindVersionInsertRequest) Validate() error {
return nil // TODO
}
// entity and entity_history tables requests.
type sqlEntityListFolderElementsRequest struct {
*sqltemplate.SQLTemplate
Group string
Resource string
Namespace string
FolderInfo *folderInfo
}
func (r sqlEntityListFolderElementsRequest) Validate() error {
return nil // TODO
}
// sqlEntityReadRequest can be used to retrieve a row from either the "entity"
// or the "entity_history" tables. In particular, don't use this template
// directly. Instead, use the readEntity function, which provides all common use
// cases and proper database deserialization.
type sqlEntityReadRequest struct {
*sqltemplate.SQLTemplate
Key *entity.Key
ResourceVersion int64
SelectForUpdate bool
returnsEntitySet
}
func (r sqlEntityReadRequest) Validate() error {
return nil // TODO
}
type sqlEntityDeleteRequest struct {
*sqltemplate.SQLTemplate
Key *entity.Key
}
func (r sqlEntityDeleteRequest) Validate() error {
return nil // TODO
}
type sqlEntityInsertRequest struct {
*sqltemplate.SQLTemplate
Entity *returnsEntity
// TableEntity, when true, means we will insert into table "entity", and
// into table "entity_history" otherwise.
TableEntity bool
}
func (r sqlEntityInsertRequest) Validate() error {
return nil // TODO
}
type sqlEntityUpdateRequest struct {
*sqltemplate.SQLTemplate
Entity *returnsEntity
}
func (r sqlEntityUpdateRequest) Validate() error {
return nil // TODO
}
// newEmptyEntity allocates a new entity.Entity and all its internal state to be
// ready for use.
func newEmptyEntity() *entity.Entity {
return &entity.Entity{
// we need to allocate all internal pointer types so that they
// are readily available to be populated in the template
Origin: new(entity.EntityOriginInfo),
// we also set default empty values in slices and maps instead of nil to
// provide the most consistent JSON representation fields that will be
// serialized this way to the database.
Labels: map[string]string{},
Fields: map[string]string{},
Errors: []*entity.EntityErrorInfo{},
}
}
func cloneEntity(src *entity.Entity) *entity.Entity {
ret := newEmptyEntity()
proto.Merge(ret, src)
return ret
}
// returnsEntitySet can be embedded in a request struct to provide automatic set
// returning of []*entity.Entity from the database, deserializing as needed. It
// should be embedded as a value type.
// Example struct:
//
// type sqlMyRequest struct {
// *sqltemplate.SQLTemplate
// returnsEntitySet // embedded value type, not pointer type
// GUID string // example argument
// MaxResourceVersion int // example argument
// }
//
// Example struct usage::
//
// req := sqlMyRequest{
// SQLTemplate: sqltemplate.New(myDialect),
// returnsEntitySet: newReturnsEntitySet(),
// GUID: "abc",
// MaxResourceVersion: 1,
// }
// entities, err := query(myTx, myTmpl, req)
//
// Example usage in SQL template:
//
// SELECT
// {{ .Ident "guid" | .Into .Entity.Guid }},
// {{ .Ident "resource_version" | .Into .Entity.ResourceVersion }},
// {{ .Ident "body" | .Into .Entity.Body }}
// FROM {{ .Ident "entity_history" }}
// WHERE 1 = 1
// AND {{ .Ident "guid" }} = {{ .Arg .GUID }}
// AND {{ .Ident "resource_version" }} <= {{ .Arg .MaxResourceVersion }}
// ;
type returnsEntitySet struct {
Entity *returnsEntity
}
// newWithResults returns a new newWithResults.
func newReturnsEntitySet() returnsEntitySet {
return returnsEntitySet{
Entity: newReturnsEntity(),
}
}
// Results is part of the implementation of sqltemplate.WithResults that
// deserializes the database data into an internal *entity.Entity, and then
// returns a deep copy of it.
func (e returnsEntitySet) Results() (*entity.Entity, error) {
ent, err := e.Entity.Results()
if err != nil {
return nil, err
}
return cloneEntity(ent), nil
}
// returnsEntity is a wrapper that aids with database (de)serialization. It
// embeds a *entity.Entity to provide transparent access to all its fields, but
// overrides the ones that need database (de)serialization. It should be a named
// field in your request struct, with pointer type.
// Example struct:
//
// type sqlMyRequest struct {
// *sqltemplate.SQLTemplate
// Entity *returnsEntity // named field with pointer type
// GUID string // example argument
// ResourceVersion int // example argument
// }
//
// Example struct usage:
//
// req := sqlMyRequest{
// SQLTemplate: sqltemplate.New(myDialect),
// Entity: newReturnsEntity(),
// GUID: "abc",
// ResourceVersion: 1,
// }
// err := queryRow(myTx, myTmpl, req)
// // check err here
// err = req.Entity.unmarshal()
// // check err, and you can now use req.Entity.Entity
//
// Example usage in SQL template:
//
// SELECT
// {{ .Ident "guid" | .Into .Entity.Guid }},
// {{ .Ident "resource_version" | .Into .Entity.ResourceVersion }},
// {{ .Ident "body" | .Into .Entity.Body }}
// FROM {{ .Ident "entity" }}
// WHERE 1 =1
// AND {{ .Ident "guid" }} = {{ .Arg .GUID }}
// AND {{ .Ident "resource_version" }} = {{ .Arg .ResourceVersion }}
// ;
type returnsEntity struct {
*entity.Entity
Labels []byte
Fields []byte
Errors []byte
}
func newReturnsEntity() *returnsEntity {
return &returnsEntity{
Entity: newEmptyEntity(),
}
}
func (e *returnsEntity) Results() (*entity.Entity, error) {
if err := e.unmarshal(); err != nil {
return nil, err
}
return e.Entity, nil
}
// marshal serializes the fields from the wire protocol representation so they
// can be written to the database.
func (e *returnsEntity) marshal() error {
var err error
if len(e.Entity.Labels) == 0 {
e.Labels = []byte{'{', '}'}
} else {
e.Labels, err = json.Marshal(e.Entity.Labels)
if err != nil {
return fmt.Errorf("serialize entity \"labels\" field: %w", err)
}
}
if len(e.Entity.Fields) == 0 {
e.Fields = []byte{'{', '}'}
} else {
e.Fields, err = json.Marshal(e.Entity.Fields)
if err != nil {
return fmt.Errorf("serialize entity \"fields\" field: %w", err)
}
}
if len(e.Entity.Errors) == 0 {
e.Errors = []byte{'[', ']'}
} else {
e.Errors, err = json.Marshal(e.Entity.Errors)
if err != nil {
return fmt.Errorf("serialize entity \"errors\" field: %w", err)
}
}
return nil
}
// unmarshal deserializes the fields in the database representation so they can
// be written to the wire protocol.
func (e *returnsEntity) unmarshal() error {
if len(e.Labels) > 0 {
if err := json.Unmarshal(e.Labels, &e.Entity.Labels); err != nil {
return fmt.Errorf("deserialize entity \"labels\" field: %w", err)
}
} else {
e.Entity.Labels = map[string]string{}
}
if len(e.Fields) > 0 {
if err := json.Unmarshal(e.Fields, &e.Entity.Fields); err != nil {
return fmt.Errorf("deserialize entity \"fields\" field: %w", err)
}
} else {
e.Entity.Fields = map[string]string{}
}
if len(e.Errors) > 0 {
if err := json.Unmarshal(e.Errors, &e.Entity.Errors); err != nil {
return fmt.Errorf("deserialize entity \"errors\" field: %w", err)
}
} else {
e.Entity.Errors = []*entity.EntityErrorInfo{}
}
return nil
}
// readEntity returns the entity defined by the given key as it existed at
// version `asOfVersion`, if that value is greater than zero. The returned
// entity will have at most that version. If `asOfVersion` is zero, then the
// current version of that entity will be returned. If `optimisticLocking` is
// true, then the latest version of the entity will be retrieved and return an
// error if its version is not exactly `asOfVersion`. The option
// `selectForUpdate` will cause to acquire a row-level exclusive lock upon
// selecting it. `optimisticLocking` is ignored if `asOfVersion` is zero.
// Common errors to check:
// 1. ErrOptimisticLockingFailed: the latest version of the entity does not
// match the value of `asOfVersion`.
// 2. ErrNotFound: the entity does not currently exist, did not exist at the
// version of `asOfVersion` or was deleted.
func readEntity(
ctx context.Context,
x db.ContextExecer,
d sqltemplate.Dialect,
k *entity.Key,
asOfVersion int64,
optimisticLocking bool,
selectForUpdate bool,
) (*returnsEntity, error) {
asOfVersion = max(asOfVersion, 0)
optimisticLocking = optimisticLocking && asOfVersion != 0
v := asOfVersion
if optimisticLocking {
// for optimistic locking, we will not ask for a specific version, but
// instead retrieve the latest version from the table "entity" and
// manually compare if it matches the given value of "asOfVersion".
v = 0
}
readReq := sqlEntityReadRequest{
SQLTemplate: sqltemplate.New(d),
Key: k,
ResourceVersion: v,
SelectForUpdate: selectForUpdate,
returnsEntitySet: newReturnsEntitySet(),
}
ent, err := queryRow(ctx, x, sqlEntityRead, readReq)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("read entity: %w", err)
}
if ent.Action == entity.Entity_DELETED {
return nil, ErrNotFound
}
if optimisticLocking && asOfVersion != 0 && ent.ResourceVersion != asOfVersion {
return nil, ErrOptimisticLockingFailed
}
return readReq.Entity, nil
}
// kindVersionAtomicInc atomically increases the version of a kind within a
// transaction.
func kindVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, group, resource string) (newVersion int64, err error) {
now := time.Now().UnixMilli()
// 1. Lock the kind and get the latest version
lockReq := sqlKindVersionLockRequest{
SQLTemplate: sqltemplate.New(d),
Group: group,
Resource: resource,
returnsKindVersion: new(returnsKindVersion),
}
kindv, err := queryRow(ctx, x, sqlKindVersionLock, lockReq)
// if there wasn't a row associated with the given kind, we create one with
// version 1
if errors.Is(err, sql.ErrNoRows) {
// NOTE: there is a marginal chance that we race with another writer
// trying to create the same row. This is only possible when onboarding
// a new (Group, Resource) to the cell, which should be very unlikely,
// and the workaround is simply retrying. The alternative would be to
// use INSERT ... ON CONFLICT DO UPDATE ..., but that creates a
// requirement for support in Dialect only for this marginal case, and
// we would rather keep Dialect as small as possible. Another
// alternative is to simply check if the INSERT returns a DUPLICATE KEY
// error and then retry the original SELECT, but that also adds some
// complexity to the code. That would be preferrable to changing
// Dialect, though. The current alternative, just retrying, seems to be
// enough for now.
insReq := sqlKindVersionInsertRequest{
SQLTemplate: sqltemplate.New(d),
Group: group,
Resource: resource,
CreatedAt: now,
UpdatedAt: now,
}
if _, err = exec(ctx, x, sqlKindVersionInsert, insReq); err != nil {
return 0, fmt.Errorf("insert into kind_version: %w", err)
}
return 1, nil
}
if err != nil {
return 0, fmt.Errorf("lock kind: %w", err)
}
incReq := sqlKindVersionIncRequest{
SQLTemplate: sqltemplate.New(d),
Group: group,
Resource: resource,
ResourceVersion: kindv.ResourceVersion,
UpdatedAt: now,
}
if _, err = exec(ctx, x, sqlKindVersionInc, incReq); err != nil {
return 0, fmt.Errorf("increase kind version: %w", err)
}
return kindv.ResourceVersion + 1, nil
}
// Template helpers.
// helperListSep is a helper that helps writing simpler loops in SQL templates.
// Example usage:
//
// {{ $comma := listSep ", " }}
// {{ range .Values }}
// {{/* here we put "-" on each end to remove extra white space */}}
// {{- call $comma -}}
// {{ .Value }}
// {{ end }}
func helperListSep(sep string) func() string {
var addSep bool
return func() string {
if addSep {
return sep
}
addSep = true
return ""
}
}
func helperJoin(sep string, elems ...string) string {
return strings.Join(elems, sep)
}