|
|
|
@ -972,6 +972,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart |
|
|
|
|
{ |
|
|
|
|
slot->candidate_restart_valid = current_lsn; |
|
|
|
|
slot->candidate_restart_lsn = restart_lsn; |
|
|
|
|
SpinLockRelease(&slot->mutex); |
|
|
|
|
|
|
|
|
|
elog(DEBUG1, "got new restart lsn %X/%X at %X/%X", |
|
|
|
|
(uint32) (restart_lsn >> 32), (uint32) restart_lsn, |
|
|
|
@ -979,18 +980,25 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
XLogRecPtr candidate_restart_lsn; |
|
|
|
|
XLogRecPtr candidate_restart_valid; |
|
|
|
|
XLogRecPtr confirmed_flush; |
|
|
|
|
|
|
|
|
|
candidate_restart_lsn = slot->candidate_restart_lsn; |
|
|
|
|
candidate_restart_valid = slot->candidate_restart_valid; |
|
|
|
|
confirmed_flush = slot->data.confirmed_flush; |
|
|
|
|
SpinLockRelease(&slot->mutex); |
|
|
|
|
|
|
|
|
|
elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X", |
|
|
|
|
(uint32) (restart_lsn >> 32), (uint32) restart_lsn, |
|
|
|
|
(uint32) (current_lsn >> 32), (uint32) current_lsn, |
|
|
|
|
(uint32) (slot->candidate_restart_lsn >> 32), |
|
|
|
|
(uint32) slot->candidate_restart_lsn, |
|
|
|
|
(uint32) (slot->candidate_restart_valid >> 32), |
|
|
|
|
(uint32) slot->candidate_restart_valid, |
|
|
|
|
(uint32) (slot->data.confirmed_flush >> 32), |
|
|
|
|
(uint32) slot->data.confirmed_flush |
|
|
|
|
); |
|
|
|
|
(uint32) (candidate_restart_lsn >> 32), |
|
|
|
|
(uint32) candidate_restart_lsn, |
|
|
|
|
(uint32) (candidate_restart_valid >> 32), |
|
|
|
|
(uint32) candidate_restart_valid, |
|
|
|
|
(uint32) (confirmed_flush >> 32), |
|
|
|
|
(uint32) confirmed_flush); |
|
|
|
|
} |
|
|
|
|
SpinLockRelease(&slot->mutex); |
|
|
|
|
|
|
|
|
|
/* candidates are already valid with the current flush position, apply */ |
|
|
|
|
if (updated_lsn) |
|
|
|
|