|
|
|
@ -2,7 +2,6 @@ package server |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"errors" |
|
|
|
|
"fmt" |
|
|
|
|
"hash/fnv" |
|
|
|
|
"io" |
|
|
|
@ -151,6 +150,7 @@ func (ds *DistributorServer) handler(srv interface{}, serverStream grpc.ServerSt |
|
|
|
|
namespace := ds.getNamespaceFromContext(serverStream.Context()) |
|
|
|
|
|
|
|
|
|
// TODO if namespace is not present or is *, assign random pod for now
|
|
|
|
|
fmt.Println("got namespace: ", namespace) |
|
|
|
|
conn, err := ds.getClientConnToDistributeRequest(serverStream.Context(), namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
@ -172,7 +172,8 @@ func (ds *DistributorServer) handler(srv interface{}, serverStream grpc.ServerSt |
|
|
|
|
for i := 0; i < 2; i++ { |
|
|
|
|
select { |
|
|
|
|
case s2cErr := <-s2cErrChan: |
|
|
|
|
if errors.Is(s2cErr, io.EOF) { |
|
|
|
|
//nolint:errorlint
|
|
|
|
|
if s2cErr == io.EOF { |
|
|
|
|
// this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
|
|
|
|
|
// the clientStream>serverStream may continue pumping though.
|
|
|
|
|
// TODO handle err?
|
|
|
|
@ -190,7 +191,8 @@ func (ds *DistributorServer) handler(srv interface{}, serverStream grpc.ServerSt |
|
|
|
|
// will be nil.
|
|
|
|
|
serverStream.SetTrailer(clientStream.Trailer()) |
|
|
|
|
// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
|
|
|
|
|
if errors.Is(c2sErr, io.EOF) { |
|
|
|
|
//nolint:errorlint
|
|
|
|
|
if c2sErr != io.EOF { |
|
|
|
|
return c2sErr |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|