Folders: Switch order of the columns in folder table indexes so that org_id becomes first (#82454)

* Folders: Switch order of the columns in folder table so that org_id becomes first
pull/82918/head^2
Sofia Papagiannaki 1 year ago committed by GitHub
parent c6f8462a06
commit 38e8c62972
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 3
      pkg/services/dashboards/database/migrations/folder_uid_migrator.go
  2. 5
      pkg/services/folder/folderimpl/folder.go
  3. 13
      pkg/services/folder/folderimpl/sqlstore.go
  4. 25
      pkg/services/sqlstore/migrations/folder_mig.go
  5. 24
      pkg/services/sqlstore/permissions/dashboard.go
  6. 20
      pkg/services/sqlstore/permissions/dashboard_filter_no_subquery.go
  7. 1
      pkg/services/sqlstore/searchstore/builder.go
  8. 4
      pkg/services/sqlstore/searchstore/search_test.go

@ -39,11 +39,14 @@ func (m *FolderUIDMigration) Exec(sess *xorm.Session, mgrtr *migrator.Migrator)
}
// for folders the source of truth is the folder table
// covered by UQE_folder_org_id_uid
q = `UPDATE dashboard
SET folder_uid = folder.parent_uid
FROM folder
WHERE dashboard.uid = folder.uid AND dashboard.org_id = folder.org_id
AND dashboard.is_folder = ?`
// covered by UQE_folder_org_id_uid
if mgrtr.Dialect.DriverName() == migrator.MySQL {
q = `UPDATE dashboard
SET folder_uid = (

@ -88,18 +88,21 @@ func (s *Service) DBMigration(db db.DB) {
err := db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
var err error
if db.GetDialect().DriverName() == migrator.SQLite {
// covered by UQE_folder_org_id_uid
_, err = sess.Exec(`
INSERT INTO folder (uid, org_id, title, created, updated)
SELECT uid, org_id, title, created, updated FROM dashboard WHERE is_folder = 1
ON CONFLICT DO UPDATE SET title=excluded.title, updated=excluded.updated
`)
} else if db.GetDialect().DriverName() == migrator.Postgres {
// covered by UQE_folder_org_id_uid
_, err = sess.Exec(`
INSERT INTO folder (uid, org_id, title, created, updated)
SELECT uid, org_id, title, created, updated FROM dashboard WHERE is_folder = true
ON CONFLICT(uid, org_id) DO UPDATE SET title=excluded.title, updated=excluded.updated
`)
} else {
// covered by UQE_folder_org_id_uid
_, err = sess.Exec(`
INSERT INTO folder (uid, org_id, title, created, updated)
SELECT * FROM (SELECT uid, org_id, title, created, updated FROM dashboard WHERE is_folder = 1) AS derived
@ -109,6 +112,8 @@ func (s *Service) DBMigration(db db.DB) {
if err != nil {
return err
}
// covered by UQE_folder_org_id_uid
_, err = sess.Exec(`
DELETE FROM folder WHERE NOT EXISTS
(SELECT 1 FROM dashboard WHERE dashboard.uid = folder.uid AND dashboard.org_id = folder.org_id AND dashboard.is_folder = true)

@ -88,6 +88,7 @@ func (ss *sqlStore) Delete(ctx context.Context, UIDs []string, orgID int64) erro
return nil
}
return ss.db.WithDbSession(ctx, func(sess *db.Session) error {
// covered by UQE_folder_org_id_uid
s := fmt.Sprintf("DELETE FROM folder WHERE org_id=? AND uid IN (%s)", strings.Repeat("?, ", len(UIDs)-1)+"?")
sqlArgs := make([]any, 0, len(UIDs)+2)
sqlArgs = append(sqlArgs, s, orgID)
@ -140,6 +141,7 @@ func (ss *sqlStore) Update(ctx context.Context, cmd folder.UpdateFolderCommand)
}
sql.WriteString(strings.Join(columnsToUpdate, ", "))
// covered by UQE_folder_org_id_uid
sql.WriteString(" WHERE uid = ? AND org_id = ?")
args = append(args, cmd.UID, cmd.OrgID)
@ -205,14 +207,17 @@ func (ss *sqlStore) Get(ctx context.Context, q folder.GetFolderQuery) (*folder.F
}
switch {
case q.UID != nil:
// covered UQE_folder_uid_org_id
s.WriteString(" WHERE f0.uid = ? AND f0.org_id = ?")
exists, err = sess.SQL(s.String(), q.UID, q.OrgID).Get(foldr)
// nolint:staticcheck
case q.ID != nil:
s.WriteString(" WHERE f0.id = ?")
metrics.MFolderIDsServiceCount.WithLabelValues(metrics.Folder).Inc()
// covered by primary key
exists, err = sess.SQL(s.String(), q.ID).Get(foldr)
case q.Title != nil:
// covered by UQE_folder_org_id_parent_uid_title
s.WriteString(" WHERE f0.title = ? AND f0.org_id = ?")
args := []any{*q.Title, q.OrgID}
if q.ParentUID != nil {
@ -245,6 +250,7 @@ func (ss *sqlStore) GetParents(ctx context.Context, q folder.GetParentsQuery) ([
}
var folders []*folder.Folder
// covered by UQE_folder_org_id_uid
recQuery := `
WITH RECURSIVE RecQry AS (
SELECT * FROM folder WHERE uid = ? AND org_id = ?
@ -295,6 +301,7 @@ func (ss *sqlStore) GetChildren(ctx context.Context, q folder.GetChildrenQuery)
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
sql := strings.Builder{}
args := make([]any, 0, 2)
// covered by UQE_folder_org_id_parent_uid_title
if q.UID == "" {
sql.WriteString("SELECT * FROM folder WHERE parent_uid IS NULL AND org_id=?")
args = append(args, q.OrgID)
@ -342,6 +349,7 @@ func (ss *sqlStore) GetChildren(ctx context.Context, q folder.GetChildrenQuery)
func (ss *sqlStore) getParentsMySQL(ctx context.Context, q folder.GetParentsQuery) (folders []*folder.Folder, err error) {
err = ss.db.WithDbSession(ctx, func(sess *db.Session) error {
uid := ""
// covered by UQE_folder_org_id_uid
ok, err := sess.SQL("SELECT parent_uid FROM folder WHERE org_id=? AND uid=?", q.OrgID, q.UID).Get(&uid)
if err != nil {
return err
@ -351,6 +359,7 @@ func (ss *sqlStore) getParentsMySQL(ctx context.Context, q folder.GetParentsQuer
}
for {
f := &folder.Folder{}
// covered by UQE_folder_org_id_uid
ok, err := sess.SQL("SELECT * FROM folder WHERE org_id=? AND uid=?", q.OrgID, uid).Get(f)
if err != nil {
return err
@ -454,6 +463,7 @@ func (ss *sqlStore) GetFolders(ctx context.Context, q getFoldersQuery) ([]*folde
if q.WithFullpath || q.WithFullpathUIDs || len(q.ancestorUIDs) > 0 {
s.WriteString(getFullpathJoinsSQL())
}
// covered by UQE_folder_org_id_uid
s.WriteString(` WHERE f0.org_id=?`)
args := []any{q.OrgID}
if len(partialUIDs) > 0 {
@ -509,6 +519,7 @@ func (ss *sqlStore) GetDescendants(ctx context.Context, orgID int64, ancestor_ui
}
switch recursiveQueriesAreSupported {
case true:
// covered by UQE_folder_org_id_parent_uid_title
recQuery := `
WITH RECURSIVE RecQry AS (
SELECT * FROM folder WHERE parent_uid = ? AND org_id = ?
@ -532,6 +543,7 @@ func (ss *sqlStore) GetDescendants(ctx context.Context, orgID int64, ancestor_ui
s := strings.Builder{}
args := make([]any, 0, 1+folder.MaxNestedFolderDepth)
args = append(args, orgID)
// covered by UQE_folder_org_id_uid
s.WriteString(`SELECT f0.id, f0.org_id, f0.uid, f0.parent_uid, f0.title, f0.description, f0.created, f0.updated`)
s.WriteString(` FROM folder f0`)
s.WriteString(getFullpathJoinsSQL())
@ -585,6 +597,7 @@ func getFullapathUIDsSQL(dialect migrator.Dialect) string {
func getFullpathJoinsSQL() string {
joins := make([]string, 0, folder.MaxNestedFolderDepth)
for i := 1; i <= folder.MaxNestedFolderDepth; i++ {
// covered by UQE_folder_org_id_uid
joins = append(joins, fmt.Sprintf(` LEFT JOIN folder f%d ON f%d.org_id = f%d.org_id AND f%d.uid = f%d.parent_uid`, i, i, i-1, i, i-1))
}
return strings.Join(joins, "\n")

@ -55,6 +55,31 @@ func addFolderMigrations(mg *migrator.Migrator) {
DELETE FROM folder WHERE NOT EXISTS
(SELECT 1 FROM dashboard WHERE dashboard.uid = folder.uid AND dashboard.org_id = folder.org_id AND dashboard.is_folder = true)
`))
mg.AddMigration("Remove unique index UQE_folder_uid_org_id", migrator.NewDropIndexMigration(folderv1(), &migrator.Index{
Type: migrator.UniqueIndex,
Cols: []string{"uid", "org_id"},
}))
mg.AddMigration("Add unique index UQE_folder_org_id_uid", migrator.NewAddIndexMigration(folderv1(), &migrator.Index{
Type: migrator.UniqueIndex,
Cols: []string{"org_id", "uid"},
}))
mg.AddMigration("Remove unique index UQE_folder_title_parent_uid_org_id", migrator.NewDropIndexMigration(folderv1(), &migrator.Index{
Type: migrator.UniqueIndex,
Cols: []string{"title", "parent_uid", "org_id"},
}))
mg.AddMigration("Add unique index UQE_folder_org_id_parent_uid_title", migrator.NewAddIndexMigration(folderv1(), &migrator.Index{
Type: migrator.UniqueIndex,
Cols: []string{"org_id", "parent_uid", "title"},
}))
// No need to introduce IDX_folder_org_id_parent_uid because is covered by UQE_folder_org_id_parent_uid_title
mg.AddMigration("Remove index IDX_folder_parent_uid_org_id", migrator.NewDropIndexMigration(folderv1(), &migrator.Index{
Cols: []string{"parent_uid", "org_id"},
}))
}
func folderv1() migrator.Table {

@ -42,7 +42,7 @@ type PermissionsFilter interface {
Where() (string, []any)
buildClauses()
nestedFoldersSelectors(permSelector string, permSelectorArgs []any, leftTableCol string, rightTableCol string, orgID int64) (string, []any)
nestedFoldersSelectors(permSelector string, permSelectorArgs []any, leftTable string, Col string, rightTableCol string, orgID int64) (string, []any)
}
// NewAccessControlDashboardPermissionFilter creates a new AccessControlDashboardPermissionFilter that is configured with specific actions calculated based on the dashboardaccess.PermissionType and query type
@ -211,11 +211,11 @@ func (f *accessControlDashboardPermissionFilter) buildClauses() {
case true:
builder.WriteString("(dashboard.folder_id IN (SELECT d.id FROM dashboard as d ")
recQueryName := fmt.Sprintf("RecQry%d", len(f.recQueries))
f.addRecQry(recQueryName, permSelector.String(), permSelectorArgs)
f.addRecQry(recQueryName, permSelector.String(), permSelectorArgs, orgID)
builder.WriteString(fmt.Sprintf("WHERE d.org_id = ? AND d.uid IN (SELECT uid FROM %s)", recQueryName))
args = append(args, orgID)
default:
nestedFoldersSelectors, nestedFoldersArgs := f.nestedFoldersSelectors(permSelector.String(), permSelectorArgs, "dashboard.folder_id", "d.id", orgID)
nestedFoldersSelectors, nestedFoldersArgs := f.nestedFoldersSelectors(permSelector.String(), permSelectorArgs, "dashboard", "folder_id", "d.id", orgID)
builder.WriteRune('(')
builder.WriteString(nestedFoldersSelectors)
args = append(args, nestedFoldersArgs...)
@ -289,11 +289,11 @@ func (f *accessControlDashboardPermissionFilter) buildClauses() {
switch f.recursiveQueriesAreSupported {
case true:
recQueryName := fmt.Sprintf("RecQry%d", len(f.recQueries))
f.addRecQry(recQueryName, permSelector.String(), permSelectorArgs)
f.addRecQry(recQueryName, permSelector.String(), permSelectorArgs, orgID)
builder.WriteString("(dashboard.uid IN ")
builder.WriteString(fmt.Sprintf("(SELECT uid FROM %s)", recQueryName))
default:
nestedFoldersSelectors, nestedFoldersArgs := f.nestedFoldersSelectors(permSelector.String(), permSelectorArgs, "dashboard.uid", "d.uid", orgID)
nestedFoldersSelectors, nestedFoldersArgs := f.nestedFoldersSelectors(permSelector.String(), permSelectorArgs, "dashboard", "uid", "d.uid", orgID)
builder.WriteRune('(')
builder.WriteString(nestedFoldersSelectors)
builder.WriteRune(')')
@ -340,15 +340,17 @@ func (f *accessControlDashboardPermissionFilter) With() (string, []any) {
return sb.String(), params
}
func (f *accessControlDashboardPermissionFilter) addRecQry(queryName string, whereUIDSelect string, whereParams []any) {
func (f *accessControlDashboardPermissionFilter) addRecQry(queryName string, whereUIDSelect string, whereParams []any, orgID int64) {
if f.recQueries == nil {
f.recQueries = make([]clause, 0, maximumRecursiveQueries)
}
c := make([]any, len(whereParams))
copy(c, whereParams)
c = append([]any{orgID}, c...)
f.recQueries = append(f.recQueries, clause{
// covered by UQE_folder_org_id_uid and UQE_folder_org_id_parent_uid_title
string: fmt.Sprintf(`%s AS (
SELECT uid, parent_uid, org_id FROM folder WHERE uid IN %s
SELECT uid, parent_uid, org_id FROM folder WHERE org_id = ? AND uid IN %s
UNION ALL SELECT f.uid, f.parent_uid, f.org_id FROM folder f INNER JOIN %s r ON f.parent_uid = r.uid and f.org_id = r.org_id
)`, queryName, whereUIDSelect, queryName),
params: c,
@ -378,12 +380,13 @@ func actionsToCheck(actions []string, permissions map[string][]string, wildcards
return toCheck
}
func (f *accessControlDashboardPermissionFilter) nestedFoldersSelectors(permSelector string, permSelectorArgs []any, leftTableCol string, rightTableCol string, orgID int64) (string, []any) {
func (f *accessControlDashboardPermissionFilter) nestedFoldersSelectors(permSelector string, permSelectorArgs []any, leftTable string, leftCol string, rightTableCol string, orgID int64) (string, []any) {
wheres := make([]string, 0, folder.MaxNestedFolderDepth+1)
args := make([]any, 0, len(permSelectorArgs)*(folder.MaxNestedFolderDepth+1))
joins := make([]string, 0, folder.MaxNestedFolderDepth+2)
// covered by UQE_folder_org_id_uid
tmpl := "INNER JOIN folder %s ON %s.%s = %s.uid AND %s.org_id = %s.org_id "
prev := "d"
@ -393,8 +396,9 @@ func (f *accessControlDashboardPermissionFilter) nestedFoldersSelectors(permSele
s := fmt.Sprintf(tmpl, t, prev, onCol, t, prev, t)
joins = append(joins, s)
wheres = append(wheres, fmt.Sprintf("(%s IN (SELECT %s FROM dashboard d %s WHERE %s.org_id = ? AND %s.uid IN %s)", leftTableCol, rightTableCol, strings.Join(joins, " "), t, t, permSelector))
args = append(args, orgID)
// covered by UQE_folder_org_id_uid
wheres = append(wheres, fmt.Sprintf("(%s.org_id = ? AND %s.%s IN (SELECT %s FROM dashboard d %s WHERE %s.org_id = ? AND %s.uid IN %s)", leftTable, leftTable, leftCol, rightTableCol, strings.Join(joins, " "), t, t, permSelector))
args = append(args, orgID, orgID)
args = append(args, permSelectorArgs...)
prev = t

@ -122,10 +122,10 @@ func (f *accessControlDashboardPermissionFilterNoFolderSubquery) buildClauses()
switch f.recursiveQueriesAreSupported {
case true:
recQueryName := fmt.Sprintf("RecQry%d", len(f.recQueries))
f.addRecQry(recQueryName, permSelector.String(), permSelectorArgs)
f.addRecQry(recQueryName, permSelector.String(), permSelectorArgs, orgID)
builder.WriteString("(folder.uid IN (SELECT uid FROM " + recQueryName)
default:
nestedFoldersSelectors, nestedFoldersArgs := f.nestedFoldersSelectors(permSelector.String(), permSelectorArgs, "folder.uid", "", orgID)
nestedFoldersSelectors, nestedFoldersArgs := f.nestedFoldersSelectors(permSelector.String(), permSelectorArgs, "folder", "uid", "", orgID)
builder.WriteRune('(')
builder.WriteString(nestedFoldersSelectors)
args = append(args, nestedFoldersArgs...)
@ -199,11 +199,11 @@ func (f *accessControlDashboardPermissionFilterNoFolderSubquery) buildClauses()
switch f.recursiveQueriesAreSupported {
case true:
recQueryName := fmt.Sprintf("RecQry%d", len(f.recQueries))
f.addRecQry(recQueryName, permSelector.String(), permSelectorArgs)
f.addRecQry(recQueryName, permSelector.String(), permSelectorArgs, orgID)
builder.WriteString("(dashboard.uid IN ")
builder.WriteString(fmt.Sprintf("(SELECT uid FROM %s)", recQueryName))
default:
nestedFoldersSelectors, nestedFoldersArgs := f.nestedFoldersSelectors(permSelector.String(), permSelectorArgs, "dashboard.uid", "", orgID)
nestedFoldersSelectors, nestedFoldersArgs := f.nestedFoldersSelectors(permSelector.String(), permSelectorArgs, "dashboard", "uid", "", orgID)
builder.WriteRune('(')
builder.WriteString(nestedFoldersSelectors)
builder.WriteRune(')')
@ -231,15 +231,18 @@ func (f *accessControlDashboardPermissionFilterNoFolderSubquery) buildClauses()
f.where = clause{string: builder.String(), params: args}
}
func (f *accessControlDashboardPermissionFilterNoFolderSubquery) nestedFoldersSelectors(permSelector string, permSelectorArgs []any, leftTableCol string, _ string, orgID int64) (string, []any) {
func (f *accessControlDashboardPermissionFilterNoFolderSubquery) nestedFoldersSelectors(permSelector string, permSelectorArgs []any, leftTable string, leftCol string, _ string, orgID int64) (string, []any) {
wheres := make([]string, 0, folder.MaxNestedFolderDepth+1)
args := make([]any, 0, len(permSelectorArgs)*(folder.MaxNestedFolderDepth+1))
joins := make([]string, 0, folder.MaxNestedFolderDepth+2)
// covered by UQE_folder_org_id_parent_uid_title
tmpl := "INNER JOIN folder %s ON %s.parent_uid = %s.uid AND %s.org_id = %s.org_id "
wheres = append(wheres, fmt.Sprintf("(%s IN (SELECT f1.uid FROM folder f1 WHERE f1.uid IN %s)", leftTableCol, permSelector))
// covered by UQE_folder_org_id_uid
wheres = append(wheres, fmt.Sprintf("(%s.org_id = ? AND %s.%s IN (SELECT f1.uid FROM folder f1 WHERE f1.org_id = ? AND f1.uid IN %s)", leftTable, leftTable, leftCol, permSelector))
args = append(args, orgID, orgID)
args = append(args, permSelectorArgs...)
prev := "f1"
@ -248,8 +251,9 @@ func (f *accessControlDashboardPermissionFilterNoFolderSubquery) nestedFoldersSe
s := fmt.Sprintf(tmpl, t, prev, t, prev, t)
joins = append(joins, s)
wheres = append(wheres, fmt.Sprintf("(%s IN (SELECT f1.uid FROM folder f1 %s WHERE %s.org_id = ? AND %s.uid IN %s)", leftTableCol, strings.Join(joins, " "), t, t, permSelector))
args = append(args, orgID)
// covered by UQE_folder_org_id_uid
wheres = append(wheres, fmt.Sprintf("(%s.org_id = ? AND %s.%s IN (SELECT f1.uid FROM folder f1 %s WHERE %s.org_id = ? AND %s.uid IN %s)", leftTable, leftTable, leftCol, strings.Join(joins, " "), t, t, permSelector))
args = append(args, orgID, orgID)
args = append(args, permSelectorArgs...)
prev = t

@ -38,6 +38,7 @@ func (b *Builder) ToSQL(limit, page int64) (string, []any) {
b.sql.WriteString("\n")
if b.Features.IsEnabledGlobally(featuremgmt.FlagNestedFolders) {
// covered by UQE_folder_org_id_uid
b.sql.WriteString(
`LEFT OUTER JOIN folder ON folder.uid = dashboard.folder_uid AND folder.org_id = dashboard.org_id`)
} else {

@ -180,6 +180,7 @@ func TestBuilder_RBAC(t *testing.T) {
},
features: featuremgmt.WithFeatures(featuremgmt.FlagNestedFolders),
expectedParams: []any{
int64(1),
int64(1),
int64(1),
0,
@ -191,6 +192,7 @@ func TestBuilder_RBAC(t *testing.T) {
2,
int64(1),
int64(1),
int64(1),
0,
"Viewer",
int64(1),
@ -255,6 +257,7 @@ func TestBuilder_RBAC(t *testing.T) {
},
features: featuremgmt.WithFeatures(featuremgmt.FlagNestedFolders, featuremgmt.FlagPermissionsFilterRemoveSubquery),
expectedParams: []any{
int64(1),
int64(1),
int64(1),
0,
@ -266,6 +269,7 @@ func TestBuilder_RBAC(t *testing.T) {
2,
int64(1),
int64(1),
int64(1),
0,
"Viewer",
int64(1),

Loading…
Cancel
Save