fix(unified-storage): race in resource server watch (#105786)

pull/105921/head
Jean-Philippe Quéméner 2 months ago committed by GitHub
parent 04de9c2740
commit e57be36936
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 43
      pkg/storage/unified/resource/server.go

@ -918,14 +918,47 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor
}
defer s.broadcaster.Unsubscribe(stream)
// Determine a safe starting resource-version for the watch.
// When the client requests SendInitialEvents we will use the resource-version
// of the last object returned from the initial list (handled below).
// When the client supplies an explicit `since` we honour that.
// In the remaining case (SendInitialEvents == false && since == 0) we need
// a high-water-mark representing the current state of storage so that we
// donʼt replay events that happened before the watch was established. Using
// `mostRecentRV` – which is updated asynchronously by the broadcaster – is
// subject to races because the broadcaster may not yet have observed the
// latest committed writes. Instead we ask the backend directly for the
// current resource-version.
var mostRecentRV int64
if !req.SendInitialEvents && req.Since == 0 {
// This is a temporary hack only relevant for tests to ensure that the first events are sent.
// This is required because the SQL backend polls the database every 100ms.
// TODO: Implement a getLatestResourceVersion method in the backend.
time.Sleep(10 * time.Millisecond)
// We only need the current RV. A cheap way to obtain it is to issue a
// List with a very small limit and read the listRV returned by the
// iterator. The callback is a no-op so we avoid materialising any
// items.
listReq := &resourcepb.ListRequest{
Options: req.Options,
// This has right now no effect, as the list request only uses the limit if it lists from history or trash.
// It might be worth adding it in a subsequent PR. We only list once during setup of the watch, so it's
// fine for now.
Limit: 1,
}
rv, err := s.backend.ListIterator(ctx, listReq, func(ListIterator) error { return nil })
if err != nil {
// Fallback to the broadcasterʼs view if the backend lookup fails.
// This preserves previous behaviour while still eliminating the
// common race in the majority of cases.
s.log.Warn("watch: failed to fetch current RV from backend, falling back to broadcaster", "err", err)
mostRecentRV = s.mostRecentRV.Load()
} else {
mostRecentRV = rv
}
} else {
// For all other code-paths we either already have an explicit RV or we
// will derive it from the initial list below.
mostRecentRV = s.mostRecentRV.Load()
}
mostRecentRV := s.mostRecentRV.Load() // get the latest resource version
var initialEventsRV int64 // resource version coming from the initial events
if req.SendInitialEvents {
// Backfill the stream by adding every existing entities.

Loading…
Cancel
Save