|
|
|
@@ -4,43 +4,58 @@ import ( |
|
|
|
|
"context" |
|
|
|
|
"fmt" |
|
|
|
|
"hash/fnv" |
|
|
|
|
"io" |
|
|
|
|
"net" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/dskit/ring" |
|
|
|
|
"github.com/grafana/dskit/services" |
|
|
|
|
|
|
|
|
|
ringclient "github.com/grafana/dskit/ring/client" |
|
|
|
|
"github.com/grafana/grafana/pkg/infra/log" |
|
|
|
|
"github.com/grafana/grafana/pkg/modules" |
|
|
|
|
"github.com/grafana/grafana/pkg/services/grpcserver" |
|
|
|
|
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/grafana/pkg/setting" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/grafana/pkg/storage/unified/resource" |
|
|
|
|
|
|
|
|
|
userutils "github.com/grafana/dskit/user" |
|
|
|
|
resourcegrpc "github.com/grafana/grafana/pkg/storage/unified/resource/grpc" |
|
|
|
|
"github.com/grafana/grafana/pkg/storage/unified/sql" |
|
|
|
|
|
|
|
|
|
"go.opentelemetry.io/otel" |
|
|
|
|
"google.golang.org/grpc" |
|
|
|
|
"google.golang.org/grpc/codes" |
|
|
|
|
"google.golang.org/grpc/health/grpc_health_v1" |
|
|
|
|
"google.golang.org/grpc/metadata" |
|
|
|
|
"google.golang.org/grpc/status" |
|
|
|
|
"google.golang.org/protobuf/types/known/emptypb" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func (ms *ModuleServer) initDistributor() (services.Service, error) { |
|
|
|
|
tracer := otel.Tracer("unified-storage-distributor") |
|
|
|
|
// tracer := otel.Tracer("unified-storage-distributor")
|
|
|
|
|
|
|
|
|
|
distributor := &Distributor{ |
|
|
|
|
cfg: ms.cfg.GRPCServer, |
|
|
|
|
stoppedCh: make(chan error), |
|
|
|
|
logger: log.New("distributor-grpc-server"), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// FIXME: This is a temporary solution while we are migrating to the new authn interceptor
|
|
|
|
|
// grpcutils.NewGrpcAuthenticator should be used instead.
|
|
|
|
|
authn := sql.NewAuthenticatorWithFallback(ms.cfg, ms.registerer, tracer, func(ctx context.Context) (context.Context, error) { |
|
|
|
|
auth := resourcegrpc.Authenticator{Tracer: tracer} |
|
|
|
|
return auth.Authenticate(ctx) |
|
|
|
|
}) |
|
|
|
|
// authn := sql.NewAuthenticatorWithFallback(ms.cfg, ms.registerer, tracer, func(ctx context.Context) (context.Context, error) {
|
|
|
|
|
// auth := resourcegrpc.Authenticator{Tracer: tracer}
|
|
|
|
|
// return auth.Authenticate(ctx)
|
|
|
|
|
// })
|
|
|
|
|
|
|
|
|
|
var err error |
|
|
|
|
distributor.grpcHandler, err = grpcserver.ProvideService(ms.cfg, ms.features, interceptors.AuthenticatorFunc(authn), tracer, ms.registerer) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
distributorServer := &DistributorServer{ |
|
|
|
|
ring: ms.storageRing, |
|
|
|
|
clientPool: ms.storageRingClientPool, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
opts := []grpc.ServerOption{ |
|
|
|
|
grpc.UnknownServiceHandler(distributorServer.handler), |
|
|
|
|
// grpc.ChainUnaryInterceptor(
|
|
|
|
|
// grpcAuth.UnaryServerInterceptor(interceptors.AuthenticatorFunc(authn).Authenticate),
|
|
|
|
|
// ),
|
|
|
|
|
} |
|
|
|
|
distributor.grpcServer = grpc.NewServer(opts...) // grpcserver.ProvideService(ms.cfg, ms.features, interceptors.AuthenticatorFunc(authn), tracer, ms.registerer)
|
|
|
|
|
|
|
|
|
|
healthServer := &healthServer{} |
|
|
|
|
healthService, err := resource.ProvideHealthService(healthServer) |
|
|
|
@@ -48,43 +63,55 @@ func (ms *ModuleServer) initDistributor() (services.Service, error) { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
distributorServer := &DistributorServer{ |
|
|
|
|
ring: ms.storageRing, |
|
|
|
|
clientPool: ms.storageRingClientPool, |
|
|
|
|
} |
|
|
|
|
grpcServer := distributor.grpcHandler.GetServer() |
|
|
|
|
grpcServer := distributor.grpcServer |
|
|
|
|
|
|
|
|
|
resource.RegisterResourceStoreServer(grpcServer, distributorServer) |
|
|
|
|
// resource.RegisterResourceStoreServer(grpcServer, distributorServer)
|
|
|
|
|
// TODO how to do this
|
|
|
|
|
// resource.RegisterBulkStoreServer(grpcServer, distributorServer)
|
|
|
|
|
resource.RegisterResourceIndexServer(grpcServer, distributorServer) |
|
|
|
|
resource.RegisterManagedObjectIndexServer(grpcServer, distributorServer) |
|
|
|
|
resource.RegisterBlobStoreServer(grpcServer, distributorServer) |
|
|
|
|
// resource.RegisterResourceIndexServer(grpcServer, distributorServer)
|
|
|
|
|
// resource.RegisterManagedObjectIndexServer(grpcServer, distributorServer)
|
|
|
|
|
// resource.RegisterBlobStoreServer(grpcServer, distributorServer)
|
|
|
|
|
grpc_health_v1.RegisterHealthServer(grpcServer, healthService) |
|
|
|
|
_, err = grpcserver.ProvideReflectionService(ms.cfg, distributor.grpcHandler) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
// grpc_reflection_v1alpha.RegisterServerReflectionServer(distributor.grpcServer, reflection.NewServer(reflection.ServerOptions{Services: distributor.grpcServer}))
|
|
|
|
|
|
|
|
|
|
return services.NewBasicService(distributor.start, distributor.running, nil).WithName(modules.Distributor), nil |
|
|
|
|
} |
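// Illustrative note (an assumption about intent, not part of this change): because the
// server above is built with grpc.UnknownServiceHandler(distributorServer.handler), any
// RPC whose service is not explicitly registered is delivered to that handler as a raw
// grpc.ServerStream, which is why the typed Register*Server calls can remain commented
// out. A minimal sketch of the pattern:
//
//	srv := grpc.NewServer(grpc.UnknownServiceHandler(func(_ interface{}, stream grpc.ServerStream) error {
//		method, _ := grpc.MethodFromServerStream(stream) // e.g. "/resource.ResourceStore/Read"
//		return status.Errorf(codes.Unimplemented, "no proxy target for %s", method)
//	}))
//	_ = srv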
|
|
|
|
|
|
|
|
|
type Distributor struct { |
|
|
|
|
grpcHandler grpcserver.Provider |
|
|
|
|
stoppedCh chan error |
|
|
|
|
cfg setting.GRPCServerSettings |
|
|
|
|
grpcServer *grpc.Server |
|
|
|
|
stoppedCh chan error |
|
|
|
|
logger log.Logger |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (d *Distributor) start(ctx context.Context) error { |
|
|
|
|
// s.logger.Info("Running GRPC server", "address", s.cfg.Address, "network", s.cfg.Network, "tls", s.cfg.TLSConfig != nil, "max_recv_msg_size", s.cfg.MaxRecvMsgSize, "max_send_msg_size", s.cfg.MaxSendMsgSize)
|
|
|
|
|
d.logger.Info("Running Distributor GRPC server") |
|
|
|
|
|
|
|
|
|
listener, err := net.Listen(d.cfg.Network, d.cfg.Address) |
|
|
|
|
if err != nil { |
|
|
|
|
return fmt.Errorf("GRPC server: failed to listen: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
go func() { |
|
|
|
|
err := d.grpcHandler.Run(ctx) |
|
|
|
|
d.logger.Info("GRPC server: starting") |
|
|
|
|
err := d.grpcServer.Serve(listener) |
|
|
|
|
if err != nil { |
|
|
|
|
d.logger.Error("GRPC server: failed to serve", "err", err) |
|
|
|
|
d.stoppedCh <- err |
|
|
|
|
} else { |
|
|
|
|
d.stoppedCh <- nil |
|
|
|
|
} |
|
|
|
|
close(d.stoppedCh) |
|
|
|
|
}() |
|
|
|
|
return nil |
|
|
|
|
|
|
|
|
|
select { |
|
|
|
|
case err := <-d.stoppedCh: |
|
|
|
|
d.logger.Error("GRPC server: failed to serve", "err", err) |
|
|
|
|
return err |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
} |
|
|
|
|
d.logger.Warn("GRPC server: shutting down") |
|
|
|
|
d.grpcServer.Stop() |
|
|
|
|
// close channel?
|
|
|
|
|
return ctx.Err() |
|
|
|
|
} |
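// Background note on the dskit lifecycle wired up in initDistributor via
// services.NewBasicService(distributor.start, distributor.running, nil): the starting
// function runs first and its return transitions the service to Running, after which
// the running function is invoked and should block until shutdown. A minimal sketch of
// that contract, assuming the documented dskit services behaviour:
//
//	svc := services.NewBasicService(
//		func(ctx context.Context) error { return nil },               // starting: bind listeners, spawn goroutines
//		func(ctx context.Context) error { <-ctx.Done(); return nil }, // running: block until shutdown
//		nil, // no stopping function
//	)
//	_ = svc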
|
|
|
|
|
|
|
|
|
func (d *Distributor) running(ctx context.Context) error { |
|
|
|
@@ -107,131 +134,137 @@ var ringOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceS |
|
|
|
|
return s != ring.ACTIVE |
|
|
|
|
}) |
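// Illustrative sketch (an assumption, not part of this change): how ringOp is typically
// combined with an FNV-32a hash of the namespace to pick a single storage instance and
// fetch a pooled client for it, mirroring getClientConnToDistributeRequest below. The
// helper name pickInstanceClient is hypothetical and the exact dskit signatures may
// differ between versions.
//
//	func pickInstanceClient(r ring.ReadRing, pool *ringclient.Pool, namespace string) (ringclient.PoolClient, error) {
//		hasher := fnv.New32a()
//		_, _ = hasher.Write([]byte(namespace)) // map the namespace to a ring token
//		rs, err := r.Get(hasher.Sum32(), ringOp, nil, nil, nil)
//		if err != nil {
//			return nil, err
//		}
//		// with an ACTIVE-only op, the first entry is the owner of the token
//		return pool.GetClientFor(rs.Instances[0].Addr)
//	}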
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) Search(ctx context.Context, r *resource.ResourceSearchRequest) (*resource.ResourceSearchResponse, error) { |
|
|
|
|
fmt.Println("distributing Search") |
|
|
|
|
client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.Search(userutils.InjectOrgID(ctx, "1"), r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) GetStats(ctx context.Context, r *resource.ResourceStatsRequest) (*resource.ResourceStatsResponse, error) { |
|
|
|
|
fmt.Println("distributing GetStats") |
|
|
|
|
client, err := ds.getClientToDistributeRequest(ctx, r.Namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.GetStats(userutils.InjectOrgID(ctx, "1"), r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) Read(ctx context.Context, r *resource.ReadRequest) (*resource.ReadResponse, error) { |
|
|
|
|
fmt.Println("distributing Read") |
|
|
|
|
client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
var ( |
|
|
|
|
clientStreamDescForProxying = &grpc.StreamDesc{ |
|
|
|
|
ServerStreams: true, |
|
|
|
|
ClientStreams: true, |
|
|
|
|
} |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
return client.Read(userutils.InjectOrgID(ctx, "1"), r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) Create(ctx context.Context, r *resource.CreateRequest) (*resource.CreateResponse, error) { |
|
|
|
|
fmt.Println("distributing Create") |
|
|
|
|
client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
func (ds *DistributorServer) handler(srv interface{}, serverStream grpc.ServerStream) error { |
|
|
|
|
fullMethodName, ok := grpc.MethodFromServerStream(serverStream) |
|
|
|
|
if !ok { |
|
|
|
|
return status.Errorf(codes.Internal, "missing method name") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.Create(userutils.InjectOrgID(ctx, "1"), r) |
|
|
|
|
} |
|
|
|
|
namespace := ds.getNamespaceFromContext(serverStream.Context()) |
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) Update(ctx context.Context, r *resource.UpdateRequest) (*resource.UpdateResponse, error) { |
|
|
|
|
fmt.Println("distributing Update") |
|
|
|
|
client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace) |
|
|
|
|
// TODO if namespace is not present or is *, assign random pod for now
|
|
|
|
|
conn, err := ds.getClientConnToDistributeRequest(serverStream.Context(), namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.Update(userutils.InjectOrgID(ctx, "1"), r) |
|
|
|
|
} |
|
|
|
|
md, _ := metadata.FromIncomingContext(serverStream.Context()) |
|
|
|
|
outCtx := metadata.NewOutgoingContext(serverStream.Context(), md.Copy()) |
|
|
|
|
clientCtx, clientCancel := context.WithCancel(outCtx) |
|
|
|
|
defer clientCancel() |
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) Delete(ctx context.Context, r *resource.DeleteRequest) (*resource.DeleteResponse, error) { |
|
|
|
|
fmt.Println("distributing Delete") |
|
|
|
|
client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace) |
|
|
|
|
clientStream, err := conn.NewStream(userutils.InjectOrgID(clientCtx, "1"), clientStreamDescForProxying, fullMethodName) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.Delete(userutils.InjectOrgID(ctx, "1"), r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) List(ctx context.Context, r *resource.ListRequest) (*resource.ListResponse, error) { |
|
|
|
|
fmt.Println("distributing List") |
|
|
|
|
client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
s2cErrChan := ds.forwardServerToClient(serverStream, clientStream) |
|
|
|
|
c2sErrChan := ds.forwardClientToServer(clientStream, serverStream) |
|
|
|
|
// We don't know which side is going to stop sending first, so we need a select between the two.
|
|
|
|
|
for i := 0; i < 2; i++ { |
|
|
|
|
select { |
|
|
|
|
case s2cErr := <-s2cErrChan: |
|
|
|
|
if s2cErr == io.EOF { |
|
|
|
|
// this is the happy case where the sender has encountered io.EOF and won't be sending anymore.
|
|
|
|
|
// the clientStream->serverStream direction may continue pumping though.
|
|
|
|
|
clientStream.CloseSend() |
|
|
|
|
} else { |
|
|
|
|
// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
|
|
|
|
|
// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
|
|
|
|
|
// exit with an error to the stack
|
|
|
|
|
clientCancel() |
|
|
|
|
return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) |
|
|
|
|
} |
|
|
|
|
case c2sErr := <-c2sErrChan: |
|
|
|
|
// This happens when the clientStream has nothing else to offer (io.EOF) or returned a gRPC error. In those two
|
|
|
|
|
// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
|
|
|
|
|
// will be nil.
|
|
|
|
|
serverStream.SetTrailer(clientStream.Trailer()) |
|
|
|
|
// c2sErr will contain the RPC error from the client code. If it is not io.EOF, return the RPC error as the server stream error.
|
|
|
|
|
if c2sErr != io.EOF { |
|
|
|
|
return c2sErr |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.List(userutils.InjectOrgID(ctx, "1"), r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) Watch(r *resource.WatchRequest, srv resource.ResourceStore_WatchServer) error { |
|
|
|
|
fmt.Println("distributing Watch") |
|
|
|
|
return nil |
|
|
|
|
// ctx := srv.Context()
|
|
|
|
|
|
|
|
|
|
// client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// return err
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// return client.Watch(r, srv)
|
|
|
|
|
return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO how to do this
|
|
|
|
|
// func (ds *DistributorServer) BulkProcess(r *resource.WatchRequest, srv resource.ResourceStore_WatchServer) error {
|
|
|
|
|
// return nil
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) CountManagedObjects(ctx context.Context, r *resource.CountManagedObjectsRequest) (*resource.CountManagedObjectsResponse, error) { |
|
|
|
|
client, err := ds.getClientToDistributeRequest(ctx, r.Namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.CountManagedObjects(userutils.InjectOrgID(ctx, "1"), r) |
|
|
|
|
func (ds *DistributorServer) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { |
|
|
|
|
ret := make(chan error, 1) |
|
|
|
|
go func() { |
|
|
|
|
f := &emptypb.Empty{} |
|
|
|
|
for i := 0; ; i++ { |
|
|
|
|
if err := src.RecvMsg(f); err != nil { |
|
|
|
|
ret <- err // this can be io.EOF, which is the happy case
|
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
if i == 0 { |
|
|
|
|
// This is a bit of a hack, but client to server headers are only readable after first client msg is
|
|
|
|
|
// received but must be written to server stream before the first msg is flushed.
|
|
|
|
|
// This is the only place to do it nicely.
|
|
|
|
|
md, err := src.Header() |
|
|
|
|
if err != nil { |
|
|
|
|
ret <- err |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
if err := dst.SendHeader(md); err != nil { |
|
|
|
|
ret <- err |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if err := dst.SendMsg(f); err != nil { |
|
|
|
|
ret <- err |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
return ret |
|
|
|
|
} |
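// Note (an assumption about why this works, worth verifying against the vendored
// protobuf runtime): both forwarding directions receive every frame into an
// emptypb.Empty even though the real messages have fields. With the Go protobuf
// runtime's default unknown-field retention, those bytes are kept as unknown fields on
// Unmarshal and re-emitted on Marshal, so the payload is proxied byte-for-byte without
// the distributor needing the generated message types.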
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) ListManagedObjects(ctx context.Context, r *resource.ListManagedObjectsRequest) (*resource.ListManagedObjectsResponse, error) { |
|
|
|
|
client, err := ds.getClientToDistributeRequest(ctx, r.Namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.ListManagedObjects(userutils.InjectOrgID(ctx, "1"), r) |
|
|
|
|
func (ds *DistributorServer) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error { |
|
|
|
|
ret := make(chan error, 1) |
|
|
|
|
go func() { |
|
|
|
|
f := &emptypb.Empty{} |
|
|
|
|
for i := 0; ; i++ { |
|
|
|
|
if err := src.RecvMsg(f); err != nil { |
|
|
|
|
ret <- err // this can be io.EOF, which is the happy case
|
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
if err := dst.SendMsg(f); err != nil { |
|
|
|
|
ret <- err |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
return ret |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) PutBlob(ctx context.Context, r *resource.PutBlobRequest) (*resource.PutBlobResponse, error) { |
|
|
|
|
client, err := ds.getClientToDistributeRequest(ctx, r.Resource.Namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
func (ds *DistributorServer) getNamespaceFromContext(ctx context.Context) string { |
|
|
|
|
md, ok := metadata.FromIncomingContext(ctx) |
|
|
|
|
if !ok { |
|
|
|
|
return "" |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.PutBlob(userutils.InjectOrgID(ctx, "1"), r) |
|
|
|
|
} |
|
|
|
|
if namespace := md.Get("namespace"); len(namespace) > 0 { |
|
|
|
|
if namespace[0] == "*" { |
|
|
|
|
return "" |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) GetBlob(ctx context.Context, r *resource.GetBlobRequest) (*resource.GetBlobResponse, error) { |
|
|
|
|
client, err := ds.getClientToDistributeRequest(ctx, r.Resource.Namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
return namespace[0] |
|
|
|
|
} else { |
|
|
|
|
return "" |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.GetBlob(userutils.InjectOrgID(ctx, "1"), r) |
|
|
|
|
} |
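// Illustrative usage note (an assumption about the calling convention, based on the
// "namespace" metadata key read above): callers are expected to put the target
// namespace into outgoing gRPC metadata, for example:
//
//	ctx = metadata.AppendToOutgoingContext(ctx, "namespace", "default")
//
// A missing key or the wildcard "*" yields an empty namespace, which, per the TODO in
// handler, is meant to fall back to an arbitrary instance for now.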
|
|
|
|
|
|
|
|
|
func (ds *DistributorServer) getClientToDistributeRequest(ctx context.Context, namespace string) (resource.ResourceClient, error) { |
|
|
|
|
func (ds *DistributorServer) getClientConnToDistributeRequest(ctx context.Context, namespace string) (*grpc.ClientConn, error) { |
|
|
|
|
ringHasher := fnv.New32a() |
|
|
|
|
_, err := ringHasher.Write([]byte(namespace)) |
|
|
|
|
if err != nil { |
|
|
|
@@ -250,7 +283,7 @@ func (ds *DistributorServer) getClientToDistributeRequest(ctx context.Context, n |
|
|
|
|
|
|
|
|
|
fmt.Println("distributing request to ", rs.Instances[0].Id) |
|
|
|
|
|
|
|
|
|
return client.(*resource.RingClient).Client, nil |
|
|
|
|
return client.(*ringClient).conn, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type healthServer struct{} |
|
|
|
|