diff --git a/pkg/server/distributor.go b/pkg/server/distributor.go index d9fdadc61ca..c33713731bd 100644 --- a/pkg/server/distributor.go +++ b/pkg/server/distributor.go @@ -4,43 +4,58 @@ import ( "context" "fmt" "hash/fnv" + "io" + "net" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" ringclient "github.com/grafana/dskit/ring/client" + "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/modules" - "github.com/grafana/grafana/pkg/services/grpcserver" - "github.com/grafana/grafana/pkg/services/grpcserver/interceptors" + + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/storage/unified/resource" userutils "github.com/grafana/dskit/user" - resourcegrpc "github.com/grafana/grafana/pkg/storage/unified/resource/grpc" - "github.com/grafana/grafana/pkg/storage/unified/sql" - "go.opentelemetry.io/otel" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" ) func (ms *ModuleServer) initDistributor() (services.Service, error) { - tracer := otel.Tracer("unified-storage-distributor") + // tracer := otel.Tracer("unified-storage-distributor") distributor := &Distributor{ + cfg: ms.cfg.GRPCServer, stoppedCh: make(chan error), + logger: log.New("distributor-grpc-server"), } // FIXME: This is a temporary solution while we are migrating to the new authn interceptor // grpcutils.NewGrpcAuthenticator should be used instead. - authn := sql.NewAuthenticatorWithFallback(ms.cfg, ms.registerer, tracer, func(ctx context.Context) (context.Context, error) { - auth := resourcegrpc.Authenticator{Tracer: tracer} - return auth.Authenticate(ctx) - }) + // authn := sql.NewAuthenticatorWithFallback(ms.cfg, ms.registerer, tracer, func(ctx context.Context) (context.Context, error) { + // auth := resourcegrpc.Authenticator{Tracer: tracer} + // return auth.Authenticate(ctx) + // }) - var err error - distributor.grpcHandler, err = grpcserver.ProvideService(ms.cfg, ms.features, interceptors.AuthenticatorFunc(authn), tracer, ms.registerer) - if err != nil { - return nil, err + distributorServer := &DistributorServer{ + ring: ms.storageRing, + clientPool: ms.storageRingClientPool, + } + + opts := []grpc.ServerOption{ + grpc.UnknownServiceHandler(distributorServer.handler), + // grpc.ChainUnaryInterceptor( + // grpcAuth.UnaryServerInterceptor(interceptors.AuthenticatorFunc(authn).Authenticate), + // ), } + distributor.grpcServer = grpc.NewServer(opts...) 
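+	// grpc.UnknownServiceHandler routes every RPC that has no locally registered
+	// handler through distributorServer.handler, so the distributor can proxy
+	// arbitrary resource methods without registering each service. The proxy
+	// receives frames into emptypb.Empty; with the protobuf APIv2 runtime the
+	// payload is retained as unknown fields and re-marshalled intact on forward.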
	// Previously: grpcserver.ProvideService(ms.cfg, ms.features, interceptors.AuthenticatorFunc(authn), tracer, ms.registerer)

	healthServer := &healthServer{}
	healthService, err := resource.ProvideHealthService(healthServer)
@@ -48,43 +63,55 @@ func (ms *ModuleServer) initDistributor() (services.Service, error) {
 		return nil, err
 	}
 
-	distributorServer := &DistributorServer{
-		ring:       ms.storageRing,
-		clientPool: ms.storageRingClientPool,
-	}
-	grpcServer := distributor.grpcHandler.GetServer()
+	grpcServer := distributor.grpcServer
 
-	resource.RegisterResourceStoreServer(grpcServer, distributorServer)
+	// resource.RegisterResourceStoreServer(grpcServer, distributorServer)
 	// TODO how to do this
 	// resource.RegisterBulkStoreServer(grpcServer, distributorServer)
-	resource.RegisterResourceIndexServer(grpcServer, distributorServer)
-	resource.RegisterManagedObjectIndexServer(grpcServer, distributorServer)
-	resource.RegisterBlobStoreServer(grpcServer, distributorServer)
+	// resource.RegisterResourceIndexServer(grpcServer, distributorServer)
+	// resource.RegisterManagedObjectIndexServer(grpcServer, distributorServer)
+	// resource.RegisterBlobStoreServer(grpcServer, distributorServer)
 	grpc_health_v1.RegisterHealthServer(grpcServer, healthService)
 
-	_, err = grpcserver.ProvideReflectionService(ms.cfg, distributor.grpcHandler)
-	if err != nil {
-		return nil, err
-	}
+	// grpc_reflection_v1alpha.RegisterServerReflectionServer(distributor.grpcServer, reflection.NewServer(reflection.ServerOptions{Services: distributor.grpcServer}))
 
 	return services.NewBasicService(distributor.start, distributor.running, nil).WithName(modules.Distributor), nil
 }
 
 type Distributor struct {
-	grpcHandler grpcserver.Provider
-	stoppedCh   chan error
+	cfg        setting.GRPCServerSettings
+	grpcServer *grpc.Server
+	stoppedCh  chan error
+	logger     log.Logger
 }
 
 func (d *Distributor) start(ctx context.Context) error {
+	d.logger.Info("Running Distributor GRPC server", "address", d.cfg.Address, "network", d.cfg.Network)
+
+	listener, err := net.Listen(d.cfg.Network, d.cfg.Address)
+	if err != nil {
+		return fmt.Errorf("GRPC server: failed to listen: %w", err)
+	}
+
 	go func() {
-		err := d.grpcHandler.Run(ctx)
+		err := d.grpcServer.Serve(listener)
 		if err != nil {
 			d.stoppedCh <- err
-		} else {
-			d.stoppedCh <- nil
 		}
-		close(d.stoppedCh)
 	}()
-	return nil
+
+	select {
+	case err := <-d.stoppedCh:
+		d.logger.Error("GRPC server: failed to serve", "err", err)
+		return err
+	case <-ctx.Done():
+	}
+	d.logger.Warn("GRPC server: shutting down")
+	d.grpcServer.Stop()
+	// Serve returns nil once Stop is called, so the goroutine above exits
+	// without sending on stoppedCh; the channel is intentionally left open.
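+	// NOTE: blocking here keeps this dskit service in its starting state until
+	// shutdown; if running() is meant to own this wait (as the previous
+	// grpcHandler-based code did), the select above belongs there instead.
+	// A graceful variant of the shutdown (a sketch, not part of this change)
+	// would drain in-flight RPCs before forcing the stop:
+	//
+	//	done := make(chan struct{})
+	//	go func() { d.grpcServer.GracefulStop(); close(done) }()
+	//	select {
+	//	case <-done:
+	//	case <-time.After(10 * time.Second): // illustrative timeout
+	//		d.grpcServer.Stop()
+	//	}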
+ return ctx.Err() } func (d *Distributor) running(ctx context.Context) error { @@ -107,131 +134,137 @@ var ringOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceS return s != ring.ACTIVE }) -func (ds *DistributorServer) Search(ctx context.Context, r *resource.ResourceSearchRequest) (*resource.ResourceSearchResponse, error) { - fmt.Println("distributing Search") - client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace) - if err != nil { - return nil, err - } - - return client.Search(userutils.InjectOrgID(ctx, "1"), r) -} - -func (ds *DistributorServer) GetStats(ctx context.Context, r *resource.ResourceStatsRequest) (*resource.ResourceStatsResponse, error) { - fmt.Println("distributing GetStats") - client, err := ds.getClientToDistributeRequest(ctx, r.Namespace) - if err != nil { - return nil, err - } - - return client.GetStats(userutils.InjectOrgID(ctx, "1"), r) -} - -func (ds *DistributorServer) Read(ctx context.Context, r *resource.ReadRequest) (*resource.ReadResponse, error) { - fmt.Println("distributing Read") - client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace) - if err != nil { - return nil, err +var ( + clientStreamDescForProxying = &grpc.StreamDesc{ + ServerStreams: true, + ClientStreams: true, } +) - return client.Read(userutils.InjectOrgID(ctx, "1"), r) -} - -func (ds *DistributorServer) Create(ctx context.Context, r *resource.CreateRequest) (*resource.CreateResponse, error) { - fmt.Println("distributing Create") - client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace) - if err != nil { - return nil, err +func (ds *DistributorServer) handler(srv interface{}, serverStream grpc.ServerStream) error { + fullMethodName, ok := grpc.MethodFromServerStream(serverStream) + if !ok { + return status.Errorf(codes.Internal, "missing method name") } - return client.Create(userutils.InjectOrgID(ctx, "1"), r) -} + namespace := ds.getNamespaceFromContext(serverStream.Context()) -func (ds *DistributorServer) Update(ctx context.Context, r *resource.UpdateRequest) (*resource.UpdateResponse, error) { - fmt.Println("distributing Update") - client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace) + // TODO if namespace is not present or is *, assign random pod for now + conn, err := ds.getClientConnToDistributeRequest(serverStream.Context(), namespace) if err != nil { - return nil, err + return err } - return client.Update(userutils.InjectOrgID(ctx, "1"), r) -} + md, _ := metadata.FromIncomingContext(serverStream.Context()) + outCtx := metadata.NewOutgoingContext(serverStream.Context(), md.Copy()) + clientCtx, clientCancel := context.WithCancel(outCtx) + defer clientCancel() -func (ds *DistributorServer) Delete(ctx context.Context, r *resource.DeleteRequest) (*resource.DeleteResponse, error) { - fmt.Println("distributing Delete") - client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace) + clientStream, err := conn.NewStream(userutils.InjectOrgID(clientCtx, "1"), clientStreamDescForProxying, fullMethodName) if err != nil { - return nil, err + return err } - return client.Delete(userutils.InjectOrgID(ctx, "1"), r) -} - -func (ds *DistributorServer) List(ctx context.Context, r *resource.ListRequest) (*resource.ListResponse, error) { - fmt.Println("distributing List") - client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace) - if err != nil { - return nil, err + s2cErrChan := ds.forwardServerToClient(serverStream, clientStream) + c2sErrChan := ds.forwardClientToServer(clientStream, 
serverStream)
+	// We don't know which side is going to stop sending first, so we need a select between the two.
+	for i := 0; i < 2; i++ {
+		select {
+		case s2cErr := <-s2cErrChan:
+			if s2cErr == io.EOF {
+				// This is the happy case: the sender hit io.EOF and won't be sending anymore.
+				// The clientStream->serverStream direction may continue pumping, though.
+				_ = clientStream.CloseSend() // error is safe to ignore; the send side is done either way
+			} else {
+				// However, we may have gotten a receive error (stream disconnected, a read error etc.), in which case
+				// we need to cancel the clientStream to the backend, let all of its goroutines be freed up by the
+				// CancelFunc, and exit with an error to the stack.
+				clientCancel()
+				return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
+			}
+		case c2sErr := <-c2sErrChan:
+			// This happens when the clientStream has nothing else to offer (io.EOF) or returned a gRPC error. In
+			// those two cases we may have received Trailers as part of the call. In case of other errors (stream
+			// closed) the trailers will be nil.
+			serverStream.SetTrailer(clientStream.Trailer())
+			// c2sErr will contain the RPC error from client code. If it is not io.EOF, return it as the server
+			// stream error.
+			if c2sErr != io.EOF {
+				return c2sErr
+			}
+			return nil
+		}
+	}
-
-	return client.List(userutils.InjectOrgID(ctx, "1"), r)
-}
-
-func (ds *DistributorServer) Watch(r *resource.WatchRequest, srv resource.ResourceStore_WatchServer) error {
-	fmt.Println("distributing Watch")
-	return nil
-	// ctx := srv.Context()
-
-	// client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
-	// if err != nil {
-	//	return err
-	// }
-
-	// return client.Watch(r, srv)
+	return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage")
 }
 
-// TODO how to do this
-// func (ds *DistributorServer) BulkProcess(r *resource.WatchRequest, srv resource.ResourceStore_WatchServer) error {
-//	return nil
-// }
-
-func (ds *DistributorServer) CountManagedObjects(ctx context.Context, r *resource.CountManagedObjectsRequest) (*resource.CountManagedObjectsResponse, error) {
-	client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
-	if err != nil {
-		return nil, err
-	}
-
-	return client.CountManagedObjects(userutils.InjectOrgID(ctx, "1"), r)
+func (ds *DistributorServer) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
+	ret := make(chan error, 1)
+	go func() {
+		f := &emptypb.Empty{}
+		for i := 0; ; i++ {
+			if err := src.RecvMsg(f); err != nil {
+				ret <- err // this can be io.EOF, which is the happy case
+				break
+			}
+			if i == 0 {
+				// This is a bit of a hack, but client-to-server headers are only readable after the first client msg
+				// is received, yet must be written to the server stream before the first msg is flushed.
+				// This is the only place to do it nicely.
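+				// Header() blocks until the backend's initial metadata has arrived; a
+				// message has already been received above, so it is ready here.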
+				md, err := src.Header()
+				if err != nil {
+					ret <- err
+					break
+				}
+				if err := dst.SendHeader(md); err != nil {
+					ret <- err
+					break
+				}
+			}
+			if err := dst.SendMsg(f); err != nil {
+				ret <- err
+				break
+			}
+		}
+	}()
+	return ret
 }
 
-func (ds *DistributorServer) ListManagedObjects(ctx context.Context, r *resource.ListManagedObjectsRequest) (*resource.ListManagedObjectsResponse, error) {
-	client, err := ds.getClientToDistributeRequest(ctx, r.Namespace)
-	if err != nil {
-		return nil, err
-	}
-
-	return client.ListManagedObjects(userutils.InjectOrgID(ctx, "1"), r)
+func (ds *DistributorServer) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
+	ret := make(chan error, 1)
+	go func() {
+		f := &emptypb.Empty{}
+		for i := 0; ; i++ {
+			if err := src.RecvMsg(f); err != nil {
+				ret <- err // this can be io.EOF, which is the happy case
+				break
+			}
+			if err := dst.SendMsg(f); err != nil {
+				ret <- err
+				break
+			}
+		}
+	}()
+	return ret
 }
 
-func (ds *DistributorServer) PutBlob(ctx context.Context, r *resource.PutBlobRequest) (*resource.PutBlobResponse, error) {
-	client, err := ds.getClientToDistributeRequest(ctx, r.Resource.Namespace)
-	if err != nil {
-		return nil, err
+func (ds *DistributorServer) getNamespaceFromContext(ctx context.Context) string {
+	md, ok := metadata.FromIncomingContext(ctx)
+	if !ok {
+		return ""
 	}
-
-	return client.PutBlob(userutils.InjectOrgID(ctx, "1"), r)
-}
+	if namespace := md.Get("namespace"); len(namespace) > 0 {
+		if namespace[0] == "*" {
+			return ""
+		}
 
-func (ds *DistributorServer) GetBlob(ctx context.Context, r *resource.GetBlobRequest) (*resource.GetBlobResponse, error) {
-	client, err := ds.getClientToDistributeRequest(ctx, r.Resource.Namespace)
-	if err != nil {
-		return nil, err
+		return namespace[0]
 	}
-
-	return client.GetBlob(userutils.InjectOrgID(ctx, "1"), r)
+	return ""
 }
 
-func (ds *DistributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (resource.ResourceClient, error) {
+func (ds *DistributorServer) getClientConnToDistributeRequest(ctx context.Context, namespace string) (*grpc.ClientConn, error) {
 	ringHasher := fnv.New32a()
 	_, err := ringHasher.Write([]byte(namespace))
 	if err != nil {
@@ -250,7 +283,7 @@ func (ds *DistributorServer) getClientToDistributeRequest(ctx context.Context, n
 
 	fmt.Println("distributing request to ", rs.Instances[0].Id)
 
-	return client.(*resource.RingClient).Client, nil
+	return client.(*ringClient).conn, nil
 }
 
 type healthServer struct{}
diff --git a/pkg/server/ring.go b/pkg/server/ring.go
index dcb27541d1b..eb7bb5220bb 100644
--- a/pkg/server/ring.go
+++ b/pkg/server/ring.go
@@ -13,7 +13,6 @@ import (
 	"github.com/grafana/dskit/services"
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/setting"
-	"github.com/grafana/grafana/pkg/storage/unified/resource"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"google.golang.org/grpc"
@@ -123,15 +122,28 @@ func newClientPool(clientCfg grpcclient.Config, log log.Logger, reg prometheus.R
 			return nil, fmt.Errorf("failed to dial resource server %s %s: %s", inst.Id, inst.Addr, err)
 		}
 
-		// TODO only use this if FlagAppPlatformGrpcClientAuth is not enabled
-		client := resource.NewLegacyResourceClient(conn)
-
-		return &resource.RingClient{
-			Client: client,
+		return &ringClient{
 			HealthClient: grpc_health_v1.NewHealthClient(conn),
-			Conn: conn,
+			conn:         conn,
 		}, nil
 	})
 
 	return ringclient.NewPool(ringName, poolCfg, nil, factory, clientsCount, log)
 }
+
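+// ringClient is the pooled client tracked by the dskit client pool: it embeds
+// the health client the pool uses for liveness checks and keeps the raw
+// *grpc.ClientConn that getClientConnToDistributeRequest hands to the
+// distributor for stream proxying.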
+type ringClient struct { + grpc_health_v1.HealthClient + conn *grpc.ClientConn +} + +func (c *ringClient) Close() error { + return c.conn.Close() +} + +func (c *ringClient) String() string { + return c.RemoteAddress() +} + +func (c *ringClient) RemoteAddress() string { + return c.conn.Target() +} diff --git a/pkg/services/apiserver/client/client.go b/pkg/services/apiserver/client/client.go index d4d9ca9a255..f7267cd515b 100644 --- a/pkg/services/apiserver/client/client.go +++ b/pkg/services/apiserver/client/client.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" + "google.golang.org/grpc/metadata" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -63,6 +64,8 @@ func (h *k8sHandler) GetNamespace(orgID int64) string { } func (h *k8sHandler) Get(ctx context.Context, name string, orgID int64, options v1.GetOptions, subresource ...string) (*unstructured.Unstructured, error) { + ctx = metadata.AppendToOutgoingContext(ctx, "namespace", h.GetNamespace(orgID)) + client, err := h.getClient(ctx, orgID) if err != nil { return nil, err @@ -72,6 +75,8 @@ func (h *k8sHandler) Get(ctx context.Context, name string, orgID int64, options } func (h *k8sHandler) Create(ctx context.Context, obj *unstructured.Unstructured, orgID int64, opts v1.CreateOptions) (*unstructured.Unstructured, error) { + ctx = metadata.AppendToOutgoingContext(ctx, "namespace", h.GetNamespace(orgID)) + client, err := h.getClient(ctx, orgID) if err != nil { return nil, err @@ -81,6 +86,8 @@ func (h *k8sHandler) Create(ctx context.Context, obj *unstructured.Unstructured, } func (h *k8sHandler) Update(ctx context.Context, obj *unstructured.Unstructured, orgID int64, opts v1.UpdateOptions) (*unstructured.Unstructured, error) { + ctx = metadata.AppendToOutgoingContext(ctx, "namespace", h.GetNamespace(orgID)) + client, err := h.getClient(ctx, orgID) if err != nil { return nil, err @@ -90,6 +97,8 @@ func (h *k8sHandler) Update(ctx context.Context, obj *unstructured.Unstructured, } func (h *k8sHandler) Delete(ctx context.Context, name string, orgID int64, options v1.DeleteOptions) error { + ctx = metadata.AppendToOutgoingContext(ctx, "namespace", h.GetNamespace(orgID)) + client, err := h.getClient(ctx, orgID) if err != nil { return err @@ -99,6 +108,8 @@ func (h *k8sHandler) Delete(ctx context.Context, name string, orgID int64, optio } func (h *k8sHandler) DeleteCollection(ctx context.Context, orgID int64) error { + ctx = metadata.AppendToOutgoingContext(ctx, "namespace", h.GetNamespace(orgID)) + client, err := h.getClient(ctx, orgID) if err != nil { return err @@ -108,6 +119,8 @@ func (h *k8sHandler) DeleteCollection(ctx context.Context, orgID int64) error { } func (h *k8sHandler) List(ctx context.Context, orgID int64, options v1.ListOptions) (*unstructured.UnstructuredList, error) { + ctx = metadata.AppendToOutgoingContext(ctx, "namespace", h.GetNamespace(orgID)) + client, err := h.getClient(ctx, orgID) if err != nil { return nil, err @@ -130,10 +143,14 @@ func (h *k8sHandler) Search(ctx context.Context, orgID int64, in *resource.Resou } } + ctx = metadata.AppendToOutgoingContext(ctx, "namespace", in.Options.Key.Namespace) + return h.searcher.Search(ctx, in) } func (h *k8sHandler) GetStats(ctx context.Context, orgID int64) (*resource.ResourceStatsResponse, error) { + ctx = metadata.AppendToOutgoingContext(ctx, "namespace", h.GetNamespace(orgID)) + // goes directly through grpc, so doesn't need the new context return h.searcher.GetStats(ctx, 
&resource.ResourceStatsRequest{ Namespace: h.GetNamespace(orgID), diff --git a/pkg/storage/unified/resource/grpc/authenticator.go b/pkg/storage/unified/resource/grpc/authenticator.go index 577e752ac2d..31e86b26b83 100644 --- a/pkg/storage/unified/resource/grpc/authenticator.go +++ b/pkg/storage/unified/resource/grpc/authenticator.go @@ -138,14 +138,22 @@ func wrapContext(ctx context.Context) (context.Context, error) { return ctx, err } + var namespace string + if md, ok := metadata.FromOutgoingContext(ctx); ok { + // TODO use const instead of "namespace" + if ns := md.Get("namespace"); len(ns) > 0 { + namespace = ns[0] + } + } + // set grpc metadata into the context to pass to the grpc server - return metadata.NewOutgoingContext(ctx, encodeIdentityInMetadata(user)), nil + return metadata.NewOutgoingContext(ctx, encodeIdentityInMetadata(user, namespace)), nil } -func encodeIdentityInMetadata(user identity.Requester) metadata.MD { +func encodeIdentityInMetadata(user identity.Requester, namespace string) metadata.MD { id, _ := user.GetInternalID() - logger.Debug("encodeIdentityInMetadata", "user.id", user.GetID(), "user.Login", user.GetLogin(), "user.Name", user.GetName()) + logger.Info("encodeIdentityInMetadata", "user.id", user.GetID(), "user.Login", user.GetLogin(), "user.Name", user.GetName(), "namespace", user.GetNamespace()) return metadata.Pairs( // This should be everything needed to recreate the user @@ -158,6 +166,9 @@ func encodeIdentityInMetadata(user identity.Requester) metadata.MD { mdOrgRole, string(user.GetOrgRole()), mdLogin, user.GetLogin(), + // TODO check if there's a better way + "namespace", namespace, + // TODO, Remove after this is deployed to unified storage "grafana-userid", strconv.FormatInt(id, 10), "grafana-useruid", user.GetRawIdentifier(), diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 4ecc39f1740..ce91a32b707 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -13,8 +13,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" - "google.golang.org/grpc" - "google.golang.org/grpc/health/grpc_health_v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -305,24 +303,6 @@ type server struct { initErr error } -type RingClient struct { - Client ResourceClient - grpc_health_v1.HealthClient - Conn *grpc.ClientConn -} - -func (c *RingClient) Close() error { - return c.Conn.Close() -} - -func (c *RingClient) String() string { - return c.RemoteAddress() -} - -func (c *RingClient) RemoteAddress() string { - return c.Conn.Target() -} - // Init implements ResourceServer. func (s *server) Init(ctx context.Context) error { s.once.Do(func() {
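
Taken together, these changes route each request by tenant namespace: callers stamp a `namespace` metadata key (client.go and authenticator.go), and the distributor hashes that key onto the ring to pick the owning storage instance (distributor.go). A minimal sketch of how a caller exercises the new path — the address, namespace value, and plaintext credentials are illustrative assumptions, not part of this diff:

```go
package main

import (
	"context"
	"fmt"
	"hash/fnv"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
)

func main() {
	// Assumed address of a locally running distributor; the real value comes
	// from setting.GRPCServerSettings.
	conn, err := grpc.Dial("localhost:10000",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// getNamespaceFromContext reads this key on the distributor side; a
	// missing key or "*" means the request cannot be pinned to one instance.
	ctx := metadata.AppendToOutgoingContext(context.Background(), "namespace", "stacks-123")

	// The same fnv-32a token the distributor feeds into ring.Get when picking
	// the instance that owns this namespace.
	h := fnv.New32a()
	_, _ = h.Write([]byte("stacks-123"))
	fmt.Printf("ring token for %q: %d\n", "stacks-123", h.Sum32())

	// Any generated resource client bound to conn is now proxied by the
	// distributor's UnknownServiceHandler, with ctx carrying the routing key.
	_ = ctx
}
```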