mirror of https://github.com/grafana/grafana
Zanzana: Initial dashboard search (#93093)
* Zanzana: Search in a background and compare results * refactor * Search with check * instrument zanzana client * add single_read option * refactor * refactor move check into separate function * Fix tests * refactor * refactor getFindDashboardsFn * add resource type to span attributes * run ListObjects concurrently * Use list and search in less cases * adjust metrics buckets * refactor: move Check and ListObjects to AccessControl implementation * Revert "Fix tests" This reverts commitnjvrzm/cloudwatch-nil-pointer-guardb0c2f072a2
. * refactor: use own types for Check and ListObjects inside accesscontrol package * Fix search scenario with low limit and empty query string * more accurate search with checks * revert * fix linter * Revert "revert" This reverts commit ee5f14eea8
. * add search errors metric * fix query performance under some conditions * simplify check strategy * fix pagination * refactor findDashboardsZanzanaList * Iterate over multiple pages while making check request * refactor listUserResources * avoid unnecessary db call * remove unused zclient * Add notes for SkipAccessControlFilter * use more accurate check loop * always use check for search with provided UIDs * rename single_read to zanzana_only_evaluation * refactor * update go workspace * fix linter * don't use deprecated fields * refactor * fail if no org specified * refactor * initial integration tests * Fix tests * fix linter errors * fix linter * Fix tests * review suggestions Co-authored-by: Gabriel MABILLE <gamab@users.noreply.github.com> * fix limit * refactor * refactor tests * fix db config in tests * fix migrator (postgres) --------- Co-authored-by: Gabriel MABILLE <gamab@users.noreply.github.com>
parent
f403bc57d5
commit
5d724c2482
@ -0,0 +1,342 @@ |
||||
package service |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"strconv" |
||||
"strings" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/prometheus/client_golang/prometheus" |
||||
|
||||
"github.com/grafana/grafana/pkg/services/accesscontrol" |
||||
"github.com/grafana/grafana/pkg/services/authz/zanzana" |
||||
"github.com/grafana/grafana/pkg/services/dashboards" |
||||
"github.com/grafana/grafana/pkg/services/sqlstore/searchstore" |
||||
) |
||||
|
||||
// Tunables used when choosing between the Zanzana search strategies.
// They are intentionally untyped so they can be compared against both int and
// int64 values.
const (
	// defaultQueryLimit is the page size used when paging through the
	// dashboard store during a "search, then check" run.
	defaultQueryLimit = 1000

	// listQueryLengthThreshold is the longest query title for which the
	// list-based strategy is still selected (titles of this length or shorter).
	listQueryLengthThreshold = 8

	// listQueryLimitThreshold is the query limit below which a non-empty title
	// search always uses the "search, then check" strategy.
	listQueryLimitThreshold = 50
)
||||
|
||||
type searchResult struct { |
||||
runner string |
||||
result []dashboards.DashboardSearchProjection |
||||
err error |
||||
duration time.Duration |
||||
} |
||||
|
||||
func (dr *DashboardServiceImpl) FindDashboardsZanzana(ctx context.Context, query *dashboards.FindPersistedDashboardsQuery) ([]dashboards.DashboardSearchProjection, error) { |
||||
if dr.cfg.Zanzana.ZanzanaOnlyEvaluation { |
||||
return dr.findDashboardsZanzanaOnly(ctx, *query) |
||||
} |
||||
return dr.findDashboardsZanzanaCompare(ctx, *query) |
||||
} |
||||
|
||||
func (dr *DashboardServiceImpl) findDashboardsZanzanaOnly(ctx context.Context, query dashboards.FindPersistedDashboardsQuery) ([]dashboards.DashboardSearchProjection, error) { |
||||
timer := prometheus.NewTimer(dr.metrics.searchRequestsDuration.WithLabelValues("zanzana")) |
||||
defer timer.ObserveDuration() |
||||
|
||||
return dr.findDashboardsZanzana(ctx, query) |
||||
} |
||||
|
||||
func (dr *DashboardServiceImpl) findDashboardsZanzanaCompare(ctx context.Context, query dashboards.FindPersistedDashboardsQuery) ([]dashboards.DashboardSearchProjection, error) { |
||||
result := make(chan searchResult, 2) |
||||
|
||||
go func() { |
||||
timer := prometheus.NewTimer(dr.metrics.searchRequestsDuration.WithLabelValues("zanzana")) |
||||
defer timer.ObserveDuration() |
||||
start := time.Now() |
||||
|
||||
queryZanzana := query |
||||
res, err := dr.findDashboardsZanzana(ctx, queryZanzana) |
||||
result <- searchResult{"zanzana", res, err, time.Since(start)} |
||||
}() |
||||
|
||||
go func() { |
||||
timer := prometheus.NewTimer(dr.metrics.searchRequestsDuration.WithLabelValues("grafana")) |
||||
defer timer.ObserveDuration() |
||||
start := time.Now() |
||||
|
||||
res, err := dr.FindDashboards(ctx, &query) |
||||
result <- searchResult{"grafana", res, err, time.Since(start)} |
||||
}() |
||||
|
||||
first, second := <-result, <-result |
||||
close(result) |
||||
|
||||
if second.runner == "grafana" { |
||||
first, second = second, first |
||||
} |
||||
|
||||
if second.err != nil { |
||||
dr.log.Error("zanzana search failed", "error", second.err) |
||||
dr.metrics.searchRequestStatusTotal.WithLabelValues("error").Inc() |
||||
} else if len(first.result) != len(second.result) { |
||||
dr.metrics.searchRequestStatusTotal.WithLabelValues("error").Inc() |
||||
dr.log.Warn( |
||||
"zanzana search result does not match grafana", |
||||
"grafana_result_len", len(first.result), |
||||
"zanana_result_len", len(second.result), |
||||
"grafana_duration", first.duration, |
||||
"zanzana_duration", second.duration, |
||||
) |
||||
} else { |
||||
dr.metrics.searchRequestStatusTotal.WithLabelValues("success").Inc() |
||||
dr.log.Debug("zanzana search is correct", "result_len", len(first.result), "grafana_duration", first.duration, "zanzana_duration", second.duration) |
||||
} |
||||
|
||||
return first.result, first.err |
||||
} |
||||
|
||||
func (dr *DashboardServiceImpl) findDashboardsZanzana(ctx context.Context, query dashboards.FindPersistedDashboardsQuery) ([]dashboards.DashboardSearchProjection, error) { |
||||
findDashboards := dr.getFindDashboardsFn(query) |
||||
return findDashboards(ctx, query) |
||||
} |
||||
|
||||
type findDashboardsFn func(ctx context.Context, query dashboards.FindPersistedDashboardsQuery) ([]dashboards.DashboardSearchProjection, error) |
||||
|
||||
// getFindDashboardsFn makes a decision which search method should be used
|
||||
func (dr *DashboardServiceImpl) getFindDashboardsFn(query dashboards.FindPersistedDashboardsQuery) findDashboardsFn { |
||||
if query.Limit > 0 && query.Limit < listQueryLimitThreshold && len(query.Title) > 0 { |
||||
return dr.findDashboardsZanzanaCheck |
||||
} |
||||
if len(query.DashboardUIDs) > 0 || len(query.DashboardIds) > 0 { |
||||
return dr.findDashboardsZanzanaCheck |
||||
} |
||||
if len(query.FolderUIDs) > 0 { |
||||
return dr.findDashboardsZanzanaCheck |
||||
} |
||||
if len(query.Title) <= listQueryLengthThreshold { |
||||
return dr.findDashboardsZanzanaList |
||||
} |
||||
return dr.findDashboardsZanzanaCheck |
||||
} |
||||
|
||||
// findDashboardsZanzanaCheck implements "Search, then check" strategy. It first performs search query, then filters out results
|
||||
// by checking access to each item.
|
||||
func (dr *DashboardServiceImpl) findDashboardsZanzanaCheck(ctx context.Context, query dashboards.FindPersistedDashboardsQuery) ([]dashboards.DashboardSearchProjection, error) { |
||||
ctx, span := tracer.Start(ctx, "dashboards.service.findDashboardsZanzanaCheck") |
||||
defer span.End() |
||||
|
||||
result := make([]dashboards.DashboardSearchProjection, 0, query.Limit) |
||||
var page int64 = 1 |
||||
query.SkipAccessControlFilter = true |
||||
// Remember initial query limit
|
||||
limit := query.Limit |
||||
// Set limit to default to prevent pagination issues
|
||||
query.Limit = defaultQueryLimit |
||||
|
||||
for len(result) < int(limit) { |
||||
query.Page = page |
||||
findRes, err := dr.dashboardStore.FindDashboards(ctx, &query) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
remains := limit - int64(len(result)) |
||||
res, err := dr.checkDashboards(ctx, query, findRes, remains) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
result = append(result, res...) |
||||
page++ |
||||
|
||||
// Stop when last page reached
|
||||
if len(findRes) < defaultQueryLimit { |
||||
break |
||||
} |
||||
} |
||||
|
||||
return result, nil |
||||
} |
||||
|
||||
func (dr *DashboardServiceImpl) checkDashboards(ctx context.Context, query dashboards.FindPersistedDashboardsQuery, searchRes []dashboards.DashboardSearchProjection, remains int64) ([]dashboards.DashboardSearchProjection, error) { |
||||
ctx, span := tracer.Start(ctx, "dashboards.service.checkDashboards") |
||||
defer span.End() |
||||
|
||||
if len(searchRes) == 0 { |
||||
return nil, nil |
||||
} |
||||
|
||||
orgId := query.OrgId |
||||
if orgId == 0 && query.SignedInUser.GetOrgID() != 0 { |
||||
orgId = query.SignedInUser.GetOrgID() |
||||
} else { |
||||
return nil, dashboards.ErrUserIsNotSignedInToOrg |
||||
} |
||||
|
||||
concurrentRequests := dr.cfg.Zanzana.ConcurrentChecks |
||||
var wg sync.WaitGroup |
||||
res := make([]dashboards.DashboardSearchProjection, 0) |
||||
resToCheck := make(chan dashboards.DashboardSearchProjection, concurrentRequests) |
||||
allowedResults := make(chan dashboards.DashboardSearchProjection, len(searchRes)) |
||||
|
||||
for i := 0; i < int(concurrentRequests); i++ { |
||||
wg.Add(1) |
||||
go func() { |
||||
defer wg.Done() |
||||
for d := range resToCheck { |
||||
if int64(len(allowedResults)) >= remains { |
||||
return |
||||
} |
||||
|
||||
objectType := zanzana.TypeDashboard |
||||
if d.IsFolder { |
||||
objectType = zanzana.TypeFolder |
||||
} |
||||
|
||||
req := accesscontrol.CheckRequest{ |
||||
User: query.SignedInUser.GetUID(), |
||||
Relation: "read", |
||||
Object: zanzana.NewScopedTupleEntry(objectType, d.UID, "", strconv.FormatInt(orgId, 10)), |
||||
} |
||||
|
||||
allowed, err := dr.ac.Check(ctx, req) |
||||
if err != nil { |
||||
dr.log.Error("error checking access", "error", err) |
||||
} else if allowed { |
||||
allowedResults <- d |
||||
} |
||||
} |
||||
}() |
||||
} |
||||
|
||||
for _, r := range searchRes { |
||||
resToCheck <- r |
||||
} |
||||
close(resToCheck) |
||||
|
||||
wg.Wait() |
||||
close(allowedResults) |
||||
|
||||
for r := range allowedResults { |
||||
if int64(len(res)) >= remains { |
||||
break |
||||
} |
||||
res = append(res, r) |
||||
} |
||||
|
||||
return res, nil |
||||
} |
||||
|
||||
// findDashboardsZanzanaList implements "List, then search" strategy. It first retrieve a list of resources
|
||||
// with given type available to the user and then passes that list as a filter to the search query.
|
||||
func (dr *DashboardServiceImpl) findDashboardsZanzanaList(ctx context.Context, query dashboards.FindPersistedDashboardsQuery) ([]dashboards.DashboardSearchProjection, error) { |
||||
// Always use "search, then check" if dashboard or folder UIDs provided. Otherwise we should make intersection
|
||||
// of user's resources and provided UIDs which might not be correct if ListObjects() request is limited by OpenFGA.
|
||||
if len(query.DashboardUIDs) > 0 || len(query.DashboardIds) > 0 || len(query.FolderUIDs) > 0 { |
||||
return dr.findDashboardsZanzanaCheck(ctx, query) |
||||
} |
||||
|
||||
ctx, span := tracer.Start(ctx, "dashboards.service.findDashboardsZanzanaList") |
||||
defer span.End() |
||||
|
||||
resourceUIDs, err := dr.listUserResources(ctx, query) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if len(resourceUIDs) == 0 { |
||||
return []dashboards.DashboardSearchProjection{}, nil |
||||
} |
||||
|
||||
query.DashboardUIDs = resourceUIDs |
||||
query.SkipAccessControlFilter = true |
||||
return dr.dashboardStore.FindDashboards(ctx, &query) |
||||
} |
||||
|
||||
func (dr *DashboardServiceImpl) listUserResources(ctx context.Context, query dashboards.FindPersistedDashboardsQuery) ([]string, error) { |
||||
tasks := make([]func() ([]string, error), 0) |
||||
var resourceTypes []string |
||||
|
||||
// For some search types we need dashboards or folders only
|
||||
switch query.Type { |
||||
case searchstore.TypeDashboard: |
||||
resourceTypes = []string{zanzana.TypeDashboard} |
||||
case searchstore.TypeFolder, searchstore.TypeAlertFolder: |
||||
resourceTypes = []string{zanzana.TypeFolder} |
||||
default: |
||||
resourceTypes = []string{zanzana.TypeDashboard, zanzana.TypeFolder} |
||||
} |
||||
|
||||
for _, resourceType := range resourceTypes { |
||||
tasks = append(tasks, func() ([]string, error) { |
||||
return dr.listAllowedResources(ctx, query, resourceType) |
||||
}) |
||||
} |
||||
|
||||
uids, err := runBatch(tasks) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return uids, nil |
||||
} |
||||
|
||||
func (dr *DashboardServiceImpl) listAllowedResources(ctx context.Context, query dashboards.FindPersistedDashboardsQuery, resourceType string) ([]string, error) { |
||||
res, err := dr.ac.ListObjects(ctx, accesscontrol.ListObjectsRequest{ |
||||
User: query.SignedInUser.GetUID(), |
||||
Type: resourceType, |
||||
Relation: "read", |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
orgId := query.OrgId |
||||
if orgId == 0 && query.SignedInUser.GetOrgID() != 0 { |
||||
orgId = query.SignedInUser.GetOrgID() |
||||
} else { |
||||
return nil, dashboards.ErrUserIsNotSignedInToOrg |
||||
} |
||||
// dashboard:<orgId>-
|
||||
prefix := fmt.Sprintf("%s:%d-", resourceType, orgId) |
||||
|
||||
resourceUIDs := make([]string, 0) |
||||
for _, d := range res { |
||||
if uid, found := strings.CutPrefix(d, prefix); found { |
||||
resourceUIDs = append(resourceUIDs, uid) |
||||
} |
||||
} |
||||
|
||||
return resourceUIDs, nil |
||||
} |
||||
|
||||
// runBatch runs all tasks concurrently, waits for every one of them to finish
// and returns their results concatenated in task order.
//
// If any task fails, the error of the first failing task (in task order) is
// returned together with a nil slice. On success the returned slice is never
// nil, even when there are no tasks or no results.
func runBatch(tasks []func() ([]string, error)) ([]string, error) {
	type outcome struct {
		uids []string
		err  error
	}

	// Each goroutine writes only to its own slot, so no further
	// synchronisation is needed beyond the WaitGroup.
	outcomes := make([]outcome, len(tasks))

	var wg sync.WaitGroup
	wg.Add(len(tasks))
	for i, task := range tasks {
		// Pass the loop variables explicitly so the goroutine is correct
		// regardless of the toolchain's loop-variable semantics.
		go func(i int, task func() ([]string, error)) {
			defer wg.Done()
			uids, err := task()
			outcomes[i] = outcome{uids: uids, err: err}
		}(i, task)
	}
	wg.Wait()

	total := 0
	for _, o := range outcomes {
		if o.err != nil {
			return nil, o.err
		}
		total += len(o.uids)
	}

	result := make([]string, 0, total)
	for _, o := range outcomes {
		result = append(result, o.uids...)
	}
	return result, nil
}
@ -0,0 +1,121 @@ |
||||
package service |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db" |
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/acimpl" |
||||
"github.com/grafana/grafana/pkg/services/accesscontrol/migrator" |
||||
accesscontrolmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock" |
||||
"github.com/grafana/grafana/pkg/services/authz" |
||||
"github.com/grafana/grafana/pkg/services/dashboards" |
||||
"github.com/grafana/grafana/pkg/services/dashboards/database" |
||||
"github.com/grafana/grafana/pkg/services/featuremgmt" |
||||
"github.com/grafana/grafana/pkg/services/folder/folderimpl" |
||||
"github.com/grafana/grafana/pkg/services/folder/foldertest" |
||||
"github.com/grafana/grafana/pkg/services/guardian" |
||||
"github.com/grafana/grafana/pkg/services/quota/quotatest" |
||||
"github.com/grafana/grafana/pkg/services/tag/tagimpl" |
||||
"github.com/grafana/grafana/pkg/services/user" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
) |
||||
|
||||
func TestIntegrationDashboardServiceZanzana(t *testing.T) { |
||||
if testing.Short() { |
||||
t.Skip("skipping integration test") |
||||
} |
||||
|
||||
t.Run("Zanzana enabled", func(t *testing.T) { |
||||
// t.Helper()
|
||||
|
||||
features := featuremgmt.WithFeatures(featuremgmt.FlagZanzana) |
||||
|
||||
db, cfg := db.InitTestDBWithCfg(t) |
||||
|
||||
// Enable zanzana and run in embedded mode (part of grafana server)
|
||||
cfg.Zanzana.ZanzanaOnlyEvaluation = true |
||||
cfg.Zanzana.Mode = setting.ZanzanaModeEmbedded |
||||
cfg.Zanzana.ConcurrentChecks = 10 |
||||
|
||||
_, err := cfg.Raw.Section("rbac").NewKey("resources_with_managed_permissions_on_creation", "dashboard, folder") |
||||
require.NoError(t, err) |
||||
|
||||
quotaService := quotatest.New(false, nil) |
||||
tagService := tagimpl.ProvideService(db) |
||||
folderStore := folderimpl.ProvideDashboardFolderStore(db) |
||||
fStore := folderimpl.ProvideStore(db) |
||||
dashboardStore, err := database.ProvideDashboardStore(db, cfg, features, tagService, quotaService) |
||||
require.NoError(t, err) |
||||
|
||||
zclient, err := authz.ProvideZanzana(cfg, db, features) |
||||
require.NoError(t, err) |
||||
ac := acimpl.ProvideAccessControl(featuremgmt.WithFeatures(), zclient) |
||||
|
||||
service, err := ProvideDashboardServiceImpl( |
||||
cfg, dashboardStore, folderStore, |
||||
featuremgmt.WithFeatures(), |
||||
accesscontrolmock.NewMockedPermissionsService(), |
||||
accesscontrolmock.NewMockedPermissionsService(), |
||||
ac, |
||||
foldertest.NewFakeService(), |
||||
fStore, |
||||
nil, |
||||
) |
||||
require.NoError(t, err) |
||||
|
||||
guardianMock := &guardian.FakeDashboardGuardian{ |
||||
CanSaveValue: true, |
||||
} |
||||
guardian.MockDashboardGuardian(guardianMock) |
||||
|
||||
createDashboards(t, service, 100, "test-a") |
||||
createDashboards(t, service, 100, "test-b") |
||||
|
||||
// Sync Grafana DB with zanzana (migrate data)
|
||||
zanzanaSyncronizer := migrator.NewZanzanaSynchroniser(zclient, db) |
||||
err = zanzanaSyncronizer.Sync(context.Background()) |
||||
require.NoError(t, err) |
||||
|
||||
query := &dashboards.FindPersistedDashboardsQuery{ |
||||
Title: "test-a", |
||||
Limit: 1000, |
||||
SignedInUser: &user.SignedInUser{ |
||||
OrgID: 1, |
||||
UserID: 1, |
||||
}, |
||||
} |
||||
res, err := service.FindDashboardsZanzana(context.Background(), query) |
||||
|
||||
require.NoError(t, err) |
||||
assert.Equal(t, 0, len(res)) |
||||
}) |
||||
} |
||||
|
||||
func createDashboard(t *testing.T, service dashboards.DashboardService, uid, title string) { |
||||
dto := &dashboards.SaveDashboardDTO{ |
||||
OrgID: 1, |
||||
// User: user,
|
||||
User: &user.SignedInUser{ |
||||
OrgID: 1, |
||||
UserID: 1, |
||||
}, |
||||
} |
||||
dto.Dashboard = dashboards.NewDashboard(title) |
||||
dto.Dashboard.SetUID(uid) |
||||
|
||||
_, err := service.SaveDashboard(context.Background(), dto, false) |
||||
require.NoError(t, err) |
||||
} |
||||
|
||||
func createDashboards(t *testing.T, service dashboards.DashboardService, number int, prefix string) { |
||||
for i := 0; i < number; i++ { |
||||
title := fmt.Sprintf("%s-%d", prefix, i) |
||||
uid := fmt.Sprintf("dash-%s", title) |
||||
createDashboard(t, service, uid, title) |
||||
} |
||||
} |
Loading…
Reference in new issue