diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index c340c7a6e39..42474e706a3 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -918,15 +918,48 @@ func (s *server) Watch(req *resourcepb.WatchRequest, srv resourcepb.ResourceStor } defer s.broadcaster.Unsubscribe(stream) + // Determine a safe starting resource-version for the watch. + // When the client requests SendInitialEvents we will use the resource-version + // of the last object returned from the initial list (handled below). + // When the client supplies an explicit `since` we honour that. + // In the remaining case (SendInitialEvents == false && since == 0) we need + // a high-water-mark representing the current state of storage so that we + // donʼt replay events that happened before the watch was established. Using + // `mostRecentRV` – which is updated asynchronously by the broadcaster – is + // subject to races because the broadcaster may not yet have observed the + // latest committed writes. Instead we ask the backend directly for the + // current resource-version. + var mostRecentRV int64 if !req.SendInitialEvents && req.Since == 0 { - // This is a temporary hack only relevant for tests to ensure that the first events are sent. - // This is required because the SQL backend polls the database every 100ms. - // TODO: Implement a getLatestResourceVersion method in the backend. - time.Sleep(10 * time.Millisecond) + // We only need the current RV. A cheap way to obtain it is to issue a + // List with a very small limit and read the listRV returned by the + // iterator. The callback is a no-op so we avoid materialising any + // items. + listReq := &resourcepb.ListRequest{ + Options: req.Options, + // This has right now no effect, as the list request only uses the limit if it lists from history or trash. + // It might be worth adding it in a subsequent PR. We only list once during setup of the watch, so it's + // fine for now. + Limit: 1, + } + + rv, err := s.backend.ListIterator(ctx, listReq, func(ListIterator) error { return nil }) + if err != nil { + // Fallback to the broadcasterʼs view if the backend lookup fails. + // This preserves previous behaviour while still eliminating the + // common race in the majority of cases. + s.log.Warn("watch: failed to fetch current RV from backend, falling back to broadcaster", "err", err) + mostRecentRV = s.mostRecentRV.Load() + } else { + mostRecentRV = rv + } + } else { + // For all other code-paths we either already have an explicit RV or we + // will derive it from the initial list below. + mostRecentRV = s.mostRecentRV.Load() } - mostRecentRV := s.mostRecentRV.Load() // get the latest resource version - var initialEventsRV int64 // resource version coming from the initial events + var initialEventsRV int64 // resource version coming from the initial events if req.SendInitialEvents { // Backfill the stream by adding every existing entities. initialEventsRV, err = s.backend.ListIterator(ctx, &resourcepb.ListRequest{Options: req.Options}, func(iter ListIterator) error {