mirror of https://github.com/grafana/grafana
[release-11.2.8] Chore: Bump dependencies to address security issues (#101648)
* Chore: Bump github.com/rs/cors to v1.11.1
* Chore: Bump github.com/golang/glog to v1.2.4
* Chore: Bump filippo.io/age to v1.2.1
* Chore: Bump github.com/ua-parser/uap-go to v0.0.0-20250213224047-9c035f085b90
* Chore: Bump github.com/go-jose/go-jose/v3 to v3.0.4
* Chore: Bump golang.org/x/net to v0.36.0
* Chore: Bump golang.org/x/oauth2 to v0.28.0
* Chore: Bump github.com/moby/moby to v27.5.1
* Chore: Bump github.com/elazarl/goproxy to v1.7.1
* Zanzana: bump openfga version (#94485)
* Bump openfga
* Remove internal sqlite implementation for openfga
* Use sqlite implementation from openfga
(cherry picked from commit 9ece88d585)
* Chore: Bump github.com/openfga/openfga to v1.8.5
---------
Co-authored-by: Karl Persson <kalle.persson@grafana.com>
pull/101795/head
parent
21f1d02b3f
commit
7b7b37359f
File diff suppressed because it is too large
Load Diff
@ -1,10 +0,0 @@ |
||||
package assets

import "embed"

// EmbedMigrations within the grafana binary.
//
//go:embed migrations/*
var EmbedMigrations embed.FS

// SQLiteMigrationDir is the subdirectory of EmbedMigrations that holds the
// SQLite-dialect goose migrations.
const SQLiteMigrationDir = "migrations/sqlite"
@ -1,56 +0,0 @@ |
||||
-- +goose Up

-- Relationship tuples: one row per (store, object, relation, user).
-- ulid provides a globally ordered identifier used for pagination.
CREATE TABLE tuple (
    store CHAR(26) NOT NULL,
    object_type VARCHAR(128) NOT NULL,
    object_id VARCHAR(128) NOT NULL,
    relation VARCHAR(50) NOT NULL,
    _user VARCHAR(256) NOT NULL,
    user_type VARCHAR(7) NOT NULL,
    ulid CHAR(26) NOT NULL,
    inserted_at TIMESTAMP NOT NULL,
    PRIMARY KEY (store, object_type, object_id, relation, _user)
);

CREATE UNIQUE INDEX idx_tuple_ulid ON tuple (ulid);

-- Authorization model type definitions, one row per type in a model.
CREATE TABLE authorization_model (
    store CHAR(26) NOT NULL,
    authorization_model_id CHAR(26) NOT NULL,
    type VARCHAR(256) NOT NULL,
    type_definition BLOB,
    PRIMARY KEY (store, authorization_model_id, type)
);

-- Stores; deleted_at marks soft deletion.
CREATE TABLE store (
    id CHAR(26) PRIMARY KEY,
    name VARCHAR(64) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP,
    deleted_at TIMESTAMP
);

-- Serialized assertions per (store, authorization model).
CREATE TABLE assertion (
    store CHAR(26) NOT NULL,
    authorization_model_id CHAR(26) NOT NULL,
    assertions BLOB,
    PRIMARY KEY (store, authorization_model_id)
);

-- Tuple change log; operation encodes write vs delete.
CREATE TABLE changelog (
    store CHAR(26) NOT NULL,
    object_type VARCHAR(256) NOT NULL,
    object_id VARCHAR(256) NOT NULL,
    relation VARCHAR(50) NOT NULL,
    _user VARCHAR(512) NOT NULL,
    operation INTEGER NOT NULL,
    ulid CHAR(26) NOT NULL,
    inserted_at TIMESTAMP NOT NULL,
    PRIMARY KEY (store, ulid, object_type)
);

-- +goose Down
DROP TABLE tuple;
DROP TABLE authorization_model;
DROP TABLE store;
DROP TABLE assertion;
DROP TABLE changelog;
@ -1,5 +0,0 @@ |
||||
-- +goose Up
-- Add a schema_version column to authorization_model, defaulting to '1.0'
-- for pre-existing rows.
ALTER TABLE authorization_model ADD COLUMN schema_version VARCHAR(5) NOT NULL DEFAULT '1.0';

-- +goose Down
ALTER TABLE authorization_model DROP COLUMN schema_version;
@ -1,5 +0,0 @@ |
||||
-- +goose Up
-- Index to support reverse lookups from a user to the objects it relates to.
CREATE INDEX idx_reverse_lookup_user on tuple (store, object_type, relation, _user);

-- +goose Down
-- SQLite's DROP INDEX takes only the index name; the MySQL-style
-- "DROP INDEX name ON table" form is a syntax error in SQLite.
DROP INDEX idx_reverse_lookup_user;
@ -1,5 +0,0 @@ |
||||
-- +goose Up
-- Add a blob column holding the whole authorization model as one
-- serialized protobuf message.
ALTER TABLE authorization_model ADD COLUMN serialized_protobuf LONGBLOB;

-- +goose Down
ALTER TABLE authorization_model DROP COLUMN serialized_protobuf;
@ -1,11 +0,0 @@ |
||||
-- +goose Up
-- Add optional condition columns (name + serialized context) to both the
-- tuple table and its changelog.
ALTER TABLE tuple ADD COLUMN condition_name VARCHAR(256);
ALTER TABLE tuple ADD COLUMN condition_context LONGBLOB;
ALTER TABLE changelog ADD COLUMN condition_name VARCHAR(256);
ALTER TABLE changelog ADD COLUMN condition_context LONGBLOB;

-- +goose Down
ALTER TABLE tuple DROP COLUMN condition_name;
ALTER TABLE tuple DROP COLUMN condition_context;
ALTER TABLE changelog DROP COLUMN condition_name;
ALTER TABLE changelog DROP COLUMN condition_context;
@ -1,15 +0,0 @@ |
||||
package sqlite

import "github.com/openfga/openfga/pkg/storage/sqlcommon"

// Config holds the SQLite datastore configuration: the common OpenFGA SQL
// settings plus SQLite-specific options.
type Config struct {
	*sqlcommon.Config
	// QueryRetries is how many times a write is retried when SQLite reports
	// SQLITE_BUSY/SQLITE_LOCKED (see SQLite.busyRetry).
	QueryRetries int
}

// NewConfig returns a Config with the sqlcommon defaults and retries disabled.
func NewConfig() *Config {
	return &Config{
		Config:       sqlcommon.NewConfig(),
		QueryRetries: 0,
	}
}
@ -1,821 +0,0 @@ |
||||
package sqlite |
||||
|
||||
import ( |
||||
"context" |
||||
"database/sql" |
||||
"encoding/json" |
||||
"errors" |
||||
"fmt" |
||||
"net/url" |
||||
"strings" |
||||
"time" |
||||
|
||||
sq "github.com/Masterminds/squirrel" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/client_golang/prometheus/collectors" |
||||
"go.opentelemetry.io/otel" |
||||
"google.golang.org/protobuf/proto" |
||||
"google.golang.org/protobuf/types/known/structpb" |
||||
"google.golang.org/protobuf/types/known/timestamppb" |
||||
|
||||
// Pull in sqlite driver.
|
||||
"github.com/mattn/go-sqlite3" |
||||
|
||||
openfgav1 "github.com/openfga/api/proto/openfga/v1" |
||||
"github.com/openfga/openfga/pkg/logger" |
||||
"github.com/openfga/openfga/pkg/storage" |
||||
"github.com/openfga/openfga/pkg/storage/sqlcommon" |
||||
tupleUtils "github.com/openfga/openfga/pkg/tuple" |
||||
) |
||||
|
||||
var tracer = otel.Tracer("openfga/pkg/storage/sqlite")

// SQLite provides a SQLite based implementation of [storage.OpenFGADatastore].
type SQLite struct {
	stbl             sq.StatementBuilderType // squirrel builder bound to db for composing queries
	cfg              *Config
	db               *sql.DB
	dbInfo           *sqlcommon.DBInfo // shared state consumed by the sqlcommon helpers
	sqlTime          sq.Sqlizer        // SQL expression for "now": datetime('subsec')
	logger           logger.Logger
	dbStatsCollector prometheus.Collector // nil unless cfg.ExportMetrics is set
}

// Ensures that SQLite implements the OpenFGADatastore interface.
var _ storage.OpenFGADatastore = (*SQLite)(nil)
||||
|
||||
// New creates a new [SQLite] storage.
//
// The DSN's existing query parameters are preserved; journal_mode(WAL) and
// busy_timeout(500) pragmas are appended only when the caller did not set them.
func New(uri string, cfg *Config) (*SQLite, error) {
	// Set journal mode and busy timeout pragmas if not specified.
	query := url.Values{}
	var err error

	// Split the DSN into path and query so existing parameters survive.
	if i := strings.Index(uri, "?"); i != -1 {
		query, err = url.ParseQuery(uri[i+1:])
		if err != nil {
			return nil, fmt.Errorf("error parsing dsn: %w", err)
		}

		uri = uri[:i]
	}

	foundJournalMode := false
	foundBusyTimeout := false
	for _, val := range query["_pragma"] {
		if strings.HasPrefix(val, "journal_mode") {
			foundJournalMode = true
		} else if strings.HasPrefix(val, "busy_timeout") {
			foundBusyTimeout = true
		}
	}

	if !foundJournalMode {
		query.Add("_pragma", "journal_mode(WAL)")
	}
	if !foundBusyTimeout {
		query.Add("_pragma", "busy_timeout(500)")
	}

	uri += "?" + query.Encode()

	// NOTE(review): the "_pragma" DSN syntax and the "sqlite" driver name match
	// modernc.org/sqlite, while this file imports mattn/go-sqlite3 (which
	// registers itself as "sqlite3"). Confirm which driver actually registers
	// the "sqlite" name in this binary.
	db, err := sql.Open("sqlite", uri)
	if err != nil {
		return nil, fmt.Errorf("initialize sqlite connection: %w", err)
	}

	return NewWithDB(db, cfg)
}
||||
|
||||
// NewWithDB creates a new [SQLite] storage using provided [*sql.DB].
//
// When cfg.ExportMetrics is set, a DB-stats collector is registered with
// prometheus; registration failure aborts construction.
func NewWithDB(db *sql.DB, cfg *Config) (*SQLite, error) {
	var collector prometheus.Collector
	if cfg.ExportMetrics {
		collector = collectors.NewDBStatsCollector(db, "openfga")
		if err := prometheus.Register(collector); err != nil {
			return nil, fmt.Errorf("initialize metrics: %w", err)
		}
	}

	// datetime('subsec') is the SQL expression used wherever a timestamp
	// with fractional-second precision is needed.
	sqlTime := sq.Expr("datetime('subsec')")
	stbl := sq.StatementBuilder.RunWith(db)
	dbInfo := sqlcommon.NewDBInfo(db, stbl, sqlTime)

	return &SQLite{
		cfg:              cfg,
		stbl:             stbl,
		db:               db,
		sqlTime:          sqlTime,
		dbInfo:           dbInfo,
		logger:           cfg.Logger,
		dbStatsCollector: collector,
	}, nil
}
||||
|
||||
// Close see [storage.OpenFGADatastore].Close.
|
||||
func (m *SQLite) Close() { |
||||
if m.dbStatsCollector != nil { |
||||
prometheus.Unregister(m.dbStatsCollector) |
||||
} |
||||
_ = m.db.Close() |
||||
} |
||||
|
||||
// Read see [storage.RelationshipTupleReader].Read.
func (m *SQLite) Read(ctx context.Context, store string, tupleKey *openfgav1.TupleKey) (storage.TupleIterator, error) {
	ctx, span := tracer.Start(ctx, "sqlite.Read")
	defer span.End()

	// nil pagination options: stream all matching tuples without an ORDER BY.
	return m.read(ctx, store, tupleKey, nil)
}
||||
|
||||
// ReadPage see [storage.RelationshipTupleReader].ReadPage.
func (m *SQLite) ReadPage(
	ctx context.Context,
	store string,
	tupleKey *openfgav1.TupleKey,
	opts storage.PaginationOptions,
) ([]*openfgav1.Tuple, []byte, error) {
	ctx, span := tracer.Start(ctx, "sqlite.ReadPage")
	defer span.End()

	iter, err := m.read(ctx, store, tupleKey, &opts)
	if err != nil {
		return nil, nil, err
	}
	defer iter.Stop()

	// ToArray materializes the page and the continuation token (if any).
	return iter.ToArray(opts)
}
||||
|
||||
// read builds and executes the tuple SELECT shared by Read and ReadPage.
// When opts is non-nil the result is ordered by ulid and limited to
// PageSize+1 rows so the caller can tell whether another page exists.
func (m *SQLite) read(ctx context.Context, store string, tupleKey *openfgav1.TupleKey, opts *storage.PaginationOptions) (*sqlcommon.SQLTupleIterator, error) {
	ctx, span := tracer.Start(ctx, "sqlite.read")
	defer span.End()

	sb := m.stbl.
		Select(
			"store", "object_type", "object_id", "relation", "_user",
			"condition_name", "condition_context", "ulid", "inserted_at",
		).
		From("tuple").
		Where(sq.Eq{"store": store})
	if opts != nil {
		sb = sb.OrderBy("ulid")
	}
	// Every part of the tuple key is optional; only non-empty parts narrow
	// the query.
	objectType, objectID := tupleUtils.SplitObject(tupleKey.GetObject())
	if objectType != "" {
		sb = sb.Where(sq.Eq{"object_type": objectType})
	}
	if objectID != "" {
		sb = sb.Where(sq.Eq{"object_id": objectID})
	}
	if tupleKey.GetRelation() != "" {
		sb = sb.Where(sq.Eq{"relation": tupleKey.GetRelation()})
	}
	if tupleKey.GetUser() != "" {
		sb = sb.Where(sq.Eq{"_user": tupleKey.GetUser()})
	}
	if opts != nil && opts.From != "" {
		// Resume from the ulid recorded in the continuation token.
		token, err := sqlcommon.UnmarshallContToken(opts.From)
		if err != nil {
			return nil, err
		}
		sb = sb.Where(sq.GtOrEq{"ulid": token.Ulid})
	}
	if opts != nil && opts.PageSize != 0 {
		sb = sb.Limit(uint64(opts.PageSize + 1)) // + 1 is used to determine whether to return a continuation token.
	}

	rows, err := sb.QueryContext(ctx)
	if err != nil {
		return nil, handleSQLError(err)
	}

	return sqlcommon.NewSQLTupleIterator(rows), nil
}
||||
|
||||
// Write see [storage.RelationshipTupleWriter].Write.
|
||||
func (m *SQLite) Write(ctx context.Context, store string, deletes storage.Deletes, writes storage.Writes) error { |
||||
ctx, span := tracer.Start(ctx, "sqlite.Write") |
||||
defer span.End() |
||||
|
||||
if len(deletes)+len(writes) > m.MaxTuplesPerWrite() { |
||||
return storage.ErrExceededWriteBatchLimit |
||||
} |
||||
|
||||
return m.busyRetry(func() error { |
||||
now := time.Now().UTC() |
||||
return write(ctx, m.db, m.stbl, m.sqlTime, store, deletes, writes, now) |
||||
}) |
||||
} |
||||
|
||||
// ReadUserTuple see [storage.RelationshipTupleReader].ReadUserTuple.
func (m *SQLite) ReadUserTuple(ctx context.Context, store string, tupleKey *openfgav1.TupleKey) (*openfgav1.Tuple, error) {
	ctx, span := tracer.Start(ctx, "sqlite.ReadUserTuple")
	defer span.End()

	objectType, objectID := tupleUtils.SplitObject(tupleKey.GetObject())
	userType := tupleUtils.GetUserTypeFromUser(tupleKey.GetUser())

	var conditionName sql.NullString
	var conditionContext []byte
	var record storage.TupleRecord
	// Exact-match lookup on the full tuple primary key plus user_type.
	err := m.stbl.
		Select(
			"object_type", "object_id", "relation", "_user",
			"condition_name", "condition_context",
		).
		From("tuple").
		Where(sq.Eq{
			"store":       store,
			"object_type": objectType,
			"object_id":   objectID,
			"relation":    tupleKey.GetRelation(),
			"_user":       tupleKey.GetUser(),
			"user_type":   userType,
		}).
		QueryRowContext(ctx).
		Scan(
			&record.ObjectType,
			&record.ObjectID,
			&record.Relation,
			&record.User,
			&conditionName,
			&conditionContext,
		)
	if err != nil {
		return nil, handleSQLError(err)
	}

	// Conditions are optional; decode the serialized condition context only
	// when a condition name is present.
	if conditionName.String != "" {
		record.ConditionName = conditionName.String

		if conditionContext != nil {
			var conditionContextStruct structpb.Struct
			if err := proto.Unmarshal(conditionContext, &conditionContextStruct); err != nil {
				return nil, err
			}
			record.ConditionContext = &conditionContextStruct
		}
	}

	return record.AsTuple(), nil
}
||||
|
||||
// ReadUsersetTuples see [storage.RelationshipTupleReader].ReadUsersetTuples.
func (m *SQLite) ReadUsersetTuples(
	ctx context.Context,
	store string,
	filter storage.ReadUsersetTuplesFilter,
) (storage.TupleIterator, error) {
	ctx, span := tracer.Start(ctx, "sqlite.ReadUsersetTuples")
	defer span.End()

	// Only tuples whose user is itself a userset are considered.
	sb := m.stbl.
		Select(
			"store", "object_type", "object_id", "relation", "_user",
			"condition_name", "condition_context", "ulid", "inserted_at",
		).
		From("tuple").
		Where(sq.Eq{"store": store}).
		Where(sq.Eq{"user_type": tupleUtils.UserSet})

	objectType, objectID := tupleUtils.SplitObject(filter.Object)
	if objectType != "" {
		sb = sb.Where(sq.Eq{"object_type": objectType})
	}
	if objectID != "" {
		sb = sb.Where(sq.Eq{"object_id": objectID})
	}
	if filter.Relation != "" {
		sb = sb.Where(sq.Eq{"relation": filter.Relation})
	}
	if len(filter.AllowedUserTypeRestrictions) > 0 {
		// Build an OR over the allowed restrictions: "type:%#relation" LIKE
		// patterns for relation references, exact "type:*" for wildcards.
		orConditions := sq.Or{}
		for _, userset := range filter.AllowedUserTypeRestrictions {
			if _, ok := userset.GetRelationOrWildcard().(*openfgav1.RelationReference_Relation); ok {
				orConditions = append(orConditions, sq.Like{"_user": userset.GetType() + ":%#" + userset.GetRelation()})
			}
			if _, ok := userset.GetRelationOrWildcard().(*openfgav1.RelationReference_Wildcard); ok {
				orConditions = append(orConditions, sq.Eq{"_user": userset.GetType() + ":*"})
			}
		}
		sb = sb.Where(orConditions)
	}
	rows, err := sb.QueryContext(ctx)
	if err != nil {
		return nil, handleSQLError(err)
	}

	return sqlcommon.NewSQLTupleIterator(rows), nil
}
||||
|
||||
// ReadStartingWithUser see [storage.RelationshipTupleReader].ReadStartingWithUser.
func (m *SQLite) ReadStartingWithUser(
	ctx context.Context,
	store string,
	opts storage.ReadStartingWithUserFilter,
) (storage.TupleIterator, error) {
	ctx, span := tracer.Start(ctx, "sqlite.ReadStartingWithUser")
	defer span.End()

	// Render each user filter entry into its stored string form:
	// "object" for direct users, "object#relation" for usersets.
	targetUsersArg := make([]string, 0, len(opts.UserFilter))
	for _, u := range opts.UserFilter {
		targetUser := u.GetObject()
		if u.GetRelation() != "" {
			targetUser = strings.Join([]string{u.GetObject(), u.GetRelation()}, "#")
		}
		targetUsersArg = append(targetUsersArg, targetUser)
	}

	// sq.Eq with a slice value renders as an IN (...) clause.
	rows, err := m.stbl.
		Select(
			"store", "object_type", "object_id", "relation", "_user",
			"condition_name", "condition_context", "ulid", "inserted_at",
		).
		From("tuple").
		Where(sq.Eq{
			"store":       store,
			"object_type": opts.ObjectType,
			"relation":    opts.Relation,
			"_user":       targetUsersArg,
		}).QueryContext(ctx)
	if err != nil {
		return nil, handleSQLError(err)
	}

	return sqlcommon.NewSQLTupleIterator(rows), nil
}
||||
|
||||
// MaxTuplesPerWrite see [storage.RelationshipTupleWriter].MaxTuplesPerWrite.
// The limit comes straight from the configuration.
func (m *SQLite) MaxTuplesPerWrite() int {
	return m.cfg.MaxTuplesPerWriteField
}
||||
|
||||
// ReadAuthorizationModel see [storage.AuthorizationModelReadBackend].ReadAuthorizationModel.
// Delegates to the dialect-agnostic sqlcommon implementation.
func (m *SQLite) ReadAuthorizationModel(ctx context.Context, store string, modelID string) (*openfgav1.AuthorizationModel, error) {
	ctx, span := tracer.Start(ctx, "sqlite.ReadAuthorizationModel")
	defer span.End()

	return sqlcommon.ReadAuthorizationModel(ctx, m.dbInfo, store, modelID)
}
||||
|
||||
// ReadAuthorizationModels see [storage.AuthorizationModelReadBackend].ReadAuthorizationModels.
//
// Two-phase: first page through the distinct model IDs (newest first), then
// load each model individually via ReadAuthorizationModel.
func (m *SQLite) ReadAuthorizationModels(
	ctx context.Context,
	store string,
	opts storage.PaginationOptions,
) ([]*openfgav1.AuthorizationModel, []byte, error) {
	ctx, span := tracer.Start(ctx, "sqlite.ReadAuthorizationModels")
	defer span.End()

	sb := m.stbl.Select("authorization_model_id").
		Distinct().
		From("authorization_model").
		Where(sq.Eq{"store": store}).
		OrderBy("authorization_model_id desc")

	if opts.From != "" {
		// Descending order, so the next page starts at IDs <= the token's ulid.
		token, err := sqlcommon.UnmarshallContToken(opts.From)
		if err != nil {
			return nil, nil, err
		}
		sb = sb.Where(sq.LtOrEq{"authorization_model_id": token.Ulid})
	}
	if opts.PageSize > 0 {
		sb = sb.Limit(uint64(opts.PageSize + 1)) // + 1 is used to determine whether to return a continuation token.
	}

	rows, err := sb.QueryContext(ctx)
	if err != nil {
		return nil, nil, handleSQLError(err)
	}
	defer func() { _ = rows.Close() }()

	var modelIDs []string
	var modelID string

	for rows.Next() {
		err = rows.Scan(&modelID)
		if err != nil {
			return nil, nil, handleSQLError(err)
		}

		modelIDs = append(modelIDs, modelID)
	}

	if err := rows.Err(); err != nil {
		return nil, nil, handleSQLError(err)
	}

	var token []byte
	numModelIDs := len(modelIDs)
	if len(modelIDs) > opts.PageSize {
		// modelID still holds the last scanned (extra) ID — the first ID of
		// the next page — which becomes the continuation token.
		numModelIDs = opts.PageSize
		token, err = json.Marshal(sqlcommon.NewContToken(modelID, ""))
		if err != nil {
			return nil, nil, err
		}
	}

	// TODO: make this concurrent with a maximum of 5 goroutines. This may be helpful:
	// https://stackoverflow.com/questions/25306073/always-have-x-number-of-goroutines-running-at-any-time
	models := make([]*openfgav1.AuthorizationModel, 0, numModelIDs)
	// We use numModelIDs here to avoid retrieving possibly one extra model.
	for i := 0; i < numModelIDs; i++ {
		model, err := m.ReadAuthorizationModel(ctx, store, modelIDs[i])
		if err != nil {
			return nil, nil, err
		}
		models = append(models, model)
	}

	return models, token, nil
}
||||
|
||||
// FindLatestAuthorizationModel see [storage.AuthorizationModelReadBackend].FindLatestAuthorizationModel.
// Delegates to the dialect-agnostic sqlcommon implementation.
func (m *SQLite) FindLatestAuthorizationModel(ctx context.Context, store string) (*openfgav1.AuthorizationModel, error) {
	ctx, span := tracer.Start(ctx, "sqlite.FindLatestAuthorizationModel")
	defer span.End()

	return sqlcommon.FindLatestAuthorizationModel(ctx, m.dbInfo, store)
}
||||
|
||||
// MaxTypesPerAuthorizationModel see [storage.TypeDefinitionWriteBackend].MaxTypesPerAuthorizationModel.
// The limit comes straight from the configuration.
func (m *SQLite) MaxTypesPerAuthorizationModel() int {
	return m.cfg.MaxTypesPerModelField
}
||||
|
||||
// WriteAuthorizationModel see [storage.TypeDefinitionWriteBackend].WriteAuthorizationModel.
|
||||
func (m *SQLite) WriteAuthorizationModel(ctx context.Context, store string, model *openfgav1.AuthorizationModel) error { |
||||
ctx, span := tracer.Start(ctx, "sqlite.WriteAuthorizationModel") |
||||
defer span.End() |
||||
|
||||
typeDefinitions := model.GetTypeDefinitions() |
||||
|
||||
if len(typeDefinitions) > m.MaxTypesPerAuthorizationModel() { |
||||
return storage.ExceededMaxTypeDefinitionsLimitError(m.MaxTypesPerAuthorizationModel()) |
||||
} |
||||
|
||||
return m.busyRetry(func() error { |
||||
return sqlcommon.WriteAuthorizationModel(ctx, m.dbInfo, store, model) |
||||
}) |
||||
} |
||||
|
||||
// CreateStore adds a new store to the SQLite storage.
//
// The insert and the read-back of the database-generated timestamps happen in
// one transaction; the returned store uses created_at for both CreatedAt and
// UpdatedAt.
func (m *SQLite) CreateStore(ctx context.Context, store *openfgav1.Store) (*openfgav1.Store, error) {
	ctx, span := tracer.Start(ctx, "sqlite.CreateStore")
	defer span.End()

	txn, err := m.db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return nil, handleSQLError(err)
	}
	// Rollback is a no-op after a successful Commit.
	defer func() {
		_ = txn.Rollback()
	}()

	_, err = m.stbl.
		Insert("store").
		Columns("id", "name", "created_at", "updated_at").
		Values(store.GetId(), store.GetName(), sq.Expr("datetime('subsec')"), sq.Expr("datetime('subsec')")).
		RunWith(txn).
		ExecContext(ctx)
	if err != nil {
		return nil, handleSQLError(err)
	}

	// Read the row back so the caller sees the timestamp the database wrote.
	var createdAt time.Time
	var id, name string
	err = m.stbl.
		Select("id", "name", "created_at").
		From("store").
		Where(sq.Eq{"id": store.GetId()}).
		RunWith(txn).
		QueryRowContext(ctx).
		Scan(&id, &name, &createdAt)
	if err != nil {
		return nil, handleSQLError(err)
	}

	err = txn.Commit()
	if err != nil {
		return nil, handleSQLError(err)
	}

	return &openfgav1.Store{
		Id:        id,
		Name:      name,
		CreatedAt: timestamppb.New(createdAt),
		UpdatedAt: timestamppb.New(createdAt),
	}, nil
}
||||
|
||||
// GetStore retrieves the details of a specific store from the SQLite using its storeID.
//
// Soft-deleted stores (deleted_at set) are treated as absent; a missing row
// maps to storage.ErrNotFound.
func (m *SQLite) GetStore(ctx context.Context, id string) (*openfgav1.Store, error) {
	ctx, span := tracer.Start(ctx, "sqlite.GetStore")
	defer span.End()

	row := m.stbl.
		Select("id", "name", "created_at", "updated_at").
		From("store").
		Where(sq.Eq{
			"id":         id,
			"deleted_at": nil, // exclude soft-deleted stores
		}).
		QueryRowContext(ctx)

	var storeID, name string
	var createdAt, updatedAt time.Time
	err := row.Scan(&storeID, &name, &createdAt, &updatedAt)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, storage.ErrNotFound
		}
		return nil, handleSQLError(err)
	}

	return &openfgav1.Store{
		Id:        storeID,
		Name:      name,
		CreatedAt: timestamppb.New(createdAt),
		UpdatedAt: timestamppb.New(updatedAt),
	}, nil
}
||||
|
||||
// ListStores provides a paginated list of all stores present in the SQLite storage.
//
// Soft-deleted stores are excluded. Results are ordered by id; the
// continuation token carries the first id of the next page.
func (m *SQLite) ListStores(ctx context.Context, opts storage.PaginationOptions) ([]*openfgav1.Store, []byte, error) {
	ctx, span := tracer.Start(ctx, "sqlite.ListStores")
	defer span.End()

	sb := m.stbl.
		Select("id", "name", "created_at", "updated_at").
		From("store").
		Where(sq.Eq{"deleted_at": nil}).
		OrderBy("id")

	if opts.From != "" {
		token, err := sqlcommon.UnmarshallContToken(opts.From)
		if err != nil {
			return nil, nil, err
		}
		sb = sb.Where(sq.GtOrEq{"id": token.Ulid})
	}
	if opts.PageSize > 0 {
		sb = sb.Limit(uint64(opts.PageSize + 1)) // + 1 is used to determine whether to return a continuation token.
	}

	rows, err := sb.QueryContext(ctx)
	if err != nil {
		return nil, nil, handleSQLError(err)
	}
	defer func() { _ = rows.Close() }()

	var stores []*openfgav1.Store
	var id string
	for rows.Next() {
		var name string
		var createdAt, updatedAt time.Time
		err := rows.Scan(&id, &name, &createdAt, &updatedAt)
		if err != nil {
			return nil, nil, handleSQLError(err)
		}

		stores = append(stores, &openfgav1.Store{
			Id:        id,
			Name:      name,
			CreatedAt: timestamppb.New(createdAt),
			UpdatedAt: timestamppb.New(updatedAt),
		})
	}

	if err := rows.Err(); err != nil {
		return nil, nil, handleSQLError(err)
	}

	// The extra (PageSize+1)-th row signals another page; its id (still in
	// `id`) becomes the continuation token and the row itself is trimmed off.
	if len(stores) > opts.PageSize {
		contToken, err := json.Marshal(sqlcommon.NewContToken(id, ""))
		if err != nil {
			return nil, nil, err
		}
		return stores[:opts.PageSize], contToken, nil
	}

	return stores, nil, nil
}
||||
|
||||
// DeleteStore removes a store from the SQLite storage.
|
||||
func (m *SQLite) DeleteStore(ctx context.Context, id string) error { |
||||
ctx, span := tracer.Start(ctx, "sqlite.DeleteStore") |
||||
defer span.End() |
||||
|
||||
_, err := m.stbl. |
||||
Update("store"). |
||||
Set("deleted_at", sq.Expr("datetime('subsec')")). |
||||
Where(sq.Eq{"id": id}). |
||||
ExecContext(ctx) |
||||
if err != nil { |
||||
return handleSQLError(err) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// WriteAssertions see [storage.AssertionsBackend].WriteAssertions.
//
// Assertions are serialized as one protobuf blob and upserted (INSERT ... ON
// CONFLICT) per (store, model); the write is retried on SQLITE_BUSY/LOCKED.
func (m *SQLite) WriteAssertions(ctx context.Context, store, modelID string, assertions []*openfgav1.Assertion) error {
	ctx, span := tracer.Start(ctx, "sqlite.WriteAssertions")
	defer span.End()

	marshalledAssertions, err := proto.Marshal(&openfgav1.Assertions{Assertions: assertions})
	if err != nil {
		return err
	}

	return m.busyRetry(func() error {
		_, err = m.stbl.
			Insert("assertion").
			Columns("store", "authorization_model_id", "assertions").
			Values(store, modelID, marshalledAssertions).
			Suffix("ON CONFLICT(store,authorization_model_id) DO UPDATE SET assertions = ?", marshalledAssertions).
			ExecContext(ctx)
		if err != nil {
			return handleSQLError(err)
		}

		return nil
	})
}
||||
|
||||
// ReadAssertions see [storage.AssertionsBackend].ReadAssertions.
//
// A missing row is not an error: it yields an empty (non-nil) slice.
func (m *SQLite) ReadAssertions(ctx context.Context, store, modelID string) ([]*openfgav1.Assertion, error) {
	ctx, span := tracer.Start(ctx, "sqlite.ReadAssertions")
	defer span.End()

	var marshalledAssertions []byte
	err := m.stbl.
		Select("assertions").
		From("assertion").
		Where(sq.Eq{
			"store":                  store,
			"authorization_model_id": modelID,
		}).
		QueryRowContext(ctx).
		Scan(&marshalledAssertions)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return []*openfgav1.Assertion{}, nil
		}
		return nil, handleSQLError(err)
	}

	var assertions openfgav1.Assertions
	err = proto.Unmarshal(marshalledAssertions, &assertions)
	if err != nil {
		return nil, err
	}

	return assertions.GetAssertions(), nil
}
||||
|
||||
// ReadChanges see [storage.ChangelogBackend].ReadChanges.
//
// Only changes older than horizonOffset are returned, in ascending ulid
// order. A continuation token is always returned alongside a non-empty page;
// an empty page yields storage.ErrNotFound.
func (m *SQLite) ReadChanges(
	ctx context.Context,
	store, objectTypeFilter string,
	opts storage.PaginationOptions,
	horizonOffset time.Duration,
) ([]*openfgav1.TupleChange, []byte, error) {
	ctx, span := tracer.Start(ctx, "sqlite.ReadChanges")
	defer span.End()

	sb := m.stbl.
		Select(
			"ulid", "object_type", "object_id", "relation", "_user", "operation",
			"condition_name", "condition_context", "inserted_at",
		).
		From("changelog").
		Where(sq.Eq{"store": store}).
		Where(fmt.Sprintf("inserted_at <= datetime('subsec','-%f seconds')", horizonOffset.Seconds())).
		OrderBy("ulid asc")

	if objectTypeFilter != "" {
		sb = sb.Where(sq.Eq{"object_type": objectTypeFilter})
	}
	if opts.From != "" {
		token, err := sqlcommon.UnmarshallContToken(opts.From)
		if err != nil {
			return nil, nil, err
		}
		// A token minted for one object-type filter must not be reused with
		// another.
		if token.ObjectType != objectTypeFilter {
			return nil, nil, storage.ErrMismatchObjectType
		}

		sb = sb.Where(sq.Gt{"ulid": token.Ulid}) // > as we always return a continuation token.
	}
	if opts.PageSize > 0 {
		sb = sb.Limit(uint64(opts.PageSize)) // + 1 is NOT used here as we always return a continuation token.
	}

	rows, err := sb.QueryContext(ctx)
	if err != nil {
		return nil, nil, handleSQLError(err)
	}
	defer func() { _ = rows.Close() }()

	var changes []*openfgav1.TupleChange
	var ulid string
	for rows.Next() {
		var objectType, objectID, relation, user string
		var operation int
		var insertedAt time.Time
		var conditionName sql.NullString
		var conditionContext []byte

		err = rows.Scan(
			&ulid,
			&objectType,
			&objectID,
			&relation,
			&user,
			&operation,
			&conditionName,
			&conditionContext,
			&insertedAt,
		)
		if err != nil {
			return nil, nil, handleSQLError(err)
		}

		// Conditions are optional; decode the context only when present.
		var conditionContextStruct structpb.Struct
		if conditionName.String != "" {
			if conditionContext != nil {
				if err := proto.Unmarshal(conditionContext, &conditionContextStruct); err != nil {
					return nil, nil, err
				}
			}
		}

		tk := tupleUtils.NewTupleKeyWithCondition(
			tupleUtils.BuildObject(objectType, objectID),
			relation,
			user,
			conditionName.String,
			&conditionContextStruct,
		)

		changes = append(changes, &openfgav1.TupleChange{
			TupleKey:  tk,
			Operation: openfgav1.TupleOperation(operation),
			Timestamp: timestamppb.New(insertedAt.UTC()),
		})
	}

	if len(changes) == 0 {
		return nil, nil, storage.ErrNotFound
	}

	// `ulid` holds the last scanned change; the token resumes strictly after it.
	contToken, err := json.Marshal(sqlcommon.NewContToken(ulid, objectTypeFilter))
	if err != nil {
		return nil, nil, err
	}

	return changes, contToken, nil
}
||||
|
||||
func (m *SQLite) IsReady(ctx context.Context) (storage.ReadinessStatus, error) { |
||||
if err := m.db.PingContext(ctx); err != nil { |
||||
return storage.ReadinessStatus{}, err |
||||
} |
||||
return storage.ReadinessStatus{ |
||||
IsReady: true, |
||||
}, nil |
||||
} |
||||
|
||||
// SQLite will return an SQLITE_BUSY error when the database is locked rather than waiting for the lock.
|
||||
// This function retries the operation up to 5 times before returning the error.
|
||||
func (m *SQLite) busyRetry(fn func() error) error { |
||||
for retries := 0; ; retries++ { |
||||
err := fn() |
||||
if err == nil || retries == m.cfg.QueryRetries { |
||||
return err |
||||
} |
||||
|
||||
var sqliteErr *sqlite3.Error |
||||
if errors.As(err, &sqliteErr) && (sqliteErr.Code == sqlite3.ErrLocked || sqliteErr.Code == sqlite3.ErrBusy) { |
||||
time.Sleep(10 * time.Millisecond) |
||||
continue |
||||
} |
||||
|
||||
return err |
||||
} |
||||
} |
||||
|
||||
// handleSQLError translates low-level SQL errors into storage-level errors.
// A UNIQUE-constraint violation becomes an invalid-write error when args[0]
// is a tuple key, and ErrCollision otherwise; everything else is delegated to
// sqlcommon.HandleSQLError.
func handleSQLError(err error, args ...any) error {
	// String matching is used here because the unique-constraint failure is
	// not exposed as a typed error at this point.
	if strings.Contains(err.Error(), "UNIQUE constraint failed:") {
		if len(args) > 0 {
			if tk, ok := args[0].(*openfgav1.TupleKey); ok {
				return storage.InvalidWriteInputError(tk, openfgav1.TupleOperation_TUPLE_OPERATION_WRITE)
			}
		}
		return storage.ErrCollision
	}

	return sqlcommon.HandleSQLError(err, args...)
}
@ -1,295 +0,0 @@ |
||||
package sqlite |
||||
|
||||
import ( |
||||
"context" |
||||
"database/sql" |
||||
"testing" |
||||
"time" |
||||
|
||||
sq "github.com/Masterminds/squirrel" |
||||
"github.com/oklog/ulid/v2" |
||||
openfgav1 "github.com/openfga/api/proto/openfga/v1" |
||||
"github.com/stretchr/testify/require" |
||||
"google.golang.org/protobuf/proto" |
||||
|
||||
"github.com/openfga/openfga/pkg/storage" |
||||
"github.com/openfga/openfga/pkg/storage/sqlcommon" |
||||
"github.com/openfga/openfga/pkg/storage/test" |
||||
"github.com/openfga/openfga/pkg/tuple" |
||||
"github.com/openfga/openfga/pkg/typesystem" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db" |
||||
"github.com/grafana/grafana/pkg/services/sqlstore/migrator" |
||||
"github.com/grafana/grafana/pkg/tests/testsuite" |
||||
|
||||
zassets "github.com/grafana/grafana/pkg/services/authz/zanzana/store/assets" |
||||
"github.com/grafana/grafana/pkg/services/authz/zanzana/store/migration" |
||||
) |
||||
|
||||
// TestMain delegates to the Grafana shared test-suite runner so package-wide
// setup/teardown (e.g. test database lifecycle) applies to these tests.
func TestMain(m *testing.M) {
	testsuite.Run(m)
}
||||
|
||||
// TestIntegrationDatastore runs the OpenFGA default datastore test suite
// against the SQLite-backed datastore built on a freshly migrated test DB.
func TestIntegrationDatastore(t *testing.T) {
	db := sqliteIntegrationTest(t)
	ds, err := NewWithDB(db, NewConfig())
	require.NoError(t, err)
	test.RunAllTests(t, ds)
}
||||
|
||||
// TestIntegrationReadEnsureNoOrder asserts that the read response is not ordered by ulid.
func TestIntegrationReadEnsureNoOrder(t *testing.T) {
	db := sqliteIntegrationTest(t)

	ds, err := NewWithDB(db, NewConfig())
	require.NoError(t, err)

	ctx := context.Background()
	store := "store"
	firstTuple := tuple.NewTupleKey("doc:object_id_1", "relation", "user:user_1")
	secondTuple := tuple.NewTupleKey("doc:object_id_2", "relation", "user:user_2")

	// Write the first tuple with the current timestamp.
	err = sqlcommon.Write(ctx,
		sqlcommon.NewDBInfo(ds.db, ds.stbl, sq.Expr("datetime('subsec')")),
		store,
		[]*openfgav1.TupleKeyWithoutCondition{},
		[]*openfgav1.TupleKey{firstTuple},
		time.Now())
	require.NoError(t, err)

	// Tweak time so that ULID is smaller.
	err = sqlcommon.Write(ctx,
		sqlcommon.NewDBInfo(ds.db, ds.stbl, sq.Expr("datetime('subsec')")),
		store,
		[]*openfgav1.TupleKeyWithoutCondition{},
		[]*openfgav1.TupleKey{secondTuple},
		time.Now().Add(time.Minute*-1))
	require.NoError(t, err)

	iter, err := ds.Read(ctx,
		store,
		tuple.NewTupleKey("doc:", "relation", ""))
	defer iter.Stop()

	require.NoError(t, err)

	// We expect that objectID1 will return first because it is inserted first.
	curTuple, err := iter.Next(ctx)
	require.NoError(t, err)
	require.Equal(t, firstTuple, curTuple.GetKey())

	curTuple, err = iter.Next(ctx)
	require.NoError(t, err)
	require.Equal(t, secondTuple, curTuple.GetKey())
}
||||
|
||||
// TestIntegrationReadPageEnsureOrder asserts that the read page is ordered by ulid,
// not by insertion order.
func TestIntegrationReadPageEnsureOrder(t *testing.T) {
	db := sqliteIntegrationTest(t)

	ds, err := NewWithDB(db, NewConfig())
	require.NoError(t, err)

	ctx := context.Background()

	store := "store"
	firstTuple := tuple.NewTupleKey("doc:object_id_1", "relation", "user:user_1")
	secondTuple := tuple.NewTupleKey("doc:object_id_2", "relation", "user:user_2")

	// Write the first tuple with the current timestamp.
	err = sqlcommon.Write(ctx,
		sqlcommon.NewDBInfo(ds.db, ds.stbl, sq.Expr("datetime('subsec')")),
		store,
		[]*openfgav1.TupleKeyWithoutCondition{},
		[]*openfgav1.TupleKey{firstTuple},
		time.Now())
	require.NoError(t, err)

	// Tweak time so that ULID is smaller.
	err = sqlcommon.Write(ctx,
		sqlcommon.NewDBInfo(ds.db, ds.stbl, sq.Expr("datetime('subsec')")),
		store,
		[]*openfgav1.TupleKeyWithoutCondition{},
		[]*openfgav1.TupleKey{secondTuple},
		time.Now().Add(time.Minute*-1))
	require.NoError(t, err)

	tuples, _, err := ds.ReadPage(ctx,
		store,
		tuple.NewTupleKey("doc:", "relation", ""),
		storage.NewPaginationOptions(0, ""))
	require.NoError(t, err)

	require.Len(t, tuples, 2)
	// We expect that objectID2 will return first because it has a smaller ulid.
	require.Equal(t, secondTuple, tuples[0].GetKey())
	require.Equal(t, firstTuple, tuples[1].GetKey())
}
||||
|
||||
// TestIntegrationReadAuthorizationModelUnmarshallError asserts that a row whose
// serialized_protobuf column holds invalid protobuf bytes surfaces an unmarshal
// error from ReadAuthorizationModel rather than being silently skipped.
func TestIntegrationReadAuthorizationModelUnmarshallError(t *testing.T) {
	db := sqliteIntegrationTest(t)

	ds, err := NewWithDB(db, NewConfig())
	require.NoError(t, err)

	ctx := context.Background()
	store := "store"
	modelID := "foo"
	schemaVersion := typesystem.SchemaVersion1_0

	bytes, err := proto.Marshal(&openfgav1.TypeDefinition{Type: "document"})
	require.NoError(t, err)
	// Deliberately corrupt bytes for the serialized_protobuf column.
	pbdata := []byte{0x01, 0x02, 0x03}

	_, err = ds.db.ExecContext(ctx, "INSERT INTO authorization_model (store, authorization_model_id, schema_version, type, type_definition, serialized_protobuf) VALUES (?, ?, ?, ?, ?, ?)", store, modelID, schemaVersion, "document", bytes, pbdata)
	require.NoError(t, err)

	_, err = ds.ReadAuthorizationModel(ctx, store, modelID)
	require.Error(t, err)
	require.Contains(t, err.Error(), "cannot parse invalid wire-format data")
}
||||
|
||||
// TestIntegrationAllowNullCondition tests that tuple and changelog rows existing before
// migration 005_add_conditions_to_tuples can be successfully read.
func TestIntegrationAllowNullCondition(t *testing.T) {
	db := sqliteIntegrationTest(t)

	ds, err := NewWithDB(db, NewConfig())
	require.NoError(t, err)

	ctx := context.Background()

	// Insert a tuple row directly with NULL condition_name/condition_context,
	// mimicking data written before the conditions migration.
	stmt := `
	INSERT INTO tuple (
		store, object_type, object_id, relation, _user, user_type, ulid,
		condition_name, condition_context, inserted_at
	) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('subsec'));
	`
	_, err = ds.db.ExecContext(
		ctx, stmt, "store", "folder", "2021-budget", "owner", "user:anne", "user",
		ulid.Make().String(), nil, nil,
	)
	require.NoError(t, err)

	// The NULL-condition row must be readable through every read path.
	tk := tuple.NewTupleKey("folder:2021-budget", "owner", "user:anne")
	iter, err := ds.Read(ctx, "store", tk)
	require.NoError(t, err)
	defer iter.Stop()

	curTuple, err := iter.Next(ctx)
	require.NoError(t, err)
	require.Equal(t, tk, curTuple.GetKey())

	tuples, _, err := ds.ReadPage(ctx, "store", &openfgav1.TupleKey{}, storage.PaginationOptions{
		PageSize: 2,
	})
	require.NoError(t, err)
	require.Len(t, tuples, 1)
	require.Equal(t, tk, tuples[0].GetKey())

	userTuple, err := ds.ReadUserTuple(ctx, "store", tk)
	require.NoError(t, err)
	require.Equal(t, tk, userTuple.GetKey())

	// Same check for a userset tuple (user_type = "userset").
	tk2 := tuple.NewTupleKey("folder:2022-budget", "viewer", "user:anne")
	_, err = ds.db.ExecContext(
		ctx, stmt, "store", "folder", "2022-budget", "viewer", "user:anne", "userset",
		ulid.Make().String(), nil, nil,
	)

	require.NoError(t, err)
	iter, err = ds.ReadUsersetTuples(ctx, "store", storage.ReadUsersetTuplesFilter{Object: "folder:2022-budget"})
	require.NoError(t, err)
	defer iter.Stop()

	curTuple, err = iter.Next(ctx)
	require.NoError(t, err)
	require.Equal(t, tk2, curTuple.GetKey())

	iter, err = ds.ReadStartingWithUser(ctx, "store", storage.ReadStartingWithUserFilter{
		ObjectType: "folder",
		Relation:   "owner",
		UserFilter: []*openfgav1.ObjectRelation{
			{Object: "user:anne"},
		},
	})
	require.NoError(t, err)
	defer iter.Stop()

	curTuple, err = iter.Next(ctx)
	require.NoError(t, err)
	require.Equal(t, tk, curTuple.GetKey())

	// Repeat the exercise for changelog rows with NULL condition columns.
	stmt = `
	INSERT INTO changelog (
		store, object_type, object_id, relation, _user, ulid,
		condition_name, condition_context, inserted_at, operation
	) VALUES (?, ?, ?, ?, ?, ?, ?, ?, datetime('subsec'), ?);
	`
	_, err = ds.db.ExecContext(
		ctx, stmt, "store", "folder", "2021-budget", "owner", "user:anne",
		ulid.Make().String(), nil, nil, openfgav1.TupleOperation_TUPLE_OPERATION_WRITE,
	)
	require.NoError(t, err)

	_, err = ds.db.ExecContext(
		ctx, stmt, "store", "folder", "2021-budget", "owner", "user:anne",
		ulid.Make().String(), nil, nil, openfgav1.TupleOperation_TUPLE_OPERATION_DELETE,
	)
	require.NoError(t, err)

	changes, _, err := ds.ReadChanges(ctx, "store", "folder", storage.PaginationOptions{}, 0)
	require.NoError(t, err)
	require.Len(t, changes, 2)
	require.Equal(t, tk, changes[0].GetTupleKey())
	require.Equal(t, tk, changes[1].GetTupleKey())
}
||||
|
||||
// TestIntegrationMarshalledAssertions tests that previously persisted marshalled
// assertions can be read back. In any case where the Assertions proto model
// needs to change, we'll likely need to introduce a series of data migrations.
func TestIntegrationMarshalledAssertions(t *testing.T) {
	db := sqliteIntegrationTest(t)

	ds, err := NewWithDB(db, NewConfig())
	require.NoError(t, err)

	ctx := context.Background()
	// Note: this represents an assertion written on v1.3.7.
	// The hex literal is a protobuf-encoded Assertions message inserted raw
	// so we exercise decoding of historically persisted bytes.
	stmt := `
	INSERT INTO assertion (
		store, authorization_model_id, assertions
	) VALUES (?, ?, UNHEX('0A2B0A270A12666F6C6465723A323032312D62756467657412056F776E65721A0A757365723A616E6E657A1001'));
	`
	_, err = ds.db.ExecContext(ctx, stmt, "store", "model")
	require.NoError(t, err)

	assertions, err := ds.ReadAssertions(ctx, "store", "model")
	require.NoError(t, err)

	expectedAssertions := []*openfgav1.Assertion{
		{
			TupleKey: &openfgav1.AssertionTupleKey{
				Object:   "folder:2021-budget",
				Relation: "owner",
				User:     "user:annez",
			},
			Expectation: true,
		},
	}
	require.Equal(t, expectedAssertions, assertions)
}
||||
|
||||
// sqliteIntegrationTest skips the calling test unless integration tests are
// enabled and the configured test database is SQLite, then returns a *sql.DB
// for the Grafana test database with the zanzana SQLite migrations applied.
func sqliteIntegrationTest(tb testing.TB) *sql.DB {
	if testing.Short() || !db.IsTestDbSQLite() {
		tb.Skip("skipping integration test")
	}

	db, cfg := db.InitTestDBWithCfg(tb)

	m := migrator.NewMigrator(db.GetEngine(), cfg)
	err := migration.RunWithMigrator(m, cfg, zassets.EmbedMigrations, zassets.SQLiteMigrationDir)
	require.NoError(tb, err)

	return db.GetEngine().DB().DB
}
@ -1,165 +0,0 @@ |
||||
package sqlite |
||||
|
||||
import ( |
||||
"context" |
||||
"database/sql" |
||||
"time" |
||||
|
||||
sq "github.com/Masterminds/squirrel" |
||||
"github.com/oklog/ulid/v2" |
||||
"google.golang.org/protobuf/proto" |
||||
|
||||
openfgav1 "github.com/openfga/api/proto/openfga/v1" |
||||
"github.com/openfga/openfga/pkg/storage" |
||||
tupleUtils "github.com/openfga/openfga/pkg/tuple" |
||||
) |
||||
|
||||
// write is copied from https://github.com/openfga/openfga/blob/main/pkg/storage/sqlcommon/sqlcommon.go#L330-L456
// but uses custom handleSQLError.
//
// write applies the given tuple deletes and writes atomically inside a single
// transaction, and appends one changelog row per mutation. sqlTime is the SQL
// expression/value used for inserted_at; now seeds the ULIDs so all changes in
// one call share a timestamp ordering. Deleting a tuple that does not exist
// (or matches more than one row) returns an invalid-write error and rolls
// everything back.
func write(
	ctx context.Context,
	db *sql.DB,
	stbl sq.StatementBuilderType,
	sqlTime any,
	store string,
	deletes storage.Deletes,
	writes storage.Writes,
	now time.Time,
) error {
	txn, err := db.BeginTx(ctx, nil)
	if err != nil {
		return handleSQLError(err)
	}
	// Rollback is a no-op after a successful Commit; this guarantees cleanup
	// on every early-return path.
	defer func() {
		_ = txn.Rollback()
	}()

	// Changelog rows are accumulated and inserted in one statement at the end.
	changelogBuilder := stbl.
		Insert("changelog").
		Columns(
			"store", "object_type", "object_id", "relation", "_user",
			"condition_name", "condition_context", "operation", "ulid", "inserted_at",
		)

	deleteBuilder := stbl.Delete("tuple")

	for _, tk := range deletes {
		id := ulid.MustNew(ulid.Timestamp(now), ulid.DefaultEntropy()).String()
		objectType, objectID := tupleUtils.SplitObject(tk.GetObject())

		res, err := deleteBuilder.
			Where(sq.Eq{
				"store":       store,
				"object_type": objectType,
				"object_id":   objectID,
				"relation":    tk.GetRelation(),
				"_user":       tk.GetUser(),
				"user_type":   tupleUtils.GetUserTypeFromUser(tk.GetUser()),
			}).
			RunWith(txn). // Part of a txn.
			ExecContext(ctx)
		if err != nil {
			return handleSQLError(err, tk)
		}

		rowsAffected, err := res.RowsAffected()
		if err != nil {
			return handleSQLError(err)
		}

		// Exactly one row must have been deleted; anything else means the
		// caller tried to delete a tuple that was not present.
		if rowsAffected != 1 {
			return storage.InvalidWriteInputError(
				tk,
				openfgav1.TupleOperation_TUPLE_OPERATION_DELETE,
			)
		}

		changelogBuilder = changelogBuilder.Values(
			store, objectType, objectID,
			tk.GetRelation(), tk.GetUser(),
			"", nil, // Redact condition info for deletes since we only need the base triplet (object, relation, user).
			openfgav1.TupleOperation_TUPLE_OPERATION_DELETE,
			id, sqlTime,
		)
	}

	insertBuilder := stbl.
		Insert("tuple").
		Columns(
			"store", "object_type", "object_id", "relation", "_user", "user_type",
			"condition_name", "condition_context", "ulid", "inserted_at",
		)

	for _, tk := range writes {
		id := ulid.MustNew(ulid.Timestamp(now), ulid.DefaultEntropy()).String()
		objectType, objectID := tupleUtils.SplitObject(tk.GetObject())

		conditionName, conditionContext, err := marshalRelationshipCondition(tk.GetCondition())
		if err != nil {
			return err
		}

		_, err = insertBuilder.
			Values(
				store,
				objectType,
				objectID,
				tk.GetRelation(),
				tk.GetUser(),
				tupleUtils.GetUserTypeFromUser(tk.GetUser()),
				conditionName,
				conditionContext,
				id,
				sqlTime,
			).
			RunWith(txn). // Part of a txn.
			ExecContext(ctx)
		if err != nil {
			return handleSQLError(err, tk)
		}

		changelogBuilder = changelogBuilder.Values(
			store,
			objectType,
			objectID,
			tk.GetRelation(),
			tk.GetUser(),
			conditionName,
			conditionContext,
			openfgav1.TupleOperation_TUPLE_OPERATION_WRITE,
			id,
			sqlTime,
		)
	}

	// Only run the accumulated changelog insert when there was at least one
	// mutation; an empty Values list would be an invalid statement.
	if len(writes) > 0 || len(deletes) > 0 {
		_, err := changelogBuilder.RunWith(txn).ExecContext(ctx) // Part of a txn.
		if err != nil {
			return handleSQLError(err)
		}
	}

	if err := txn.Commit(); err != nil {
		return handleSQLError(err)
	}

	return nil
}
||||
|
||||
// copied from https://github.com/openfga/openfga/blob/main/pkg/storage/sqlcommon/encoding.go#L8-L24
|
||||
func marshalRelationshipCondition( |
||||
rel *openfgav1.RelationshipCondition, |
||||
) (name string, context []byte, err error) { |
||||
if rel != nil { |
||||
if rel.GetContext() != nil && len(rel.GetContext().GetFields()) > 0 { |
||||
context, err = proto.Marshal(rel.GetContext()) |
||||
if err != nil { |
||||
return name, context, err |
||||
} |
||||
} |
||||
|
||||
return rel.GetName(), context, err |
||||
} |
||||
|
||||
return name, context, err |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,5 @@ |
||||
module github.com/grafana/grafana/scripts/modowners |
||||
|
||||
go 1.19 |
||||
go 1.23.7 |
||||
|
||||
require golang.org/x/mod v0.10.0 |
||||
require golang.org/x/mod v0.17.0 |
||||
|
@ -1,2 +1,2 @@ |
||||
golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= |
||||
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= |
||||
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= |
||||
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= |
||||
|
Loading…
Reference in new issue