unified-storage: split resource index client (#106297)

* split resource server and index server grpc connection if defined in config
kvstore-6^2
Will Assis 3 weeks ago committed by GitHub
parent 0982cfd9a0
commit f4ee58db50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 15
      pkg/services/apiserver/options/storage.go
  2. 66
      pkg/storage/unified/client.go
  3. 151
      pkg/storage/unified/client_test.go
  4. 41
      pkg/storage/unified/resource/client.go
  5. 2
      pkg/storage/unified/sql/test/integration_test.go

@ -43,6 +43,7 @@ type StorageOptions struct {
// For unified-grpc
Address string
IndexServerAddress string
GrpcClientAuthenticationToken string
GrpcClientAuthenticationTokenExchangeURL string
GrpcClientAuthenticationTokenNamespace string
@ -136,10 +137,22 @@ func (o *StorageOptions) ApplyTo(serverConfig *genericapiserver.RecommendedConfi
if err != nil {
return err
}
var indexConn *grpc.ClientConn
if o.IndexServerAddress != "" {
indexConn, err = grpc.NewClient(o.IndexServerAddress,
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return err
}
} else {
indexConn = conn
}
const resourceStoreAudience = "resourceStore"
unified, err := resource.NewRemoteResourceClient(tracer, conn, resource.RemoteResourceClientConfig{
unified, err := resource.NewRemoteResourceClient(tracer, conn, indexConn, resource.RemoteResourceClientConfig{
Token: o.GrpcClientAuthenticationToken,
TokenExchangeURL: o.GrpcClientAuthenticationTokenExchangeURL,
Namespace: o.GrpcClientAuthenticationTokenNamespace,

@ -55,7 +55,8 @@ func ProvideUnifiedStorageClient(opts *Options, storageMetrics *resource.Storage
client, err := newClient(options.StorageOptions{
StorageType: options.StorageType(apiserverCfg.Key("storage_type").MustString(string(options.StorageTypeUnified))),
DataPath: apiserverCfg.Key("storage_path").MustString(filepath.Join(opts.Cfg.DataPath, "grafana-apiserver")),
Address: apiserverCfg.Key("address").MustString(""), // client address
Address: apiserverCfg.Key("address").MustString(""),
IndexServerAddress: apiserverCfg.Key("index_server_address").MustString(""),
BlobStoreURL: apiserverCfg.Key("blob_url").MustString(""),
BlobThresholdBytes: apiserverCfg.Key("blob_threshold_bytes").MustInt(options.BlobThresholdDefault),
}, opts.Cfg, opts.Features, opts.DB, opts.Tracer, opts.Reg, opts.Authzc, opts.Docs, storageMetrics, indexMetrics)
@ -117,34 +118,29 @@ func newClient(opts options.StorageOptions,
}
var (
conn grpc.ClientConnInterface
err error
metrics = newClientMetrics(reg)
conn grpc.ClientConnInterface
indexConn grpc.ClientConnInterface
err error
metrics = newClientMetrics(reg)
)
// Create either a connection pool or a single connection.
// The connection pool __can__ be useful when connection to
// server side load balancers like kube-proxy.
if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageGrpcConnectionPool) {
conn, err = newPooledConn(&poolOpts{
initialCapacity: 3,
maxCapacity: 6,
idleTimeout: time.Minute,
factory: func() (*grpc.ClientConn, error) {
return grpcConn(opts.Address, metrics)
},
})
conn, err = newGrpcConn(opts.Address, metrics, features)
if err != nil {
return nil, err
}
if opts.IndexServerAddress != "" {
indexConn, err = newGrpcConn(opts.IndexServerAddress, metrics, features)
if err != nil {
return nil, err
}
} else {
conn, err = grpcConn(opts.Address, metrics)
if err != nil {
return nil, err
}
indexConn = conn
}
// Create a client instance
client, err := resource.NewResourceClient(conn, cfg, features, tracer)
client, err := resource.NewResourceClient(conn, indexConn, cfg, features, tracer)
if err != nil {
return nil, err
}
@ -164,6 +160,34 @@ func newClient(opts options.StorageOptions,
}
}
func newGrpcConn(address string, metrics *clientMetrics, features featuremgmt.FeatureToggles) (grpc.ClientConnInterface, error) {
// Create either a connection pool or a single connection.
// The connection pool __can__ be useful when connection to
// server side load balancers like kube-proxy.
if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageGrpcConnectionPool) {
conn, err := newPooledConn(&poolOpts{
initialCapacity: 3,
maxCapacity: 6,
idleTimeout: time.Minute,
factory: func() (*grpc.ClientConn, error) {
return grpcConn(address, metrics)
},
})
if err != nil {
return nil, err
}
return conn, nil
}
conn, err := grpcConn(address, metrics)
if err != nil {
return nil, err
}
return conn, nil
}
// grpcConn creates a new gRPC connection to the provided address.
func grpcConn(address string, metrics *clientMetrics) (*grpc.ClientConn, error) {
// Report gRPC status code errors as labels.

@ -0,0 +1,151 @@
package unified
import (
"context"
"net"
"strings"
"testing"
authlib "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
func TestUnifiedStorageClient(t *testing.T) {
resourceServerAddress := ":11000"
resourceServer := createTestGrpcServer(t, resourceServerAddress)
defer resourceServer.s.Stop()
indexServerAddress := ":11001"
indexServer := createTestGrpcServer(t, indexServerAddress)
defer indexServer.s.Stop()
t.Run("when storage type is unified-grpc", func(t *testing.T) {
t.Run("should create a client that connects to the unified storage server address", func(t *testing.T) {
resourceServer.resetCalls()
indexServer.resetCalls()
client, err := newClient(
options.StorageOptions{
StorageType: options.StorageTypeUnifiedGrpc,
Address: resourceServerAddress,
},
&setting.Cfg{},
featuremgmt.WithFeatures(),
nil,
nil,
nil,
authlib.FixedAccessClient(true),
nil,
nil,
nil,
)
require.NoError(t, err)
testCallAllMethods(client)
// every method should hit resource server exactly once
for method, count := range resourceServer.Calls {
require.Equal(t, 1, count, "method was called more than once: "+method)
}
// no hits to the index server in this case
for range indexServer.Calls {
require.FailNow(t, "index server was called when it should have not")
}
})
t.Run("should connect to a separate index server if defined in the config", func(t *testing.T) {
resourceServer.resetCalls()
indexServer.resetCalls()
client, err := newClient(
options.StorageOptions{
StorageType: options.StorageTypeUnifiedGrpc,
Address: resourceServerAddress,
IndexServerAddress: indexServerAddress,
},
&setting.Cfg{},
featuremgmt.WithFeatures(),
nil,
nil,
nil,
authlib.FixedAccessClient(true),
nil,
nil,
nil,
)
require.NoError(t, err)
testCallAllMethods(client)
// only resource store methods in this case
for method, count := range resourceServer.Calls {
require.Equal(t, 1, count, "method was called more than once: "+method)
require.True(t, strings.Contains(method, "resource.ResourceStore"))
}
// index server methods should be called here
for method, count := range indexServer.Calls {
require.Equal(t, 1, count, "method was called more than once: "+method)
require.True(t, strings.Contains(method, "resource.ResourceIndex") || strings.Contains(method, "resource.ManagedObjectIndex"))
}
})
})
}
func testCallAllMethods(client resource.ResourceClient) {
_, _ = client.Read(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.ReadRequest{})
_, _ = client.Create(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.CreateRequest{})
_, _ = client.Delete(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.DeleteRequest{})
_, _ = client.Update(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.UpdateRequest{})
_, _ = client.List(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.ListRequest{})
_, _ = client.GetStats(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.ResourceStatsRequest{})
_, _ = client.Search(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.ResourceSearchRequest{})
_, _ = client.CountManagedObjects(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.CountManagedObjectsRequest{})
_, _ = client.ListManagedObjects(identity.WithServiceIdentityContext(context.Background(), 1), &resourcepb.ListManagedObjectsRequest{})
}
func createTestGrpcServer(t *testing.T, address string) *testServer {
listener, err := net.Listen("tcp", address)
require.NoError(t, err, "failed to listen")
testServer := newTestServer()
s := grpc.NewServer(
grpc.UnknownServiceHandler(testServer.handler),
)
go func() {
_ = s.Serve(listener)
}()
testServer.s = s
return testServer
}
type testServer struct {
resource.ResourceServer
Calls map[string]int
s *grpc.Server
}
func newTestServer() *testServer {
return &testServer{
Calls: make(map[string]int),
}
}
func (s *testServer) resetCalls() {
s.Calls = make(map[string]int)
}
func (s *testServer) handler(srv interface{}, serverStream grpc.ServerStream) error {
fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
if ok {
s.Calls[fullMethodName]++
}
return nil
}

@ -48,14 +48,14 @@ type resourceClient struct {
resourcepb.DiagnosticsClient
}
func NewResourceClient(conn grpc.ClientConnInterface, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer trace.Tracer) (ResourceClient, error) {
func NewResourceClient(conn, indexConn grpc.ClientConnInterface, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer trace.Tracer) (ResourceClient, error) {
if !features.IsEnabledGlobally(featuremgmt.FlagAppPlatformGrpcClientAuth) {
return NewLegacyResourceClient(conn), nil
return NewLegacyResourceClient(conn, indexConn), nil
}
clientCfg := authnGrpcUtils.ReadGrpcClientConfig(cfg)
return NewRemoteResourceClient(tracer, conn, RemoteResourceClientConfig{
return NewRemoteResourceClient(tracer, conn, indexConn, RemoteResourceClientConfig{
Token: clientCfg.Token,
TokenExchangeURL: clientCfg.TokenExchangeURL,
Audiences: []string{"resourceStore"},
@ -64,24 +64,25 @@ func NewResourceClient(conn grpc.ClientConnInterface, cfg *setting.Cfg, features
})
}
func newResourceClient(cc grpc.ClientConnInterface) ResourceClient {
func newResourceClient(storageCc grpc.ClientConnInterface, indexCc grpc.ClientConnInterface) ResourceClient {
return &resourceClient{
ResourceStoreClient: resourcepb.NewResourceStoreClient(cc),
ResourceIndexClient: resourcepb.NewResourceIndexClient(cc),
ManagedObjectIndexClient: resourcepb.NewManagedObjectIndexClient(cc),
BulkStoreClient: resourcepb.NewBulkStoreClient(cc),
BlobStoreClient: resourcepb.NewBlobStoreClient(cc),
DiagnosticsClient: resourcepb.NewDiagnosticsClient(cc),
ResourceStoreClient: resourcepb.NewResourceStoreClient(storageCc),
ResourceIndexClient: resourcepb.NewResourceIndexClient(indexCc),
ManagedObjectIndexClient: resourcepb.NewManagedObjectIndexClient(indexCc),
BulkStoreClient: resourcepb.NewBulkStoreClient(storageCc),
BlobStoreClient: resourcepb.NewBlobStoreClient(storageCc),
DiagnosticsClient: resourcepb.NewDiagnosticsClient(storageCc),
}
}
func NewAuthlessResourceClient(cc grpc.ClientConnInterface) ResourceClient {
return newResourceClient(cc)
return newResourceClient(cc, cc)
}
func NewLegacyResourceClient(channel grpc.ClientConnInterface) ResourceClient {
func NewLegacyResourceClient(channel grpc.ClientConnInterface, indexChannel grpc.ClientConnInterface) ResourceClient {
cc := grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
return newResourceClient(cc)
cci := grpchan.InterceptClientConn(indexChannel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
return newResourceClient(cc, cci)
}
func NewLocalResourceClient(server ResourceServer) ResourceClient {
@ -114,7 +115,7 @@ func NewLocalResourceClient(server ResourceServer) ResourceClient {
)
cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return newResourceClient(cc)
return newResourceClient(cc, cc)
}
type RemoteResourceClientConfig struct {
@ -125,7 +126,7 @@ type RemoteResourceClientConfig struct {
AllowInsecure bool
}
func NewRemoteResourceClient(tracer trace.Tracer, conn grpc.ClientConnInterface, cfg RemoteResourceClientConfig) (ResourceClient, error) {
func NewRemoteResourceClient(tracer trace.Tracer, conn grpc.ClientConnInterface, indexConn grpc.ClientConnInterface, cfg RemoteResourceClientConfig) (ResourceClient, error) {
exchangeOpts := []authnlib.ExchangeClientOpts{}
if cfg.AllowInsecure {
@ -149,14 +150,8 @@ func NewRemoteResourceClient(tracer trace.Tracer, conn grpc.ClientConnInterface,
)
cc := grpchan.InterceptClientConn(conn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return &resourceClient{
ResourceStoreClient: resourcepb.NewResourceStoreClient(cc),
ResourceIndexClient: resourcepb.NewResourceIndexClient(cc),
BlobStoreClient: resourcepb.NewBlobStoreClient(cc),
BulkStoreClient: resourcepb.NewBulkStoreClient(cc),
ManagedObjectIndexClient: resourcepb.NewManagedObjectIndexClient(cc),
DiagnosticsClient: resourcepb.NewDiagnosticsClient(cc),
}, nil
cci := grpchan.InterceptClientConn(indexConn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return newResourceClient(cc, cci), nil
}
var authLogger = slog.Default().With("logger", "resource-client-auth-interceptor")

@ -165,7 +165,7 @@ func TestClientServer(t *testing.T) {
t.Run("Create a client", func(t *testing.T) {
conn, err := unified.GrpcConn(svc.GetAddress(), prometheus.NewPedanticRegistry())
require.NoError(t, err)
client, err = resource.NewRemoteResourceClient(tracing.NewNoopTracerService(), conn, resource.RemoteResourceClientConfig{
client, err = resource.NewRemoteResourceClient(tracing.NewNoopTracerService(), conn, conn, resource.RemoteResourceClientConfig{
Token: "some-token",
TokenExchangeURL: "http://some-change-url",
AllowInsecure: true,

Loading…
Cancel
Save