The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/util/xorm/dialect_spanner.go

399 lines
12 KiB

//go:build enterprise || pro
package xorm
import (
"database/sql"
"fmt"
"strconv"
"strings"
spannerclient "cloud.google.com/go/spanner"
_ "github.com/googleapis/go-sql-spanner"
spannerdriver "github.com/googleapis/go-sql-spanner"
"github.com/grafana/grafana/pkg/util/xorm/core"
"google.golang.org/grpc/codes"
)
func init() {
core.RegisterDriver("spanner", &spannerDriver{})
core.RegisterDialect("spanner", func() core.Dialect { return &spanner{} })
}
// https://cloud.google.com/spanner/docs/reference/standard-sql/lexical#reserved_keywords
var spannerReservedKeywords = map[string]struct{}{
"ALL": {},
"AND": {},
"ANY": {},
"ARRAY": {},
"AS": {},
"ASC": {},
"ASSERT_ROWS_MODIFIED": {},
"AT": {},
"BETWEEN": {},
"BY": {},
"CASE": {},
"CAST": {},
"COLLATE": {},
"CONTAINS": {},
"CREATE": {},
"CROSS": {},
"CUBE": {},
"CURRENT": {},
"DEFAULT": {},
"DEFINE": {},
"DESC": {},
"DISTINCT": {},
"ELSE": {},
"END": {},
"ENUM": {},
"ESCAPE": {},
"EXCEPT": {},
"EXCLUDE": {},
"EXISTS": {},
"EXTRACT": {},
"FALSE": {},
"FETCH": {},
"FOLLOWING": {},
"FOR": {},
"FROM": {},
"FULL": {},
"GROUP": {},
"GROUPING": {},
"GROUPS": {},
"HASH": {},
"HAVING": {},
"IF": {},
"IGNORE": {},
"IN": {},
"INNER": {},
"INTERSECT": {},
"INTERVAL": {},
"INTO": {},
"IS": {},
"JOIN": {},
"LATERAL": {},
"LEFT": {},
"LIKE": {},
"LIMIT": {},
"LOOKUP": {},
"MERGE": {},
"NATURAL": {},
"NEW": {},
"NO": {},
"NOT": {},
"NULL": {},
"NULLS": {},
"OF": {},
"ON": {},
"OR": {},
"ORDER": {},
"OUTER": {},
"OVER": {},
"PARTITION": {},
"PRECEDING": {},
"PROTO": {},
"RANGE": {},
"RECURSIVE": {},
"RESPECT": {},
"RIGHT": {},
"ROLLUP": {},
"ROWS": {},
"SELECT": {},
"SET": {},
"SOME": {},
"STRUCT": {},
"TABLESAMPLE": {},
"THEN": {},
"TO": {},
"TREAT": {},
"TRUE": {},
"UNBOUNDED": {},
"UNION": {},
"UNNEST": {},
"USING": {},
"WHEN": {},
"WHERE": {},
"WINDOW": {},
"WITH": {},
"WITHIN": {},
}
type spannerDriver struct{}
func (d *spannerDriver) Parse(_driverName, datasourceName string) (*core.Uri, error) {
return &core.Uri{DbType: "spanner", DbName: datasourceName}, nil
}
type spanner struct {
core.Base
}
func (s *spanner) Init(db *core.DB, uri *core.Uri, driverName string, datasourceName string) error {
return s.Base.Init(db, s, uri, driverName, datasourceName)
}
func (s *spanner) Filters() []core.Filter { return []core.Filter{&core.IdFilter{}} }
func (s *spanner) IsReserved(name string) bool {
_, exists := spannerReservedKeywords[name]
return exists
}
func (s *spanner) AndStr() string { return "AND" }
func (s *spanner) OrStr() string { return "OR" }
func (s *spanner) EqStr() string { return "=" }
func (s *spanner) RollBackStr() string { return "ROLL BACK" }
func (s *spanner) AutoIncrStr() string {
// Spanner does not support auto-increment, but supports unique generated IDs (not sequential!).
return "GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE)"
}
func (s *spanner) SupportInsertMany() bool { return false } // Needs manual transaction batching
func (s *spanner) SupportEngine() bool { return false } // No support for engine selection
func (s *spanner) SupportCharset() bool { return false } // ...or charsets
func (s *spanner) SupportDropIfExists() bool { return false } // Drop should be handled differently
func (s *spanner) IndexOnTable() bool { return false }
func (s *spanner) ShowCreateNull() bool { return false }
func (s *spanner) Quote(name string) string { return "`" + name + "`" }
func (s *spanner) SqlType(col *core.Column) string {
switch col.SQLType.Name {
case core.Int, core.SmallInt, core.BigInt:
return "INT64"
case core.Varchar, core.Text, core.MediumText, core.LongText, core.Char, core.NVarchar, core.NChar, core.NText:
l := col.Length
if l == 0 {
l = col.SQLType.DefaultLength
}
if l > 0 {
return fmt.Sprintf("STRING(%d)", l)
}
return "STRING(MAX)"
case core.Jsonb:
return "STRING(MAX)"
case core.Bool, core.TinyInt:
return "BOOL"
case core.Float, core.Double:
return "FLOAT64"
case core.Bytea, core.Blob, core.MediumBlob, core.LongBlob:
l := col.Length
if l == 0 {
l = col.SQLType.DefaultLength
}
if l > 0 {
return fmt.Sprintf("BYTES(%d)", l)
}
return "BYTES(MAX)"
case core.DateTime, core.TimeStamp:
return "TIMESTAMP"
default:
panic("unknown column type: " + col.SQLType.Name)
//default:
// return "STRING(MAX)" // XXX: more types to add
}
}
func (s *spanner) GetColumns(tableName string) ([]string, map[string]*core.Column, error) {
query := `SELECT COLUMN_NAME, SPANNER_TYPE, IS_NULLABLE, IS_IDENTITY, IDENTITY_GENERATION, IDENTITY_KIND, COLUMN_DEFAULT
FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ? AND TABLE_SCHEMA="" ORDER BY ORDINAL_POSITION`
rows, err := s.DB().Query(query, tableName)
if err != nil {
return nil, nil, err
}
defer rows.Close()
columns := make(map[string]*core.Column)
var colNames []string
var name, sqlType, isNullable string
var isIdentity, identityGeneration, identityKind, columnDefault sql.NullString
for rows.Next() {
if err := rows.Scan(&name, &sqlType, &isNullable, &isIdentity, &identityGeneration, &identityKind, &columnDefault); err != nil {
return nil, nil, err
}
var length int
switch {
case sqlType == "INT64":
sqlType = core.Int
case sqlType == "FLOAT32" || sqlType == "FLOAT64":
sqlType = core.Float
case sqlType == "BOOL":
sqlType = core.Bool
case sqlType == "BYTES(MAX)":
sqlType = core.Blob
case sqlType == "STRING(MAX)":
sqlType = core.NVarchar
case sqlType == "TIMESTAMP":
sqlType = core.DateTime
case strings.HasPrefix(sqlType, "BYTES("):
// 6 == len(`BYTES(`), we also remove ")" from the end.
if l, err := strconv.Atoi(sqlType[6 : len(sqlType)-1]); err == nil {
length = l
}
sqlType = core.Blob
case strings.HasPrefix(sqlType, "STRING("):
// 7 == len(`STRING(`), we also remove ")" from the end.
if l, err := strconv.Atoi(sqlType[7 : len(sqlType)-1]); err == nil {
length = l
}
sqlType = core.Varchar
default:
panic("unknown column type: " + sqlType)
}
autoincrement := isIdentity.Valid && isIdentity.String == "YES" &&
identityGeneration.Valid && identityGeneration.String == "BY DEFAULT" &&
identityKind.Valid && identityKind.String == "BIT_REVERSED_POSITIVE_SEQUENCE"
defValue := ""
defEmpty := true
if columnDefault.Valid {
defValue = columnDefault.String
defEmpty = false
}
col := &core.Column{
Name: name,
SQLType: core.SQLType{Name: sqlType},
Length: length,
Nullable: isNullable == "YES",
IsAutoIncrement: autoincrement,
Indexes: map[string]int{},
Default: defValue,
DefaultIsEmpty: defEmpty,
}
columns[name] = col
colNames = append(colNames, name)
}
return colNames, columns, rows.Err()
}
func (s *spanner) CreateTableSql(table *core.Table, tableName, _, charset string) string {
sql := "CREATE TABLE " + s.Quote(tableName) + " ("
for i, col := range table.Columns() {
if i > 0 {
sql += ", "
}
sql += s.Quote(col.Name) + " " + s.SqlType(col)
if !col.Nullable {
sql += " NOT NULL"
}
if col.Default != "" {
sql += " DEFAULT (" + col.Default + ")"
}
if col.IsAutoIncrement {
sql += " GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE)"
}
}
sql += ") PRIMARY KEY (" + strings.Join(table.PrimaryKeys, ",") + ")"
return sql
}
func (s *spanner) CreateIndexSql(tableName string, index *core.Index) string {
sql := "CREATE "
if index.Type == core.UniqueType {
sql += "UNIQUE NULL_FILTERED "
}
sql += "INDEX " + s.Quote(index.XName(tableName)) + " ON " + s.Quote(tableName) + " (" + strings.Join(index.Cols, ", ") + ")"
return sql
}
func (s *spanner) IndexCheckSql(tableName, indexName string) (string, []any) {
return `SELECT index_name FROM information_schema.indexes
WHERE table_name = ? AND table_schema = "" AND index_name = ?`,
[]any{tableName, indexName}
}
func (s *spanner) TableCheckSql(tableName string) (string, []any) {
return `SELECT table_name FROM information_schema.tables
WHERE table_name = ? AND table_schema = ""`,
[]any{tableName}
}
func (s *spanner) GetTables() ([]*core.Table, error) {
res, err := s.DB().Query(`SELECT table_name FROM information_schema.tables WHERE table_schema = ""`)
if err != nil {
return nil, err
}
defer res.Close()
tables := []*core.Table{}
for res.Next() {
var name string
if err := res.Scan(&name); err != nil {
return nil, err
}
t := core.NewEmptyTable()
t.Name = name
tables = append(tables, t)
}
return tables, res.Err()
}
func (s *spanner) GetIndexes(tableName string) (map[string]*core.Index, error) {
res, err := s.DB().Query(`SELECT ix.INDEX_NAME, ix.INDEX_TYPE, ix.IS_UNIQUE, c.COLUMN_NAME
FROM INFORMATION_SCHEMA.INDEXES ix
JOIN INFORMATION_SCHEMA.INDEX_COLUMNS c ON (ix.TABLE_NAME=c.TABLE_NAME AND ix.INDEX_NAME=c.INDEX_NAME)
WHERE ix.TABLE_SCHEMA = "" AND ix.TABLE_NAME=?
ORDER BY ix.INDEX_NAME, c.ORDINAL_POSITION`, tableName)
if err != nil {
return nil, err
}
defer res.Close()
indexes := map[string]*core.Index{}
var ixName, ixType, colName string
var isUnique bool
for res.Next() {
err := res.Scan(&ixName, &ixType, &isUnique, &colName)
if err != nil {
return nil, err
}
isRegular := false
if strings.HasPrefix(ixName, "IDX_"+tableName) || strings.HasPrefix(ixName, "UQE_"+tableName) {
ixName = ixName[5+len(tableName):]
isRegular = true
}
var index *core.Index
var ok bool
if index, ok = indexes[ixName]; !ok {
t := core.IndexType // ixType == "INDEX" && !isUnique
if ixType == "PRIMARY KEY" || isUnique {
t = core.UniqueType
}
index = &core.Index{}
index.IsRegular = isRegular
index.Type = t
index.Name = ixName
indexes[ixName] = index
}
index.AddColumn(colName)
}
return indexes, res.Err()
}
func (s *spanner) CreateSequenceGenerator(db *sql.DB) (SequenceGenerator, error) {
dsn := s.DataSourceName()
connectorConfig, err := spannerdriver.ExtractConnectorConfig(dsn)
if err != nil {
return nil, err
}
if connectorConfig.Params[strings.ToLower("inMemSequenceGenerator")] == "true" {
// Switch to using in-memory sequence number generator.
// Using database-based sequence generator doesn't work with emulator, as emulator
// only supports single transaction. If there is already another transaction started
// generating new ID via database-based sequence generator would always fail.
return newInMemSequenceGenerator(), nil
}
return newSequenceGenerator(db), nil
}
func (s *spanner) RetryOnError(err error) bool {
return err != nil && spannerclient.ErrCode(spannerclient.ToSpannerError(err)) == codes.Aborted
}