|
|
|
@ -1112,14 +1112,12 @@ Exec_ListenPreCommit(void) |
|
|
|
|
amRegisteredListener = true; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Try to move our pointer forward as far as possible. This will skip over |
|
|
|
|
* already-committed notifications. Still, we could get notifications that |
|
|
|
|
* have already committed before we started to LISTEN. |
|
|
|
|
* |
|
|
|
|
* Note that we are not yet listening on anything, so we won't deliver any |
|
|
|
|
* notification to the frontend. Also, although our transaction might |
|
|
|
|
* have executed NOTIFY, those message(s) aren't queued yet so we can't |
|
|
|
|
* see them in the queue. |
|
|
|
|
* Try to move our pointer forward as far as possible. This will skip |
|
|
|
|
* over already-committed notifications, which we want to do because they |
|
|
|
|
* might be quite stale. Note that we are not yet listening on anything, |
|
|
|
|
* so we won't deliver such notifications to our frontend. Also, although |
|
|
|
|
* our transaction might have executed NOTIFY, those message(s) aren't |
|
|
|
|
* queued yet so we won't skip them here. |
|
|
|
|
*/ |
|
|
|
|
if (!QUEUE_POS_EQUAL(max, head)) |
|
|
|
|
asyncQueueReadAllNotifications(); |
|
|
|
@ -1938,43 +1936,57 @@ asyncQueueReadAllNotifications(void) |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Get snapshot we'll use to decide which xacts are still in progress */ |
|
|
|
|
snapshot = RegisterSnapshot(GetLatestSnapshot()); |
|
|
|
|
|
|
|
|
|
/*----------
|
|
|
|
|
* Note that we deliver everything that we see in the queue and that |
|
|
|
|
* matches our _current_ listening state. |
|
|
|
|
* Especially we do not take into account different commit times. |
|
|
|
|
* Get snapshot we'll use to decide which xacts are still in progress. |
|
|
|
|
* This is trickier than it might seem, because of race conditions. |
|
|
|
|
* Consider the following example: |
|
|
|
|
* |
|
|
|
|
* Backend 1: Backend 2: |
|
|
|
|
* |
|
|
|
|
* transaction starts |
|
|
|
|
* UPDATE foo SET ...; |
|
|
|
|
* NOTIFY foo; |
|
|
|
|
* commit starts |
|
|
|
|
* queue the notify message |
|
|
|
|
* transaction starts |
|
|
|
|
* LISTEN foo; |
|
|
|
|
* commit starts |
|
|
|
|
* LISTEN foo; -- first LISTEN in session |
|
|
|
|
* SELECT * FROM foo WHERE ...; |
|
|
|
|
* commit to clog |
|
|
|
|
* commit starts |
|
|
|
|
* add backend 2 to array of listeners |
|
|
|
|
* advance to queue head (this code) |
|
|
|
|
* commit to clog |
|
|
|
|
* |
|
|
|
|
* It could happen that backend 2 sees the notification from backend 1 in |
|
|
|
|
* the queue. Even though the notifying transaction committed before |
|
|
|
|
* the listening transaction, we still deliver the notification. |
|
|
|
|
* Transaction 2's SELECT has not seen the UPDATE's effects, since that |
|
|
|
|
* wasn't committed yet. Ideally we'd ensure that client 2 would |
|
|
|
|
* eventually get transaction 1's notify message, but there's no way |
|
|
|
|
* to do that; until we're in the listener array, there's no guarantee |
|
|
|
|
* that the notify message doesn't get removed from the queue. |
|
|
|
|
* |
|
|
|
|
* The idea is that an additional notification does not do any harm, we |
|
|
|
|
* just need to make sure that we do not miss a notification. |
|
|
|
|
* Therefore the coding technique transaction 2 is using is unsafe: |
|
|
|
|
* applications must commit a LISTEN before inspecting database state, |
|
|
|
|
* if they want to ensure they will see notifications about subsequent |
|
|
|
|
* changes to that state. |
|
|
|
|
* |
|
|
|
|
* It is possible that we fail while trying to send a message to our |
|
|
|
|
* frontend (for example, because of encoding conversion failure). |
|
|
|
|
* If that happens it is critical that we not try to send the same |
|
|
|
|
* message over and over again. Therefore, we place a PG_TRY block |
|
|
|
|
* here that will forcibly advance our backend position before we lose |
|
|
|
|
* control to an error. (We could alternatively retake AsyncQueueLock |
|
|
|
|
* and move the position before handling each individual message, but |
|
|
|
|
* that seems like too much lock traffic.) |
|
|
|
|
* What we do guarantee is that we'll see all notifications from |
|
|
|
|
* transactions committing after the snapshot we take here. |
|
|
|
|
* Exec_ListenPreCommit has already added us to the listener array, |
|
|
|
|
* so no not-yet-committed messages can be removed from the queue |
|
|
|
|
* before we see them. |
|
|
|
|
*---------- |
|
|
|
|
*/ |
|
|
|
|
snapshot = RegisterSnapshot(GetLatestSnapshot()); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* It is possible that we fail while trying to send a message to our |
|
|
|
|
* frontend (for example, because of encoding conversion failure). If |
|
|
|
|
* that happens it is critical that we not try to send the same message |
|
|
|
|
* over and over again. Therefore, we place a PG_TRY block here that will |
|
|
|
|
* forcibly advance our queue position before we lose control to an error. |
|
|
|
|
* (We could alternatively retake AsyncQueueLock and move the position |
|
|
|
|
* before handling each individual message, but that seems like too much |
|
|
|
|
* lock traffic.) |
|
|
|
|
*/ |
|
|
|
|
PG_TRY(); |
|
|
|
|
{ |
|
|
|
|
bool reachedStop; |
|
|
|
|