fix(deps): update module google.golang.org/grpc to v1.70.0 (main) (#15955)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Branch ref: pull/15910/head^2
Author: renovate[bot] (4 months ago), committed by GitHub
Parent: 4443ba4453
Commit: 653e37a164
Signature: GPG Key ID B5690EEEBB952194 (no known key found for this signature in database)
  1. go.mod (2 lines changed)
  2. go.sum (4 lines changed)
  3. pkg/push/go.mod (4 lines changed)
  4. pkg/push/go.sum (28 lines changed)
  5. vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go (8 lines changed)
  6. vendor/google.golang.org/grpc/balancer/grpclb/grpc_lb_v1/load_balancer.pb.go (2 lines changed)
  7. vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go (27 lines changed)
  8. vendor/google.golang.org/grpc/balancer/weightedroundrobin/balancer.go (8 lines changed)
  9. vendor/google.golang.org/grpc/balancer/weightedroundrobin/weightedroundrobin.go (7 lines changed)
  10. vendor/google.golang.org/grpc/balancer_wrapper.go (73 lines changed)
  11. vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go (2 lines changed)
  12. vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/altscontext.pb.go (2 lines changed)
  13. vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/handshaker.pb.go (2 lines changed)
  14. vendor/google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp/transport_security_common.pb.go (2 lines changed)
  15. vendor/google.golang.org/grpc/credentials/tls.go (6 lines changed)
  16. vendor/google.golang.org/grpc/dialoptions.go (5 lines changed)
  17. vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go (2 lines changed)
  18. vendor/google.golang.org/grpc/internal/envconfig/envconfig.go (2 lines changed)
  19. vendor/google.golang.org/grpc/internal/envconfig/xds.go (6 lines changed)
  20. vendor/google.golang.org/grpc/internal/internal.go (4 lines changed)
  21. vendor/google.golang.org/grpc/internal/proto/grpc_lookup_v1/rls.pb.go (2 lines changed)
  22. vendor/google.golang.org/grpc/internal/proto/grpc_lookup_v1/rls_config.pb.go (2 lines changed)
  23. vendor/google.golang.org/grpc/internal/transport/handler_server.go (2 lines changed)
  24. vendor/google.golang.org/grpc/internal/transport/http2_server.go (4 lines changed)
  25. vendor/google.golang.org/grpc/server.go (10 lines changed)
  26. vendor/google.golang.org/grpc/service_config.go (17 lines changed)
  27. vendor/google.golang.org/grpc/stream.go (2 lines changed)
  28. vendor/google.golang.org/grpc/version.go (2 lines changed)
  29. vendor/google.golang.org/grpc/xds/internal/balancer/clusterimpl/picker.go (2 lines changed)
  30. vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/clusterresolver.go (30 lines changed)
  31. vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/configbuilder.go (70 lines changed)
  32. vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver.go (5 lines changed)
  33. vendor/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver_dns.go (26 lines changed)
  34. vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/balancer.go (146 lines changed)
  35. vendor/google.golang.org/grpc/xds/internal/balancer/outlierdetection/subconn_wrapper.go (154 lines changed)
  36. vendor/google.golang.org/grpc/xds/internal/balancer/wrrlocality/balancer.go (7 lines changed)
  37. vendor/google.golang.org/grpc/xds/internal/internal.go (6 lines changed)
  38. vendor/google.golang.org/grpc/xds/internal/xdsclient/authority.go (13 lines changed)
  39. vendor/google.golang.org/grpc/xds/internal/xdsclient/client_new.go (15 lines changed)
  40. vendor/google.golang.org/grpc/xds/internal/xdsclient/client_refcounted.go (20 lines changed)
  41. vendor/google.golang.org/grpc/xds/internal/xdsclient/clientimpl.go (77 lines changed)
  42. vendor/google.golang.org/grpc/xds/internal/xdsclient/transport/ads/ads_stream.go (2 lines changed)
  43. vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/type_eds.go (2 lines changed)
  44. vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/unmarshal_eds.go (19 lines changed)
  45. vendor/modules.txt (2 lines changed)

@ -104,7 +104,7 @@ require (
golang.org/x/sys v0.29.0
golang.org/x/time v0.9.0
google.golang.org/api v0.218.0
google.golang.org/grpc v1.69.4
google.golang.org/grpc v1.70.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/klog/v2 v2.130.1

@ -1638,8 +1638,8 @@ google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

@ -7,7 +7,7 @@ toolchain go1.23.3
require (
github.com/gogo/protobuf v1.3.2
github.com/stretchr/testify v1.10.0
google.golang.org/grpc v1.69.4
google.golang.org/grpc v1.70.0
)
require (
@ -18,7 +18,7 @@ require (
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.35.1 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

@ -27,16 +27,16 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY=
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE=
go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE=
go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY=
go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk=
go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0=
go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc=
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys=
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M=
go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8=
go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4=
go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU=
go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU=
go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ=
go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM=
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@ -70,10 +70,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

@ -35,11 +35,9 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
@ -48,11 +46,7 @@ import (
var PickFirstConfig string
func init() {
name := pickfirst.Name
if !envconfig.NewPickFirstEnabled {
name = pickfirstleaf.Name
}
PickFirstConfig = fmt.Sprintf("[{%q: {}}]", name)
PickFirstConfig = fmt.Sprintf("[{%q: {}}]", pickfirstleaf.Name)
}
// ChildState is the balancer state of a child along with the endpoint which

@ -19,7 +19,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.35.1
// protoc-gen-go v1.35.2
// protoc v5.27.1
// source: grpc/lb/v1/load_balancer.proto

@ -54,9 +54,18 @@ func init() {
balancer.Register(pickfirstBuilder{})
}
// enableHealthListenerKeyType is a unique key type used in resolver attributes
// to indicate whether the health listener usage is enabled.
type enableHealthListenerKeyType struct{}
type (
// enableHealthListenerKeyType is a unique key type used in resolver
// attributes to indicate whether the health listener usage is enabled.
enableHealthListenerKeyType struct{}
// managedByPickfirstKeyType is an attribute key type to inform Outlier
// Detection that the generic health listener is being used.
// TODO: https://github.com/grpc/grpc-go/issues/7915 - Remove this when
// implementing the dualstack design. This is a hack. Once Dualstack is
// completed, outlier detection will stop sending ejection updates through
// the connectivity listener.
managedByPickfirstKeyType struct{}
)
var (
logger = grpclog.Component("pick-first-leaf-lb")
@ -140,6 +149,17 @@ func EnableHealthListener(state resolver.State) resolver.State {
return state
}
// IsManagedByPickfirst returns whether an address belongs to a SubConn
// managed by the pickfirst LB policy.
// TODO: https://github.com/grpc/grpc-go/issues/7915 - This is a hack to disable
// outlier_detection via the connectivity listener when using pick_first.
// Once Dualstack changes are complete, all SubConns will be created by
// pick_first and outlier detection will only use the health listener for
// ejection. This hack can then be removed.
func IsManagedByPickfirst(addr resolver.Address) bool {
return addr.BalancerAttributes.Value(managedByPickfirstKeyType{}) != nil
}
type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`
@ -166,6 +186,7 @@ type scData struct {
}
func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
addr.BalancerAttributes = addr.BalancerAttributes.WithValue(managedByPickfirstKeyType{}, true)
sd := &scData{
rawConnectivityState: connectivity.Idle,
effectiveState: connectivity.Idle,
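The managedByPickfirstKeyType marker above travels on resolver.Address.BalancerAttributes: newSCData stamps every address it creates a SubConn for, and IsManagedByPickfirst only checks for the key's presence. A minimal, self-contained sketch of that pattern (hypothetical key type and address, not part of this diff):

package main

import (
	"fmt"

	"google.golang.org/grpc/resolver"
)

// exampleKey is a hypothetical unexported key type; as with
// managedByPickfirstKeyType, the Go type itself keeps the attribute
// collision-free across packages.
type exampleKey struct{}

func main() {
	addr := resolver.Address{Addr: "10.0.0.1:443"}
	// Tag the address, as newSCData does for SubConns it manages.
	addr.BalancerAttributes = addr.BalancerAttributes.WithValue(exampleKey{}, true)

	// A reader (the role IsManagedByPickfirst plays) only checks presence.
	fmt.Println(addr.BalancerAttributes.Value(exampleKey{}) != nil) // prints true
}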

@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/endpointsharding"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/balancer/weightedroundrobin/internal"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/connectivity"
@ -218,7 +219,9 @@ type wrrBalancer struct {
}
func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
b.logger.Infof("UpdateCCS: %v", ccs)
if b.logger.V(2) {
b.logger.Infof("UpdateCCS: %v", ccs)
}
cfg, ok := ccs.BalancerConfig.(*lbConfig)
if !ok {
return fmt.Errorf("wrr: received nil or illegal BalancerConfig (type %T): %v", ccs.BalancerConfig, ccs.BalancerConfig)
@ -232,6 +235,9 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
b.updateEndpointsLocked(ccs.ResolverState.Endpoints)
b.mu.Unlock()
// Make pickfirst children use health listeners for outlier detection to
// work.
ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState)
// This causes child to update picker inline and will thus cause inline
// picker update.
return b.child.UpdateClientConnState(balancer.ClientConnState{

@ -56,6 +56,13 @@ func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
return addr
}
// SetAddrInfoInEndpoint returns a copy of endpoint in which the Attributes
// field is updated with addrInfo.
func SetAddrInfoInEndpoint(endpoint resolver.Endpoint, addrInfo AddrInfo) resolver.Endpoint {
endpoint.Attributes = endpoint.Attributes.WithValue(attributeKey{}, addrInfo)
return endpoint
}
// GetAddrInfo returns the AddrInfo stored in the BalancerAttributes field of
// addr.
func GetAddrInfo(addr resolver.Address) AddrInfo {

@ -34,7 +34,15 @@ import (
"google.golang.org/grpc/status"
)
var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
var (
setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
// noOpRegisterHealthListenerFn is used when client side health checking is
// disabled. It sends a single READY update on the registered listener.
noOpRegisterHealthListenerFn = func(_ context.Context, listener func(balancer.SubConnState)) func() {
listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
return func() {}
}
)
// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
@ -277,10 +285,17 @@ type healthData struct {
// to the LB policy. This is stored to avoid sending updates when the
// SubConn has already exited connectivity state READY.
connectivityState connectivity.State
// closeHealthProducer stores function to close the ref counted health
// producer. The health producer is automatically closed when the SubConn
// state changes.
closeHealthProducer func()
}
func newHealthData(s connectivity.State) *healthData {
return &healthData{connectivityState: s}
return &healthData{
connectivityState: s,
closeHealthProducer: func() {},
}
}
// updateState is invoked by grpc to push a subConn state update to the
@ -413,6 +428,37 @@ func (acbw *acBalancerWrapper) closeProducers() {
}
}
// healthProducerRegisterFn is a type alias for the health producer's function
// for registering listeners.
type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func()
// healthListenerRegFn returns a function to register a listener for health
// updates. If client side health checks are disabled, the registered listener
// will get a single READY (raw connectivity state) update.
//
// Client side health checking is enabled when all the following
// conditions are satisfied:
// 1. Health checking is not disabled using the dial option.
// 2. The health package is imported.
// 3. The health check config is present in the service config.
func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(balancer.SubConnState)) func() {
if acbw.ccb.cc.dopts.disableHealthCheck {
return noOpRegisterHealthListenerFn
}
regHealthLisFn := internal.RegisterClientHealthCheckListener
if regHealthLisFn == nil {
// The health package is not imported.
return noOpRegisterHealthListenerFn
}
cfg := acbw.ac.cc.healthCheckConfig()
if cfg == nil {
return noOpRegisterHealthListenerFn
}
return func(ctx context.Context, listener func(balancer.SubConnState)) func() {
return regHealthLisFn.(healthProducerRegisterFn)(ctx, acbw, cfg.ServiceName, listener)
}
}
// RegisterHealthListener accepts a health listener from the LB policy. It sends
// updates to the health listener as long as the SubConn's connectivity state
// doesn't change and a new health listener is not registered. To invalidate
@ -421,6 +467,7 @@ func (acbw *acBalancerWrapper) closeProducers() {
func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
acbw.healthMu.Lock()
defer acbw.healthMu.Unlock()
acbw.healthData.closeHealthProducer()
// listeners should not be registered when the connectivity state
// isn't Ready. This may happen when the balancer registers a listener
// after the connectivityState is updated, but before it is notified
@ -436,6 +483,7 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
return
}
registerFn := acbw.healthListenerRegFn()
acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
@ -443,10 +491,25 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
// Don't send updates if a new listener is registered.
acbw.healthMu.Lock()
defer acbw.healthMu.Unlock()
curHD := acbw.healthData
if curHD != hd {
if acbw.healthData != hd {
return
}
listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Serialize the health updates from the health producer with
// other calls into the LB policy.
listenerWrapper := func(scs balancer.SubConnState) {
acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
}
acbw.healthMu.Lock()
defer acbw.healthMu.Unlock()
if acbw.healthData != hd {
return
}
listener(scs)
})
}
hd.closeHealthProducer = registerFn(ctx, listenerWrapper)
})
}
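Putting the three conditions listed in healthListenerRegFn together: client-side health checking feeds RegisterHealthListener only when it is not disabled by dial option, the health package is linked in, and the service config carries a healthCheckConfig; otherwise the no-op register function delivers a single synthetic READY. A hedged client-side sketch (target and service name are hypothetical):

package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	_ "google.golang.org/grpc/health" // links in the client health-checking code
)

// healthCheckConfig in the service config is the third condition; the service
// name here is hypothetical.
const exampleServiceConfig = `{"healthCheckConfig": {"serviceName": "example.Echo"}}`

func main() {
	cc, err := grpc.NewClient("dns:///example.internal:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(exampleServiceConfig),
		// grpc.WithDisableHealthCheck(), // uncommenting this falls back to the no-op listener
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cc.Close()
}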

@ -18,7 +18,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.35.1
// protoc-gen-go v1.35.2
// protoc v5.27.1
// source: grpc/binlog/v1/binarylog.proto

@ -17,7 +17,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.35.1
// protoc-gen-go v1.35.2
// protoc v5.27.1
// source: grpc/gcp/altscontext.proto

@ -17,7 +17,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.35.1
// protoc-gen-go v1.35.2
// protoc v5.27.1
// source: grpc/gcp/handshaker.proto

@ -17,7 +17,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.35.1
// protoc-gen-go v1.35.2
// protoc v5.27.1
// source: grpc/gcp/transport_security_common.proto

@ -32,6 +32,8 @@ import (
"google.golang.org/grpc/internal/envconfig"
)
const alpnFailureHelpMessage = "If you upgraded from a grpc-go version earlier than 1.67, your TLS connections may have stopped working due to ALPN enforcement. For more details, see: https://github.com/grpc/grpc-go/issues/434"
var logger = grpclog.Component("credentials")
// TLSInfo contains the auth information for a TLS authenticated connection.
@ -128,7 +130,7 @@ func (c *tlsCreds) ClientHandshake(ctx context.Context, authority string, rawCon
if np == "" {
if envconfig.EnforceALPNEnabled {
conn.Close()
return nil, nil, fmt.Errorf("credentials: cannot check peer: missing selected ALPN property")
return nil, nil, fmt.Errorf("credentials: cannot check peer: missing selected ALPN property. %s", alpnFailureHelpMessage)
}
logger.Warningf("Allowing TLS connection to server %q with ALPN disabled. TLS connections to servers with ALPN disabled will be disallowed in future grpc-go releases", cfg.ServerName)
}
@ -158,7 +160,7 @@ func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error)
if cs.NegotiatedProtocol == "" {
if envconfig.EnforceALPNEnabled {
conn.Close()
return nil, nil, fmt.Errorf("credentials: cannot check peer: missing selected ALPN property")
return nil, nil, fmt.Errorf("credentials: cannot check peer: missing selected ALPN property. %s", alpnFailureHelpMessage)
} else if logger.V(2) {
logger.Info("Allowing TLS connection from client with ALPN disabled. TLS connections with ALPN disabled will be disallowed in future grpc-go releases")
}
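The new alpnFailureHelpMessage is appended when the TLS peer negotiates no ALPN protocol while enforcement is on (envconfig.EnforceALPNEnabled, driven by the GRPC_ENFORCE_ALPN_ENABLED environment variable and enforced by default since grpc-go 1.67, per the message above). A hedged server-side sketch that avoids the failure by advertising "h2"; the certificate paths are hypothetical:

package main

import (
	"crypto/tls"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

func main() {
	cert, err := tls.LoadX509KeyPair("server.crt", "server.key") // hypothetical files
	if err != nil {
		log.Fatal(err)
	}
	cfg := &tls.Config{
		Certificates: []tls.Certificate{cert},
		// Advertising h2 via ALPN is what the enforcement above checks for.
		// credentials.NewTLS also appends "h2" itself when it is missing, so
		// this line is explicit rather than strictly required.
		NextProtos: []string{"h2"},
	}
	srv := grpc.NewServer(grpc.Creds(credentials.NewTLS(cfg)))
	_ = srv // register services and call srv.Serve(lis) as usual
}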

@ -428,6 +428,11 @@ func WithTimeout(d time.Duration) DialOption {
// returned by f, gRPC checks the error's Temporary() method to decide if it
// should try to reconnect to the network address.
//
// Note that gRPC by default performs name resolution on the target passed to
// NewClient. To bypass name resolution and cause the target string to be
// passed directly to the dialer here instead, use the "passthrough" resolver
// by specifying it in the target string, e.g. "passthrough:target".
//
// Note: All supported releases of Go (as of December 2023) override the OS
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
// with OS defaults for keepalive time and interval, use a net.Dialer that sets
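A hedged sketch of the combination this new paragraph describes: a custom dialer plus the passthrough resolver, so the target string reaches the dialer unresolved (socket path and target are hypothetical):

package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Dial a Unix socket regardless of the target string; with the
	// "passthrough" scheme, gRPC skips name resolution and hands the target
	// to this dialer as-is.
	dialer := func(ctx context.Context, addr string) (net.Conn, error) {
		var d net.Dialer
		return d.DialContext(ctx, "unix", "/tmp/example.sock")
	}
	cc, err := grpc.NewClient("passthrough:///ignored",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithContextDialer(dialer),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cc.Close()
}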

@ -17,7 +17,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.35.1
// protoc-gen-go v1.35.2
// protoc v5.27.1
// source: grpc/health/v1/health.proto

@ -49,7 +49,7 @@ var (
// XDSFallbackSupport is the env variable that controls whether support for
// xDS fallback is turned on. If this is unset or is false, only the first
// xDS server in the list of server configs will be used.
XDSFallbackSupport = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FALLBACK", false)
XDSFallbackSupport = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FALLBACK", true)
// NewPickFirstEnabled is set if the new pickfirst leaf policy is to be used
// instead of the existing pickfirst implementation. This can be enabled by
// setting the environment variable "GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST"

@ -53,4 +53,10 @@ var (
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI")
// XDSDualstackEndpointsEnabled is true if gRPC should read the
// "additional addresses" in the xDS endpoint resource.
// TODO: https://github.com/grpc/grpc-go/issues/7866 - Control this using
// an env variable when all LB policies handle endpoints.
XDSDualstackEndpointsEnabled = false
)

@ -31,6 +31,10 @@ import (
var (
// HealthCheckFunc is used to provide client-side LB channel health checking
HealthCheckFunc HealthChecker
// RegisterClientHealthCheckListener is used to provide a listener for
// updates from the client-side health checking service. It returns a
// function that can be called to stop the health producer.
RegisterClientHealthCheckListener any // func(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState)) func()
// BalancerUnregister is exported by package balancer to unregister a balancer.
BalancerUnregister func(name string)
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by

@ -14,7 +14,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.35.1
// protoc-gen-go v1.35.2
// protoc v5.27.1
// source: grpc/lookup/v1/rls.proto

@ -14,7 +14,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.35.1
// protoc-gen-go v1.35.2
// protoc v5.27.1
// source: grpc/lookup/v1/rls_config.proto

@ -498,5 +498,5 @@ func mapRecvMsgError(err error) error {
if strings.Contains(err.Error(), "body closed by handler") {
return status.Error(codes.Canceled, err.Error())
}
return connectionErrorf(true, err, err.Error())
return connectionErrorf(true, err, "%s", err.Error())
}
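For context, this change (like the status.Error swaps further down) stops using an error's text as a printf format string, which mangles messages containing "%" and is flagged by go vet's printf check in recent Go releases. A small stand-alone illustration:

package main

import (
	"errors"
	"fmt"
)

func main() {
	err := errors.New("quota is 100% used")

	// Risky: the error text is treated as a format string, so the "%" is
	// parsed as a verb directive and the message comes out mangled.
	fmt.Println(fmt.Errorf(err.Error()))

	// Safe: a constant format with the text passed as an argument.
	fmt.Println(fmt.Errorf("%s", err.Error()))
}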

@ -564,7 +564,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
t.logger.Infof("Aborting the stream early: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: 405,
httpStatus: http.StatusMethodNotAllowed,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.New(codes.Internal, errMsg),
@ -585,7 +585,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
stat = status.New(codes.PermissionDenied, err.Error())
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: 200,
httpStatus: http.StatusOK,
streamID: s.id,
contentSubtype: s.contentSubtype,
status: stat,

@ -1360,8 +1360,16 @@ func (s *Server) processUnaryRPC(ctx context.Context, stream *transport.ServerSt
}
return err
}
defer d.Free()
freed := false
dataFree := func() {
if !freed {
d.Free()
freed = true
}
}
defer dataFree()
df := func(v any) error {
defer dataFree()
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}

@ -268,18 +268,21 @@ func parseServiceConfig(js string, maxAttempts int) *serviceconfig.ParseResult {
return &serviceconfig.ParseResult{Config: &sc}
}
func isValidRetryPolicy(jrp *jsonRetryPolicy) bool {
return jrp.MaxAttempts > 1 &&
jrp.InitialBackoff > 0 &&
jrp.MaxBackoff > 0 &&
jrp.BackoffMultiplier > 0 &&
len(jrp.RetryableStatusCodes) > 0
}
func convertRetryPolicy(jrp *jsonRetryPolicy, maxAttempts int) (p *internalserviceconfig.RetryPolicy, err error) {
if jrp == nil {
return nil, nil
}
if jrp.MaxAttempts <= 1 ||
jrp.InitialBackoff <= 0 ||
jrp.MaxBackoff <= 0 ||
jrp.BackoffMultiplier <= 0 ||
len(jrp.RetryableStatusCodes) == 0 {
logger.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp)
return nil, nil
if !isValidRetryPolicy(jrp) {
return nil, fmt.Errorf("invalid retry policy (%+v): ", jrp)
}
if jrp.MaxAttempts < maxAttempts {
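With this change, an invalid retryPolicy makes convertRetryPolicy return an error instead of logging a warning and silently dropping the policy. A hedged sketch of a service config that satisfies every isValidRetryPolicy check (target, service, and values are hypothetical):

package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// maxAttempts > 1, positive backoffs and multiplier, non-empty
// retryableStatusCodes: all of isValidRetryPolicy's conditions hold.
const exampleServiceConfig = `{
  "methodConfig": [{
    "name": [{"service": "example.Echo"}],
    "retryPolicy": {
      "maxAttempts": 4,
      "initialBackoff": "0.1s",
      "maxBackoff": "1s",
      "backoffMultiplier": 2.0,
      "retryableStatusCodes": ["UNAVAILABLE"]
    }
  }]
}`

func main() {
	cc, err := grpc.NewClient("dns:///example.internal:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(exampleServiceConfig),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cc.Close()
}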

@ -1766,7 +1766,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
return err
}
if err == io.ErrUnexpectedEOF {
err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
}
return toRPCErr(err)
}

@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.69.4"
const Version = "1.70.0"

@ -129,7 +129,7 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
if d.loadStore != nil {
d.loadStore.CallDropped("")
}
return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error())
return balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
}
}

@ -234,7 +234,7 @@ func (b *clusterResolverBalancer) updateChildConfig() {
b.child = newChildBalancer(b.priorityBuilder, b.cc, b.bOpts)
}
childCfgBytes, addrs, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy)
childCfgBytes, endpoints, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy)
if err != nil {
b.logger.Warningf("Failed to build child policy config: %v", err)
return
@ -248,15 +248,33 @@ func (b *clusterResolverBalancer) updateChildConfig() {
b.logger.Infof("Built child policy config: %s", pretty.ToJSON(childCfg))
}
endpoints := make([]resolver.Endpoint, len(addrs))
for i, a := range addrs {
endpoints[i].Attributes = a.BalancerAttributes
endpoints[i].Addresses = []resolver.Address{a}
flattenedAddrs := make([]resolver.Address, len(endpoints))
for i := range endpoints {
for j := range endpoints[i].Addresses {
addr := endpoints[i].Addresses[j]
addr.BalancerAttributes = endpoints[i].Attributes
// If the endpoint has multiple addresses, only the first is added
// to the flattened address list. This ensures that LB policies
// that don't support endpoints create only one subchannel to a
// backend.
if j == 0 {
flattenedAddrs[i] = addr
}
// BalancerAttributes need to be present in endpoint addresses. This
// temporary workaround is required to make load reporting work
// with the old pickfirst policy which creates SubConns with multiple
// addresses. Since the addresses can be from different localities,
// an Address.BalancerAttribute is used to identify the locality of the
// address used by the transport. This workaround can be removed once
// the old pickfirst is removed.
// See https://github.com/grpc/grpc-go/issues/7339
endpoints[i].Addresses[j] = addr
}
}
if err := b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: endpoints,
Addresses: addrs,
Addresses: flattenedAddrs,
ServiceConfig: b.configRaw,
Attributes: b.attrsWithClient,
},
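The flattening loop above, in isolation: each endpoint contributes only its first address to the legacy Addresses list, while the endpoint-level attributes are mirrored into every address's BalancerAttributes for load reporting. A self-contained sketch with hypothetical addresses and attribute key:

package main

import (
	"fmt"

	"google.golang.org/grpc/attributes"
	"google.golang.org/grpc/resolver"
)

func main() {
	// Hypothetical dual-stack endpoint: two addresses, one endpoint attribute.
	endpoints := []resolver.Endpoint{{
		Addresses:  []resolver.Address{{Addr: "10.0.0.1:443"}, {Addr: "[2001:db8::1]:443"}},
		Attributes: attributes.New("locality", "us-east-1a"),
	}}

	// Same shape as the loop in updateChildConfig: the first address of each
	// endpoint goes into the flattened list, attributes are copied onto each
	// address.
	flattened := make([]resolver.Address, len(endpoints))
	for i := range endpoints {
		for j := range endpoints[i].Addresses {
			addr := endpoints[i].Addresses[j]
			addr.BalancerAttributes = endpoints[i].Attributes
			if j == 0 {
				flattened[i] = addr
			}
			endpoints[i].Addresses[j] = addr
		}
	}
	fmt.Println(flattened[0].Addr) // prints 10.0.0.1:443
}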

@ -48,8 +48,8 @@ type priorityConfig struct {
mechanism DiscoveryMechanism
// edsResp is set only if type is EDS.
edsResp xdsresource.EndpointsUpdate
// addresses is set only if type is DNS.
addresses []string
// endpoints is set only if type is DNS.
endpoints []resolver.Endpoint
// Each discovery mechanism has a name generator so that the child policies
// can reuse names between updates (EDS updates for example).
childNameGen *nameGenerator
@ -71,8 +71,8 @@ type priorityConfig struct {
// ┌──────▼─────┐ ┌─────▼──────┐
// │xDSLBPolicy │ │xDSLBPolicy │ (Locality and Endpoint picking layer)
// └────────────┘ └────────────┘
func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) {
pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy)
func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Endpoint, error) {
pc, endpoints, err := buildPriorityConfig(priorities, xdsLBPolicy)
if err != nil {
return nil, nil, fmt.Errorf("failed to build priority config: %v", err)
}
@ -80,23 +80,23 @@ func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internals
if err != nil {
return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err)
}
return ret, addrs, nil
return ret, endpoints, nil
}
func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address, error) {
func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Endpoint, error) {
var (
retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)}
retAddrs []resolver.Address
retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)}
retEndpoints []resolver.Endpoint
)
for _, p := range priorities {
switch p.mechanism.Type {
case DiscoveryMechanismTypeEDS:
names, configs, addrs, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy)
names, configs, endpoints, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy)
if err != nil {
return nil, nil, err
}
retConfig.Priorities = append(retConfig.Priorities, names...)
retAddrs = append(retAddrs, addrs...)
retEndpoints = append(retEndpoints, endpoints...)
odCfgs := convertClusterImplMapToOutlierDetection(configs, p.mechanism.outlierDetection)
for n, c := range odCfgs {
retConfig.Children[n] = &priority.Child{
@ -107,9 +107,9 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi
}
continue
case DiscoveryMechanismTypeLogicalDNS:
name, config, addrs := buildClusterImplConfigForDNS(p.childNameGen, p.addresses, p.mechanism)
name, config, endpoints := buildClusterImplConfigForDNS(p.childNameGen, p.endpoints, p.mechanism)
retConfig.Priorities = append(retConfig.Priorities, name)
retAddrs = append(retAddrs, addrs...)
retEndpoints = append(retEndpoints, endpoints...)
odCfg := makeClusterImplOutlierDetectionChild(config, p.mechanism.outlierDetection)
retConfig.Children[name] = &priority.Child{
Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: odCfg},
@ -120,7 +120,7 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi
continue
}
}
return retConfig, retAddrs, nil
return retConfig, retEndpoints, nil
}
func convertClusterImplMapToOutlierDetection(ciCfgs map[string]*clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) map[string]*outlierdetection.LBConfig {
@ -137,19 +137,22 @@ func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg out
return &odCfgRet
}
func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Address) {
func buildClusterImplConfigForDNS(g *nameGenerator, endpoints []resolver.Endpoint, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Endpoint) {
// Endpoint picking policy for DNS is hardcoded to pick_first.
const childPolicy = "pick_first"
retAddrs := make([]resolver.Address, 0, len(addrStrs))
retEndpoints := make([]resolver.Endpoint, len(endpoints))
pName := fmt.Sprintf("priority-%v", g.prefix)
for _, addrStr := range addrStrs {
retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
for i, e := range endpoints {
retEndpoints[i] = hierarchy.SetInEndpoint(e, []string{pName})
// Copy the nested address field as slice fields are shared by the
// iteration variable and the original slice.
retEndpoints[i].Addresses = append([]resolver.Address{}, e.Addresses...)
}
return pName, &clusterimpl.LBConfig{
Cluster: mechanism.Cluster,
TelemetryLabels: mechanism.TelemetryLabels,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
}, retAddrs
}, retEndpoints
}
// buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for
@ -161,7 +164,7 @@ func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism
// - map{"p0":p0_config, "p1":p1_config}
// - [p0_address_0, p0_address_1, p1_address_0, p1_address_1]
// - p0 addresses' hierarchy attributes are set to p0
func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) {
func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Endpoint, error) {
drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops))
for _, d := range edsResp.Drops {
drops = append(drops, clusterimpl.DropConfig{
@ -183,17 +186,17 @@ func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.Endpoint
}
retNames := g.generate(priorities)
retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames))
var retAddrs []resolver.Address
var retEndpoints []resolver.Endpoint
for i, pName := range retNames {
priorityLocalities := priorities[i]
cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy)
cfg, endpoints, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy)
if err != nil {
return nil, nil, nil, err
}
retConfigs[pName] = cfg
retAddrs = append(retAddrs, addrs...)
retEndpoints = append(retEndpoints, endpoints...)
}
return retNames, retConfigs, retAddrs, nil
return retNames, retConfigs, retEndpoints, nil
}
// groupLocalitiesByPriority returns the localities grouped by priority.
@ -244,8 +247,8 @@ func dedupSortedIntSlice(a []int) []int {
// priority), and generates a cluster impl policy config, and a list of
// addresses with their path hierarchy set to [priority-name, locality-name], so
// priority and the xDS LB Policy know which child policy each address is for.
func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) {
var addrs []resolver.Address
func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Endpoint, error) {
var retEndpoints []resolver.Endpoint
for _, locality := range localities {
var lw uint32 = 1
if locality.Weight != 0 {
@ -262,21 +265,24 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority
if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown {
continue
}
addr := resolver.Address{Addr: endpoint.Address}
addr = hierarchy.Set(addr, []string{priorityName, localityStr})
addr = internal.SetLocalityID(addr, locality.ID)
resolverEndpoint := resolver.Endpoint{}
for _, as := range endpoint.Addresses {
resolverEndpoint.Addresses = append(resolverEndpoint.Addresses, resolver.Address{Addr: as})
}
resolverEndpoint = hierarchy.SetInEndpoint(resolverEndpoint, []string{priorityName, localityStr})
resolverEndpoint = internal.SetLocalityIDInEndpoint(resolverEndpoint, locality.ID)
// "To provide the xds_wrr_locality load balancer information about
// locality weights received from EDS, the cluster resolver will
// populate a new locality weight attribute for each address. The
// attribute will have the weight (as an integer) of the locality
// the address is part of." - A52
addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: lw})
resolverEndpoint = wrrlocality.SetAddrInfoInEndpoint(resolverEndpoint, wrrlocality.AddrInfo{LocalityWeight: lw})
var ew uint32 = 1
if endpoint.Weight != 0 {
ew = endpoint.Weight
}
addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: lw * ew})
addrs = append(addrs, addr)
resolverEndpoint = weightedroundrobin.SetAddrInfoInEndpoint(resolverEndpoint, weightedroundrobin.AddrInfo{Weight: lw * ew})
retEndpoints = append(retEndpoints, resolverEndpoint)
}
}
return &clusterimpl.LBConfig{
@ -287,5 +293,5 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority
TelemetryLabels: mechanism.TelemetryLabels,
DropCategories: drops,
ChildPolicy: xdsLBPolicy,
}, addrs, nil
}, retEndpoints, nil
}

@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
@ -294,8 +295,8 @@ func (rr *resourceResolver) generateLocked(onDone xdsresource.OnDoneFunc) {
switch uu := u.(type) {
case xdsresource.EndpointsUpdate:
ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu, childNameGen: rDM.childNameGen})
case []string:
ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu, childNameGen: rDM.childNameGen})
case []resolver.Endpoint:
ret = append(ret, priorityConfig{mechanism: rDM.dm, endpoints: uu, childNameGen: rDM.childNameGen})
}
}
select {

@ -47,7 +47,7 @@ type dnsDiscoveryMechanism struct {
logger *grpclog.PrefixLogger
mu sync.Mutex
addrs []string
endpoints []resolver.Endpoint
updateReceived bool
}
@ -103,7 +103,7 @@ func (dr *dnsDiscoveryMechanism) lastUpdate() (any, bool) {
if !dr.updateReceived {
return nil, false
}
return dr.addrs, true
return dr.endpoints, true
}
func (dr *dnsDiscoveryMechanism) resolveNow() {
@ -133,23 +133,15 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error {
}
dr.mu.Lock()
var addrs []string
if len(state.Endpoints) > 0 {
// Assume 1 address per endpoint, which is how DNS is expected to
// behave. The slice will grow as needed, however.
addrs = make([]string, 0, len(state.Endpoints))
for _, e := range state.Endpoints {
for _, a := range e.Addresses {
addrs = append(addrs, a.Addr)
}
}
} else {
addrs = make([]string, len(state.Addresses))
var endpoints = state.Endpoints
if len(endpoints) == 0 {
endpoints = make([]resolver.Endpoint, len(state.Addresses))
for i, a := range state.Addresses {
addrs[i] = a.Addr
endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{a}}
endpoints[i].Attributes = a.BalancerAttributes
}
}
dr.addrs = addrs
dr.endpoints = endpoints
dr.updateReceived = true
dr.mu.Unlock()
@ -172,7 +164,7 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) {
dr.mu.Unlock()
return
}
dr.addrs = nil
dr.endpoints = nil
dr.updateReceived = true
dr.mu.Unlock()

@ -33,6 +33,7 @@ import (
"unsafe"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
@ -72,7 +73,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
}
b.logger = prefixLogger(b)
b.logger.Infof("Created")
b.child = gracefulswitch.NewBalancer(b, bOpts)
b.child = synchronizingBalancerWrapper{lb: gracefulswitch.NewBalancer(b, bOpts)}
go b.run()
return b
}
@ -152,6 +153,11 @@ type lbCfgUpdate struct {
done chan struct{}
}
type scHealthUpdate struct {
scw *subConnWrapper
state balancer.SubConnState
}
type outlierDetectionBalancer struct {
// These fields are safe to be accessed without holding any mutex because
// they are synchronized in run(), which makes these field accesses happen
@ -170,10 +176,7 @@ type outlierDetectionBalancer struct {
logger *grpclog.PrefixLogger
channelzParent channelz.Identifier
// childMu guards calls into child (to uphold the balancer.Balancer API
// guarantee of synchronous calls).
childMu sync.Mutex
child *gracefulswitch.Balancer
child synchronizingBalancerWrapper
// mu guards access to the following fields. It also helps to synchronize
// behaviors of the following events: config updates, firing of the interval
@ -190,8 +193,8 @@ type outlierDetectionBalancer struct {
// which uses addrs. This balancer waits for the interval timer algorithm to
// finish before making the update to the addrs map.
//
// This mutex is never held at the same time as childMu (within the context
// of a single goroutine).
// This mutex is never held when calling methods on the child policy
// (within the context of a single goroutine).
mu sync.Mutex
addrs map[string]*addressInfo
cfg *LBConfig
@ -276,13 +279,9 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt
// the balancer.Balancer API, so it is guaranteed to be called in a
// synchronous manner, so it cannot race with this read.
if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name {
b.childMu.Lock()
err := b.child.SwitchTo(bb)
if err != nil {
b.childMu.Unlock()
if err := b.child.switchTo(bb); err != nil {
return fmt.Errorf("outlier detection: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err)
}
b.childMu.Unlock()
}
b.mu.Lock()
@ -319,12 +318,10 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt
}
b.mu.Unlock()
b.childMu.Lock()
err := b.child.UpdateClientConnState(balancer.ClientConnState{
err := b.child.updateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: b.cfg.ChildPolicy.Config,
})
b.childMu.Unlock()
done := make(chan struct{})
b.pickerUpdateCh.Put(lbCfgUpdate{
@ -337,9 +334,7 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt
}
func (b *outlierDetectionBalancer) ResolverError(err error) {
b.childMu.Lock()
defer b.childMu.Unlock()
b.child.ResolverError(err)
b.child.resolverError(err)
}
func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
@ -355,6 +350,7 @@ func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state
if state.ConnectivityState == connectivity.Shutdown {
delete(b.scWrappers, scw.SubConn)
}
scw.setLatestConnectivityState(state.ConnectivityState)
b.scUpdateCh.Put(&scUpdate{
scw: scw,
state: state,
@ -368,9 +364,7 @@ func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state
func (b *outlierDetectionBalancer) Close() {
b.closed.Fire()
<-b.done.Done()
b.childMu.Lock()
b.child.Close()
b.childMu.Unlock()
b.child.closeLB()
b.scUpdateCh.Close()
b.pickerUpdateCh.Close()
@ -383,9 +377,7 @@ func (b *outlierDetectionBalancer) Close() {
}
func (b *outlierDetectionBalancer) ExitIdle() {
b.childMu.Lock()
defer b.childMu.Unlock()
b.child.ExitIdle()
b.child.exitIdle()
}
// wrappedPicker delegates to the child policy's picker, and when the request
@ -475,10 +467,13 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal
return nil, err
}
scw := &subConnWrapper{
SubConn: sc,
addresses: addrs,
scUpdateCh: b.scUpdateCh,
listener: oldListener,
SubConn: sc,
addresses: addrs,
scUpdateCh: b.scUpdateCh,
listener: oldListener,
latestRawConnectivityState: balancer.SubConnState{ConnectivityState: connectivity.Idle},
latestHealthState: balancer.SubConnState{ConnectivityState: connectivity.Connecting},
healthListenerEnabled: len(addrs) == 1 && pickfirstleaf.IsManagedByPickfirst(addrs[0]),
}
b.mu.Lock()
defer b.mu.Unlock()
@ -596,34 +591,18 @@ func (b *outlierDetectionBalancer) Target() string {
// if the SubConn is not ejected.
func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) {
scw := u.scw
scw.latestState = u.state
if !scw.ejected {
if scw.listener != nil {
b.childMu.Lock()
scw.listener(u.state)
b.childMu.Unlock()
}
}
scw.clearHealthListener()
b.child.updateSubConnState(scw, u.state)
}
func (b *outlierDetectionBalancer) handleSubConnHealthUpdate(u *scHealthUpdate) {
b.child.updateSubConnHealthState(u.scw, u.state)
}
// handleEjectedUpdate handles any SubConns that get ejected/unejected, and
// forwards the appropriate corresponding subConnState to the child policy.
func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) {
scw := u.scw
scw.ejected = u.isEjected
// If scw.latestState has never been written to will default to connectivity
// IDLE, which is fine.
stateToUpdate := scw.latestState
if u.isEjected {
stateToUpdate = balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
}
}
if scw.listener != nil {
b.childMu.Lock()
scw.listener(stateToUpdate)
b.childMu.Unlock()
}
b.child.handleEjectionUpdate(u)
}
// handleChildStateUpdate forwards the picker update wrapped in a wrapped picker
@ -696,6 +675,8 @@ func (b *outlierDetectionBalancer) run() {
b.handleSubConnUpdate(u)
case *ejectionUpdate:
b.handleEjectedUpdate(u)
case *scHealthUpdate:
b.handleSubConnHealthUpdate(u)
}
case update, ok := <-b.pickerUpdateCh.Get():
if !ok {
@ -880,6 +861,69 @@ func (b *outlierDetectionBalancer) unejectAddress(addrInfo *addressInfo) {
}
}
// synchronizingBalancerWrapper serializes calls into balancer (to uphold the
// balancer.Balancer API guarantee of synchronous calls). It also ensures a
// consistent order of locking mutexes when using SubConn listeners to avoid
// deadlocks.
type synchronizingBalancerWrapper struct {
// mu should not be used directly from outside this struct, instead use
// methods defined on the struct.
mu sync.Mutex
lb *gracefulswitch.Balancer
}
func (sbw *synchronizingBalancerWrapper) switchTo(builder balancer.Builder) error {
sbw.mu.Lock()
defer sbw.mu.Unlock()
return sbw.lb.SwitchTo(builder)
}
func (sbw *synchronizingBalancerWrapper) updateClientConnState(state balancer.ClientConnState) error {
sbw.mu.Lock()
defer sbw.mu.Unlock()
return sbw.lb.UpdateClientConnState(state)
}
func (sbw *synchronizingBalancerWrapper) resolverError(err error) {
sbw.mu.Lock()
defer sbw.mu.Unlock()
sbw.lb.ResolverError(err)
}
func (sbw *synchronizingBalancerWrapper) closeLB() {
sbw.mu.Lock()
defer sbw.mu.Unlock()
sbw.lb.Close()
}
func (sbw *synchronizingBalancerWrapper) exitIdle() {
sbw.mu.Lock()
defer sbw.mu.Unlock()
sbw.lb.ExitIdle()
}
func (sbw *synchronizingBalancerWrapper) updateSubConnHealthState(scw *subConnWrapper, scs balancer.SubConnState) {
sbw.mu.Lock()
defer sbw.mu.Unlock()
scw.updateSubConnHealthState(scs)
}
func (sbw *synchronizingBalancerWrapper) updateSubConnState(scw *subConnWrapper, scs balancer.SubConnState) {
sbw.mu.Lock()
defer sbw.mu.Unlock()
scw.updateSubConnConnectivityState(scs)
}
func (sbw *synchronizingBalancerWrapper) handleEjectionUpdate(u *ejectionUpdate) {
sbw.mu.Lock()
defer sbw.mu.Unlock()
if u.isEjected {
u.scw.handleEjection()
} else {
u.scw.handleUnejection()
}
}
// addressInfo contains the runtime information about an address that pertains
// to Outlier Detection. This struct and all of its fields is protected by
// outlierDetectionBalancer.mu in the case where it is accessed through the

@ -19,9 +19,11 @@ package outlierdetection
import (
"fmt"
"sync"
"unsafe"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/resolver"
)
@ -31,23 +33,54 @@ import (
// whether or not this SubConn is ejected.
type subConnWrapper struct {
balancer.SubConn
listener func(balancer.SubConnState)
// addressInfo is a pointer to the subConnWrapper's corresponding address
// map entry, if the map entry exists.
// map entry, if the map entry exists. It is accessed atomically.
addressInfo unsafe.Pointer // *addressInfo
// The following fields are set during object creation and read-only after
// that.
listener func(balancer.SubConnState)
// healthListenerEnabled indicates whether the leaf LB policy is using a
// generic health listener. When enabled, ejection updates are sent via the
// health listener instead of the connectivity listener. Once Dualstack
// changes are complete, all SubConns will be created by pickfirst which
// uses the health listener.
// TODO: https://github.com/grpc/grpc-go/issues/7915 - Once Dualstack
// changes are complete, all SubConns will be created by pick_first and
// outlier detection will only use the health listener for ejection and
// this field can be removed.
healthListenerEnabled bool
scUpdateCh *buffer.Unbounded
// The following fields are only referenced in the context of a work
// serializing buffer and don't need to be protected by a mutex.
// These two pieces of state will reach eventual consistency due to sync in
// run(), and child will always have the correctly updated SubConnState.
// latestState is the latest state update from the underlying SubConn. This
// is used whenever a SubConn gets unejected.
latestState balancer.SubConnState
ejected bool
scUpdateCh *buffer.Unbounded
ejected bool
// addresses is the list of address(es) this SubConn was created with to
// help support any change in address(es)
addresses []resolver.Address
// latestHealthState is tracked to update the child policy during
// unejection.
latestHealthState balancer.SubConnState
// latestRawConnectivityState is tracked to update the child policy during
// unejection.
latestRawConnectivityState balancer.SubConnState
// Access to the following fields is protected by a mutex. These fields
// should not be accessed from outside this file, instead use methods
// defined on the struct.
mu sync.Mutex
healthListener func(balancer.SubConnState)
// latestReceivedConnectivityState is the SubConn's most recent connectivity
// state received. It may not be delivered to the child balancer yet. It is
// used to ensure a health listener is registered with the SubConn only when
// the SubConn is READY.
latestReceivedConnectivityState connectivity.State
}
// eject causes the wrapper to report a state update with the TRANSIENT_FAILURE
@ -72,3 +105,108 @@ func (scw *subConnWrapper) uneject() {
func (scw *subConnWrapper) String() string {
return fmt.Sprintf("%+v", scw.addresses)
}
func (scw *subConnWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
// gRPC currently supports two mechanisms that provide a health signal for
// a connection: client-side health checking and outlier detection. Earlier
// both these mechanisms signaled unhealthiness by setting the subchannel
// state to TRANSIENT_FAILURE. As part of the dualstack changes to make
// pick_first the universal leaf policy (see A61), both these mechanisms
// started using the new health listener to make health signal visible to
// the petiole policies without affecting the underlying connectivity
// management of the pick_first policy
if !scw.healthListenerEnabled {
logger.Errorf("Health listener unexpectedly registered on SubConn %v.", scw)
return
}
scw.mu.Lock()
defer scw.mu.Unlock()
if scw.latestReceivedConnectivityState != connectivity.Ready {
return
}
scw.healthListener = listener
if listener == nil {
scw.SubConn.RegisterHealthListener(nil)
return
}
scw.SubConn.RegisterHealthListener(func(scs balancer.SubConnState) {
scw.scUpdateCh.Put(&scHealthUpdate{
scw: scw,
state: scs,
})
})
}
// updateSubConnHealthState stores the latest health state for unejection and
// sends updates to the health listener.
func (scw *subConnWrapper) updateSubConnHealthState(scs balancer.SubConnState) {
scw.latestHealthState = scs
if scw.ejected {
return
}
scw.mu.Lock()
defer scw.mu.Unlock()
if scw.healthListener != nil {
scw.healthListener(scs)
}
}
// updateSubConnConnectivityState stores the latest connectivity state for
// unejection and updates the raw connectivity listener.
func (scw *subConnWrapper) updateSubConnConnectivityState(scs balancer.SubConnState) {
scw.latestRawConnectivityState = scs
// If the raw connectivity listener is used for ejection, and the SubConn is
// ejected, don't send the update.
if scw.ejected && !scw.healthListenerEnabled {
return
}
if scw.listener != nil {
scw.listener(scs)
}
}
func (scw *subConnWrapper) clearHealthListener() {
scw.mu.Lock()
defer scw.mu.Unlock()
scw.healthListener = nil
}
func (scw *subConnWrapper) handleUnejection() {
scw.ejected = false
if !scw.healthListenerEnabled {
// If scw.latestRawConnectivityState has never been written to, it will
// default to connectivity IDLE, which is fine.
scw.updateSubConnConnectivityState(scw.latestRawConnectivityState)
return
}
// If scw.latestHealthState has never been written to, it will use the health
// state CONNECTING set during object creation.
scw.updateSubConnHealthState(scw.latestHealthState)
}
func (scw *subConnWrapper) handleEjection() {
scw.ejected = true
stateToUpdate := balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
}
if !scw.healthListenerEnabled {
if scw.listener != nil {
scw.listener(stateToUpdate)
}
return
}
scw.mu.Lock()
defer scw.mu.Unlock()
if scw.healthListener != nil {
scw.healthListener(stateToUpdate)
}
}
func (scw *subConnWrapper) setLatestConnectivityState(state connectivity.State) {
scw.mu.Lock()
defer scw.mu.Unlock()
scw.latestReceivedConnectivityState = state
}

@ -120,6 +120,13 @@ func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
return addr
}
// SetAddrInfoInEndpoint returns a copy of endpoint in which the Attributes
// field is updated with AddrInfo.
func SetAddrInfoInEndpoint(endpoint resolver.Endpoint, addrInfo AddrInfo) resolver.Endpoint {
endpoint.Attributes = endpoint.Attributes.WithValue(attributeKey{}, addrInfo)
return endpoint
}
func (a AddrInfo) String() string {
return fmt.Sprintf("Locality Weight: %d", a.LocalityWeight)
}

@ -86,6 +86,12 @@ func SetLocalityID(addr resolver.Address, l LocalityID) resolver.Address {
return addr
}
// SetLocalityIDInEndpoint sets locality ID in endpoint to l.
func SetLocalityIDInEndpoint(endpoint resolver.Endpoint, l LocalityID) resolver.Endpoint {
endpoint.Attributes = endpoint.Attributes.WithValue(localityKey, l)
return endpoint
}
// ResourceTypeMapForTesting maps TypeUrl to corresponding ResourceType.
var ResourceTypeMapForTesting map[string]any

@ -639,6 +639,9 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
}
// state can only be accessed in the context of an
// xdsClientSerializer callback. Hence making a copy of the cached
// resource here for watchCallbackSerializer.
resource := state.cache
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
}
@ -646,9 +649,13 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
// immediately as well.
if state.md.Status == xdsresource.ServiceStatusNACKed {
if a.logger.V(2) {
a.logger.Infof("Resource type %q with resource name %q was NACKed: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
a.logger.Infof("Resource type %q with resource name %q was NACKed", rType.TypeName(), resourceName)
}
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
// state can only be accessed in the context of an
// xdsClientSerializer callback. Hence making a copy of the error
// here for watchCallbackSerializer.
err := state.md.ErrState.Err
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) })
}
// If the metadata field is updated to indicate that the management
// server does not have this resource, notify the new watcher.
@@ -687,7 +694,7 @@ func (a *authority) unwatchResource(rType xdsresource.Type, resourceName string,
delete(state.watchers, watcher)
if len(state.watchers) > 0 {
if a.logger.V(2) {
a.logger.Infof("%d more watchers exist for type %q, resource name %q", rType.TypeName(), resourceName)
a.logger.Infof("Other watchers exist for type %q, resource name %q", rType.TypeName(), resourceName)
}
return
}
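The two hunks above apply the same rule: anything read from `state` is copied before the closure is handed to the watcher callback serializer, because `state` may only be touched from an xdsClientSerializer callback. A toy illustration of that copy-before-schedule pattern, with plain goroutines standing in for the serializers; nothing below is the vendored code.

```go
package main

import (
	"fmt"
	"sync"
)

// owner stands in for the authority-owned state that only its own serializer
// may touch.
type owner struct{ cache string }

func main() {
	var wg sync.WaitGroup
	s := &owner{cache: "resource-v1"}

	// Copy first, then capture only the copy; the goroutine standing in for
	// the watcher callback serializer never dereferences s.
	resource := s.cache
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("watcher sees:", resource)
	}()

	// Meanwhile the owning serializer is free to keep mutating s without
	// racing with the callback above.
	s.cache = "resource-v2"
	wg.Wait()
}
```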

@@ -26,7 +26,6 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
xdsclientinternal "google.golang.org/grpc/xds/internal/xdsclient/internal"
@@ -61,11 +60,11 @@ func New(name string) (XDSClient, func(), error) {
if err != nil {
return nil, nil, fmt.Errorf("xds: failed to get xDS bootstrap config: %v", err)
}
return newRefCounted(name, config, defaultWatchExpiryTimeout, defaultIdleChannelExpiryTimeout, backoff.DefaultExponential.Backoff)
return newRefCounted(name, config, defaultWatchExpiryTimeout, backoff.DefaultExponential.Backoff)
}
// newClientImpl returns a new xdsClient with the given config.
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &clientImpl{
done: grpcsync.NewEvent(),
@@ -78,7 +77,6 @@ func newClientImpl(config *bootstrap.Config, watchExpiryTimeout, idleChannelExpi
transportBuilder: &grpctransport.Builder{},
resourceTypes: newResourceTypeRegistry(),
xdsActiveChannels: make(map[string]*channelState),
xdsIdleChannels: cache.NewTimeoutCache(idleChannelExpiryTimeout),
}
for name, cfg := range config.Authorities() {
@@ -121,10 +119,6 @@ type OptionsForTesting struct {
// unspecified, uses the default value used in non-test code.
WatchExpiryTimeout time.Duration
// IdleChannelExpiryTimeout is the timeout before idle xdsChannels are
// deleted. If unspecified, uses the default value used in non-test code.
IdleChannelExpiryTimeout time.Duration
// StreamBackoffAfterFailure is the backoff function used to determine the
// backoff duration after stream failures.
// If unspecified, uses the default value used in non-test code.
@@ -147,9 +141,6 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
if opts.WatchExpiryTimeout == 0 {
opts.WatchExpiryTimeout = defaultWatchExpiryTimeout
}
if opts.IdleChannelExpiryTimeout == 0 {
opts.IdleChannelExpiryTimeout = defaultIdleChannelExpiryTimeout
}
if opts.StreamBackoffAfterFailure == nil {
opts.StreamBackoffAfterFailure = defaultStreamBackoffFunc
}
@@ -158,7 +149,7 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
if err != nil {
return nil, nil, err
}
return newRefCounted(opts.Name, config, opts.WatchExpiryTimeout, opts.IdleChannelExpiryTimeout, opts.StreamBackoffAfterFailure)
return newRefCounted(opts.Name, config, opts.WatchExpiryTimeout, opts.StreamBackoffAfterFailure)
}
// GetForTesting returns an xDS client created earlier using the given name.

@@ -27,10 +27,7 @@ import (
"google.golang.org/grpc/internal/xds/bootstrap"
)
const (
defaultWatchExpiryTimeout = 15 * time.Second
defaultIdleChannelExpiryTimeout = 5 * time.Minute
)
const defaultWatchExpiryTimeout = 15 * time.Second
var (
// The following functions are no-ops in the actual code, but can be
@@ -43,26 +40,31 @@ var (
func clientRefCountedClose(name string) {
clientsMu.Lock()
defer clientsMu.Unlock()
client, ok := clients[name]
if !ok {
logger.Errorf("Attempt to close a non-existent xDS client with name %s", name)
clientsMu.Unlock()
return
}
if client.decrRef() != 0 {
clientsMu.Unlock()
return
}
delete(clients, name)
clientsMu.Unlock()
// This attempts to close the transport to the management server and could
// theoretically call back into the xdsclient package again and deadlock.
// Hence, this needs to be called without holding the lock.
client.clientImpl.close()
xdsClientImplCloseHook(name)
delete(clients, name)
}
// newRefCounted creates a new reference counted xDS client implementation for
// name, if one does not exist already. If an xDS client for the given name
// exists, it gets a reference to it and returns it.
func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, idleChannelExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) {
func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) {
clientsMu.Lock()
defer clientsMu.Unlock()
@@ -72,7 +74,7 @@ func newRefCounted(name string, config *bootstrap.Config, watchExpiryTimeout, id
}
// Create the new client implementation.
c, err := newClientImpl(config, watchExpiryTimeout, idleChannelExpiryTimeout, streamBackoff)
c, err := newClientImpl(config, watchExpiryTimeout, streamBackoff)
if err != nil {
return nil, nil, err
}

@@ -25,7 +25,6 @@ import (
"sync/atomic"
"time"
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
@@ -63,14 +62,9 @@ type clientImpl struct {
// these channels, and forwards updates from the channels to each of these
// authorities.
//
// Once all references to a channel are dropped, the channel is moved to the
// idle cache where it lives for a configured duration before being closed.
// If the channel is required before the idle timeout fires, it is revived
// from the idle cache and used.
// Once all references to a channel are dropped, the channel is closed.
channelsMu sync.Mutex
xdsActiveChannels map[string]*channelState // Map from server config to in-use xdsChannels.
xdsIdleChannels *cache.TimeoutCache // Map from server config to idle xdsChannels.
closeCond *sync.Cond
}
// channelState represents the state of an xDS channel. It tracks the number of
@@ -173,21 +167,6 @@ func (c *clientImpl) close() {
c.close()
}
// Similarly, closing idle channels cannot be done with the lock held, for
// the same reason as described above. So, we clear the idle cache in a
// goroutine and use a condition variable to wait on the condition that the
// idle cache has zero entries. The Wait() method on the condition variable
// releases the lock and blocks the goroutine until signaled (which happens
// when an idle channel is removed from the cache and closed), and grabs the
// lock before returning.
c.channelsMu.Lock()
c.closeCond = sync.NewCond(&c.channelsMu)
go c.xdsIdleChannels.Clear(true)
for c.xdsIdleChannels.Len() > 0 {
c.closeCond.Wait()
}
c.channelsMu.Unlock()
c.serializerClose()
<-c.serializer.Done()
@@ -289,27 +268,15 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
c.logger.Infof("Received request for a reference to an xdsChannel for server config %q", serverConfig)
}
// Use an active channel, if one exists for this server config.
// Use an existing channel, if one exists for this server config.
if state, ok := c.xdsActiveChannels[serverConfig.String()]; ok {
if c.logger.V(2) {
c.logger.Infof("Reusing an active xdsChannel for server config %q", serverConfig)
c.logger.Infof("Reusing an existing xdsChannel for server config %q", serverConfig)
}
initLocked(state)
return state.channel, c.releaseChannel(serverConfig, state, deInitLocked), nil
}
// If an idle channel exists for this server config, remove it from the
// idle cache and add it to the map of active channels, and return it.
if s, ok := c.xdsIdleChannels.Remove(serverConfig.String()); ok {
if c.logger.V(2) {
c.logger.Infof("Reviving an xdsChannel from the idle cache for server config %q", serverConfig)
}
state := s.(*channelState)
c.xdsActiveChannels[serverConfig.String()] = state
initLocked(state)
return state.channel, c.releaseChannel(serverConfig, state, deInitLocked), nil
}
if c.logger.V(2) {
c.logger.Infof("Creating a new xdsChannel for server config %q", serverConfig)
}
@@ -345,9 +312,7 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
}
// releaseChannel is a function that is called when a reference to an xdsChannel
// needs to be released. It handles the logic of moving the channel to an idle
// cache if there are no other active references, and closing the channel if it
// remains in the idle cache for the configured duration.
// needs to be released. It handles closing channels with no active references.
//
// The function takes the following parameters:
// - serverConfig: the server configuration for the xdsChannel
@@ -360,7 +325,6 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state *channelState, deInitLocked func(*channelState)) func() {
return grpcsync.OnceFunc(func() {
c.channelsMu.Lock()
defer c.channelsMu.Unlock()
if c.logger.V(2) {
c.logger.Infof("Received request to release a reference to an xdsChannel for server config %q", serverConfig)
@@ -372,40 +336,17 @@ func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state
if c.logger.V(2) {
c.logger.Infof("xdsChannel %p has other active references", state.channel)
}
c.channelsMu.Unlock()
return
}
// Move the channel to the idle cache instead of closing
// immediately. If the channel remains in the idle cache for
// the configured duration, it will get closed.
delete(c.xdsActiveChannels, serverConfig.String())
if c.logger.V(2) {
c.logger.Infof("Moving xdsChannel [%p] for server config %s to the idle cache", state.channel, serverConfig)
c.logger.Infof("Closing xdsChannel [%p] for server config %s", state.channel, serverConfig)
}
channelToClose := state.channel
c.channelsMu.Unlock()
// The idle cache expiry timeout results in the channel getting
// closed in another serializer callback.
c.xdsIdleChannels.Add(serverConfig.String(), state, grpcsync.OnceFunc(func() {
c.channelsMu.Lock()
channelToClose := state.channel
c.channelsMu.Unlock()
if c.logger.V(2) {
c.logger.Infof("Idle cache expiry timeout fired for xdsChannel [%p] for server config %s", state.channel, serverConfig)
}
channelToClose.close()
// If the channel is being closed as a result of the xDS client
// being closed, closeCond is non-nil and we need to signal from
// here to unblock Close(). Holding the lock is not necessary
// to call Signal() on a condition variable. But the field
// `c.closeCond` needs to guarded by the lock, which is why we
// acquire it here.
c.channelsMu.Lock()
if c.closeCond != nil {
c.closeCond.Signal()
}
c.channelsMu.Unlock()
}))
channelToClose.close()
})
}
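With the idle cache gone, releaseChannel reduces to a reference-count drop: the last release deletes the map entry and closes the channel, and the close happens only after the mutex is released so that a re-entrant call back into the client cannot deadlock. A condensed sketch of that shape, using sync.Once in place of grpcsync.OnceFunc; the `refChannel` and `client` types are illustrative, not the vendored ones.

```go
package main

import (
	"fmt"
	"sync"
)

type refChannel struct {
	refs int
}

func (ch *refChannel) close() { fmt.Println("channel closed") }

type client struct {
	mu       sync.Mutex
	channels map[string]*refChannel
}

// release returns a func that drops one reference; the last drop deletes the
// entry and closes the channel outside the lock, mirroring releaseChannel.
func (c *client) release(key string, ch *refChannel) func() {
	var once sync.Once
	return func() {
		once.Do(func() {
			c.mu.Lock()
			ch.refs--
			if ch.refs > 0 {
				c.mu.Unlock()
				return
			}
			delete(c.channels, key)
			c.mu.Unlock()
			ch.close() // no lock held here
		})
	}
}

func main() {
	ch := &refChannel{refs: 2}
	c := &client{channels: map[string]*refChannel{"server-a": ch}}
	rel1, rel2 := c.release("server-a", ch), c.release("server-a", ch)
	rel1()
	rel2() // prints "channel closed"
}
```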

@@ -664,7 +664,7 @@ func (s *StreamImpl) onError(err error, msgReceived bool) {
// connection hitting its max connection age limit.
// (see [gRFC A9](https://github.com/grpc/proposal/blob/master/A9-server-side-conn-mgt.md)).
if msgReceived {
err = xdsresource.NewErrorf(xdsresource.ErrTypeStreamFailedAfterRecv, err.Error())
err = xdsresource.NewErrorf(xdsresource.ErrTypeStreamFailedAfterRecv, "%s", err.Error())
}
s.eventHandler.OnADSStreamError(err)
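The one-character change above ("%s", err.Error() instead of err.Error()) matters because the wrapped error text may itself contain % verbs, and newer go vet releases also flag non-constant format strings. A standard-library-only illustration of the difference:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	err := errors.New("decoding failed at 80% of the payload")

	// Using the message as the format string mangles it (and trips vet's
	// non-constant format string check).
	mangled := fmt.Errorf(err.Error())

	// Passing a literal "%s" format preserves the message verbatim.
	preserved := fmt.Errorf("%s", err.Error())

	fmt.Println(mangled)   // e.g. "decoding failed at 80%!o(MISSING)f the payload"
	fmt.Println(preserved) // "decoding failed at 80% of the payload"
}
```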

@@ -49,7 +49,7 @@ const (
// Endpoint contains information of an endpoint.
type Endpoint struct {
Address string
Addresses []string
HealthStatus EndpointHealthStatus
Weight uint32
}

@@ -26,6 +26,7 @@ import (
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/xds/internal"
"google.golang.org/protobuf/proto"
@@ -93,14 +94,22 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint, uniqueEndpointAddrs
}
weight = w.GetValue()
}
addr := parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress())
if uniqueEndpointAddrs[addr] {
return nil, fmt.Errorf("duplicate endpoint with the same address %s", addr)
addrs := []string{parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress())}
if envconfig.XDSDualstackEndpointsEnabled {
for _, sa := range lbEndpoint.GetEndpoint().GetAdditionalAddresses() {
addrs = append(addrs, parseAddress(sa.GetAddress().GetSocketAddress()))
}
}
for _, a := range addrs {
if uniqueEndpointAddrs[a] {
return nil, fmt.Errorf("duplicate endpoint with the same address %s", a)
}
uniqueEndpointAddrs[a] = true
}
uniqueEndpointAddrs[addr] = true
endpoints = append(endpoints, Endpoint{
HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
Address: addr,
Addresses: addrs,
Weight: weight,
})
}
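With dual-stack support enabled (gated on envconfig.XDSDualstackEndpointsEnabled), each endpoint now contributes a list of addresses, but every address in the EDS response must still be globally unique. A reduced sketch of just that uniqueness check; checkUnique is illustrative and not the vendored parser.

```go
package main

import "fmt"

// checkUnique walks each endpoint's address list and rejects the response as
// soon as any address is seen twice, mirroring the loop above.
func checkUnique(endpoints [][]string) error {
	seen := make(map[string]bool)
	for _, addrs := range endpoints {
		for _, a := range addrs {
			if seen[a] {
				return fmt.Errorf("duplicate endpoint with the same address %s", a)
			}
			seen[a] = true
		}
	}
	return nil
}

func main() {
	// The second endpoint reuses the IPv6 address of the first one.
	eps := [][]string{
		{"10.0.0.1:8080", "[2001:db8::1]:8080"},
		{"[2001:db8::1]:8080"},
	}
	fmt.Println(checkUnique(eps)) // duplicate endpoint with the same address [2001:db8::1]:8080
}
```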

@@ -2049,7 +2049,7 @@ google.golang.org/genproto/googleapis/api/monitoredres
google.golang.org/genproto/googleapis/rpc/code
google.golang.org/genproto/googleapis/rpc/errdetails
google.golang.org/genproto/googleapis/rpc/status
# google.golang.org/grpc v1.69.4
# google.golang.org/grpc v1.70.0
## explicit; go 1.22
google.golang.org/grpc
google.golang.org/grpc/attributes
