|
|
|
@ -41,11 +41,9 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu |
|
|
|
|
|
|
|
|
|
grpcServer := grpcHandler.GetServer() |
|
|
|
|
|
|
|
|
|
resourcepb.RegisterResourceStoreServer(grpcServer, distributorServer) |
|
|
|
|
// resourcepb.RegisterBulkStoreServer(grpcServer, distributorServer)
|
|
|
|
|
resourcepb.RegisterResourceIndexServer(grpcServer, distributorServer) |
|
|
|
|
resourcepb.RegisterManagedObjectIndexServer(grpcServer, distributorServer) |
|
|
|
|
resourcepb.RegisterBlobStoreServer(grpcServer, distributorServer) |
|
|
|
|
grpc_health_v1.RegisterHealthServer(grpcServer, healthService) |
|
|
|
|
_, err = grpcserver.ProvideReflectionService(cfg, grpcHandler) |
|
|
|
|
if err != nil { |
|
|
|
@ -106,81 +104,6 @@ func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.Resourc |
|
|
|
|
return client.GetStats(ctx, r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *distributorServer) Read(ctx context.Context, r *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) { |
|
|
|
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace, "Read") |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.Read(ctx, r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *distributorServer) Create(ctx context.Context, r *resourcepb.CreateRequest) (*resourcepb.CreateResponse, error) { |
|
|
|
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace, "Create") |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.Create(ctx, r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *distributorServer) Update(ctx context.Context, r *resourcepb.UpdateRequest) (*resourcepb.UpdateResponse, error) { |
|
|
|
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace, "Update") |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.Update(ctx, r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *distributorServer) Delete(ctx context.Context, r *resourcepb.DeleteRequest) (*resourcepb.DeleteResponse, error) { |
|
|
|
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace, "Delete") |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.Delete(ctx, r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *distributorServer) List(ctx context.Context, r *resourcepb.ListRequest) (*resourcepb.ListResponse, error) { |
|
|
|
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "List") |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.List(ctx, r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *distributorServer) Watch(r *resourcepb.WatchRequest, srv resourcepb.ResourceStore_WatchServer) error { |
|
|
|
|
// r -> consumer watch request
|
|
|
|
|
// srv -> stream connection with consumer
|
|
|
|
|
ctx := srv.Context() |
|
|
|
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Watch") |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// watchClient -> stream connection with storage-api pod
|
|
|
|
|
watchClient, err := client.Watch(ctx, r) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// WARNING
|
|
|
|
|
// in Watch, all messages flow from the resource server (watchClient) to the consumer (srv)
|
|
|
|
|
// but since this is a streaming connection, in theory the consumer could also send a message to the server
|
|
|
|
|
// however for the sake of simplicity we are not handling it here
|
|
|
|
|
// but if we decide to handle bi-directional message passing in this method, we will need to update this
|
|
|
|
|
// we also never handle EOF err, as the server never closes the connection willingly
|
|
|
|
|
for { |
|
|
|
|
msg, err := watchClient.Recv() |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
_ = srv.Send(msg) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO implement this if we want to support it in cloud
|
|
|
|
|
// func (ds *DistributorServer) BulkProcess(srv BulkStore_BulkProcessServer) error {
|
|
|
|
|
// return nil
|
|
|
|
@ -204,24 +127,6 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource |
|
|
|
|
return client.ListManagedObjects(ctx, r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *distributorServer) PutBlob(ctx context.Context, r *resourcepb.PutBlobRequest) (*resourcepb.PutBlobResponse, error) { |
|
|
|
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Resource.Namespace, "PutBlob") |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.PutBlob(ctx, r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *distributorServer) GetBlob(ctx context.Context, r *resourcepb.GetBlobRequest) (*resourcepb.GetBlobResponse, error) { |
|
|
|
|
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Resource.Namespace, "GetBlob") |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return client.GetBlob(ctx, r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) { |
|
|
|
|
ringHasher := fnv.New32a() |
|
|
|
|
_, err := ringHasher.Write([]byte(namespace)) |
|
|
|
|