Zanzana: Remove model and store initiation from client (#95328)

* Remove model and store initiation from client
pull/95364/head
Karl Persson 9 months ago committed by GitHub
parent f5ed2f52f7
commit beaac3c885
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 7
      pkg/services/authz/zanzana/client.go
  2. 90
      pkg/services/authz/zanzana/client/client.go
  3. 117
      pkg/services/authz/zanzana/client/client_test.go
  4. 2
      pkg/services/authz/zanzana/server.go
  5. 2
      pkg/services/authz/zanzana/server/authz_server.go
  6. 11
      pkg/services/dashboards/service/zanzana_integration_test.go

@ -22,10 +22,15 @@ type Client interface {
}
func NewClient(ctx context.Context, cc grpc.ClientConnInterface, cfg *setting.Cfg) (*client.Client, error) {
stackID := cfg.StackID
if stackID == "" {
stackID = "default"
}
return client.New(
ctx,
cc,
client.WithTenantID(fmt.Sprintf("stack-%s", cfg.StackID)),
client.WithTenantID(fmt.Sprintf("stacks-%s", stackID)),
client.WithLogger(log.New("zanzana-client")),
)
}

@ -6,14 +6,12 @@ import (
"fmt"
openfgav1 "github.com/openfga/api/proto/openfga/v1"
"github.com/openfga/language/pkg/go/transformer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/authz/zanzana/schema"
)
var tracer = otel.Tracer("github.com/grafana/grafana/pkg/services/authz/zanzana/client")
@ -32,16 +30,9 @@ func WithLogger(logger log.Logger) ClientOption {
}
}
func WithSchema(modules []transformer.ModuleFile) ClientOption {
return func(c *Client) {
c.modules = modules
}
}
type Client struct {
logger log.Logger
client openfgav1.OpenFGAServiceClient
modules []transformer.ModuleFile
tenantID string
storeID string
modelID string
@ -61,21 +52,17 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, opts ...ClientOption)
}
if c.tenantID == "" {
c.tenantID = "stack-default"
c.tenantID = "stacks-default"
}
if len(c.modules) == 0 {
c.modules = schema.SchemaModules
}
store, err := c.getOrCreateStore(ctx, c.tenantID)
store, err := c.getStore(ctx, c.tenantID)
if err != nil {
return nil, err
}
c.storeID = store.GetId()
modelID, err := c.loadModel(ctx, c.storeID, c.modules)
modelID, err := c.loadModel(ctx, c.storeID)
if err != nil {
return nil, err
}
@ -119,24 +106,6 @@ func (c *Client) Write(ctx context.Context, in *openfgav1.WriteRequest) error {
return err
}
func (c *Client) getOrCreateStore(ctx context.Context, name string) (*openfgav1.Store, error) {
store, err := c.getStore(ctx, name)
if errors.Is(err, errStoreNotFound) {
var res *openfgav1.CreateStoreResponse
res, err = c.client.CreateStore(ctx, &openfgav1.CreateStoreRequest{Name: name})
if res != nil {
store = &openfgav1.Store{
Id: res.GetId(),
Name: res.GetName(),
CreatedAt: res.GetCreatedAt(),
}
}
}
return store, err
}
var errStoreNotFound = errors.New("store not found")
func (c *Client) getStore(ctx context.Context, name string) (*openfgav1.Store, error) {
@ -170,52 +139,21 @@ func (c *Client) getStore(ctx context.Context, name string) (*openfgav1.Store, e
}
}
func (c *Client) loadModel(ctx context.Context, storeID string, modules []transformer.ModuleFile) (string, error) {
var continuationToken string
func (c *Client) loadModel(ctx context.Context, storeID string) (string, error) {
// ReadAuthorizationModels returns authorization models for a store sorted in descending order of creation.
// So with a pageSize of 1 we will get the latest model.
res, err := c.client.ReadAuthorizationModels(ctx, &openfgav1.ReadAuthorizationModelsRequest{
StoreId: storeID,
PageSize: &wrapperspb.Int32Value{Value: 1},
})
model, err := schema.TransformModulesToModel(modules)
if err != nil {
return "", err
}
for {
// ReadAuthorizationModels returns authorization models for a store sorted in descending order of creation.
// So with a pageSize of 1 we will get the latest model.
res, err := c.client.ReadAuthorizationModels(ctx, &openfgav1.ReadAuthorizationModelsRequest{
StoreId: storeID,
PageSize: &wrapperspb.Int32Value{Value: 20},
ContinuationToken: continuationToken,
})
if err != nil {
return "", fmt.Errorf("failed to load authorization model: %w", err)
}
for _, m := range res.GetAuthorizationModels() {
// If provided dsl is equal to a stored dsl we use that as the authorization id
if schema.EqualModels(m, model) {
return m.GetId(), nil
}
}
// If we have not found any matching authorization model we break the loop and create a new one
if res.GetContinuationToken() == "" {
break
}
continuationToken = res.GetContinuationToken()
return "", fmt.Errorf("failed to load latest authorization model: %w", err)
}
writeRes, err := c.client.WriteAuthorizationModel(ctx, &openfgav1.WriteAuthorizationModelRequest{
StoreId: c.storeID,
TypeDefinitions: model.GetTypeDefinitions(),
SchemaVersion: model.GetSchemaVersion(),
Conditions: model.GetConditions(),
})
if err != nil {
return "", fmt.Errorf("failed to load authorization model: %w", err)
if len(res.AuthorizationModels) != 1 {
return "", fmt.Errorf("failed to load latest authorization model")
}
return writeRes.GetAuthorizationModelId(), nil
return res.AuthorizationModels[0].GetId(), nil
}

@ -1,117 +0,0 @@
package client
import (
"context"
"testing"
openfgav1 "github.com/openfga/api/proto/openfga/v1"
"github.com/openfga/language/pkg/go/transformer"
"github.com/fullstorydev/grpchan/inprocgrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/tests/testsuite"
zserver "github.com/grafana/grafana/pkg/services/authz/zanzana/server"
zstore "github.com/grafana/grafana/pkg/services/authz/zanzana/store"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
)
func TestMain(m *testing.M) {
testsuite.Run(m)
}
func TestIntegrationClient(t *testing.T) {
conn := zanzanaServerIntegrationTest(t)
var (
prevStoreID string
prevModelID string
)
t.Run("should create default store and authorization model on first startup", func(t *testing.T) {
c, err := New(context.Background(), conn)
require.NoError(t, err)
assert.NotEmpty(t, c.storeID)
assert.NotEmpty(t, c.modelID)
prevStoreID, prevModelID = c.storeID, c.modelID
})
t.Run("should reuse existing store and authorization model", func(t *testing.T) {
c, err := New(context.Background(), conn)
require.NoError(t, err)
assert.Equal(t, prevStoreID, c.storeID)
assert.Equal(t, prevModelID, c.modelID)
})
t.Run("should create new store and authorization model when new tenant id is used", func(t *testing.T) {
c, err := New(context.Background(), conn, WithTenantID("new"))
require.NoError(t, err)
assert.NotEmpty(t, c.storeID)
assert.NotEmpty(t, c.modelID)
assert.NotEqual(t, prevStoreID, c.storeID)
assert.NotEqual(t, prevModelID, c.modelID)
prevStoreID, prevModelID = c.storeID, c.modelID
})
t.Run("should update authorization model if it has new changes", func(t *testing.T) {
dsl := `
module core
type user
`
modules := []transformer.ModuleFile{
{Name: "core.fga", Contents: dsl},
}
c, err := New(context.Background(), conn, WithTenantID("new"), WithSchema(modules))
require.NoError(t, err)
assert.Equal(t, prevStoreID, c.storeID)
assert.NotEqual(t, prevModelID, c.modelID)
})
t.Run("should load older authorization model", func(t *testing.T) {
c, err := New(context.Background(), conn, WithTenantID("new"))
require.NoError(t, err)
assert.Equal(t, prevStoreID, c.storeID)
assert.Equal(t, prevModelID, c.modelID)
})
}
func zanzanaServerIntegrationTest(tb testing.TB) *inprocgrpc.Channel {
if testing.Short() {
tb.Skip("skipping integration test")
}
db, cfg := db.InitTestDBWithCfg(tb)
// Hack to skip these tests on mysql 5.7
if db.GetDialect().DriverName() == migrator.MySQL {
if supported, err := db.RecursiveQueriesAreSupported(); !supported || err != nil {
tb.Skip("skipping integration test")
}
}
logger := log.NewNopLogger()
store, err := zstore.NewEmbeddedStore(cfg, db, logger)
require.NoError(tb, err)
srv, err := zserver.NewOpenFGA(&cfg.Zanzana, store, logger)
require.NoError(tb, err)
channel := &inprocgrpc.Channel{}
openfgav1.RegisterOpenFGAServiceServer(channel, srv)
return channel
}

@ -26,7 +26,7 @@ func NewAuthzServer(cfg *setting.Cfg, openfga openfgav1.OpenFGAServiceServer) (*
return zserver.NewAuthz(
openfga,
zserver.WithTenantID(fmt.Sprintf("stack-%s", stackID)),
zserver.WithTenantID(fmt.Sprintf("stacks-%s", stackID)),
)
}

@ -68,7 +68,7 @@ func NewAuthz(openfga openfgav1.OpenFGAServiceServer, opts ...ServerOption) (*Se
}
if s.tenantID == "" {
s.tenantID = "stack-default"
s.tenantID = "stacks-default"
}
if len(s.modules) == 0 {

@ -20,6 +20,7 @@ import (
"github.com/grafana/grafana/pkg/services/folder/foldertest"
"github.com/grafana/grafana/pkg/services/guardian"
"github.com/grafana/grafana/pkg/services/quota/quotatest"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/services/tag/tagimpl"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/setting"
@ -31,12 +32,16 @@ func TestIntegrationDashboardServiceZanzana(t *testing.T) {
}
t.Run("Zanzana enabled", func(t *testing.T) {
// t.Helper()
features := featuremgmt.WithFeatures(featuremgmt.FlagZanzana)
db, cfg := db.InitTestDBWithCfg(t)
// Hack to skip these tests on mysql 5.7
if db.GetDialect().DriverName() == migrator.MySQL {
if supported, err := db.RecursiveQueriesAreSupported(); !supported || err != nil {
t.Skip("skipping integration test")
}
}
// Enable zanzana and run in embedded mode (part of grafana server)
cfg.Zanzana.ZanzanaOnlyEvaluation = true
cfg.Zanzana.Mode = setting.ZanzanaModeEmbedded

Loading…
Cancel
Save