@ -138,9 +138,9 @@
* Each apply worker that enabled retain_dead_tuples option maintains a
* Each apply worker that enabled retain_dead_tuples option maintains a
* non - removable transaction ID ( oldest_nonremovable_xid ) in shared memory to
* non - removable transaction ID ( oldest_nonremovable_xid ) in shared memory to
* prevent dead rows from being removed prematurely when the apply worker still
* prevent dead rows from being removed prematurely when the apply worker still
* needs them to detect conflicts reliably . This helps to retain the required
* needs them to detect update_deleted conflicts . Additionally , this helps to
* commit_ts module information , which further helps to detect
* retain the required commit_ts module information , which further helps to
* update_origin_differs and delete_origin_differs conflicts reliably , as
* detect update_origin_differs and delete_origin_differs conflicts reliably , as
* otherwise , vacuum freeze could remove the required information .
* otherwise , vacuum freeze could remove the required information .
*
*
* The logical replication launcher manages an internal replication slot named
* The logical replication launcher manages an internal replication slot named
@ -185,10 +185,10 @@
* transactions that occurred concurrently with the tuple DELETE , any
* transactions that occurred concurrently with the tuple DELETE , any
* subsequent UPDATE from a remote node should have a later timestamp . In such
* subsequent UPDATE from a remote node should have a later timestamp . In such
* cases , it is acceptable to detect an update_missing scenario and convert the
* cases , it is acceptable to detect an update_missing scenario and convert the
* UPDATE to an INSERT when applying it . But , detecting concurrent remote
* UPDATE to an INSERT when applying it . But , for concurrent remote
* transactions with earlier timestamps than the DELETE is necessary , as the
* transactions with earlier timestamps than the DELETE , detecting
* UPDATEs in remote transactions should be ignored if their timestamp is
* update_deleted is necessary , as the UPDATEs in remote transactions should be
* earlier than that of the dead tuples .
* ignored if their timestamp is earlier than that of the dead tuples .
*
*
* Note that advancing the non - removable transaction ID is not supported if the
* Note that advancing the non - removable transaction ID is not supported if the
* publisher is also a physical standby . This is because the logical walsender
* publisher is also a physical standby . This is because the logical walsender
@ -576,6 +576,12 @@ static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel
Oid localidxoid ,
Oid localidxoid ,
TupleTableSlot * remoteslot ,
TupleTableSlot * remoteslot ,
TupleTableSlot * * localslot ) ;
TupleTableSlot * * localslot ) ;
static bool FindDeletedTupleInLocalRel ( Relation localrel ,
Oid localidxoid ,
TupleTableSlot * remoteslot ,
TransactionId * delete_xid ,
RepOriginId * delete_origin ,
TimestampTz * delete_time ) ;
static void apply_handle_tuple_routing ( ApplyExecutionData * edata ,
static void apply_handle_tuple_routing ( ApplyExecutionData * edata ,
TupleTableSlot * remoteslot ,
TupleTableSlot * remoteslot ,
LogicalRepTupleData * newtup ,
LogicalRepTupleData * newtup ,
@ -2912,17 +2918,31 @@ apply_handle_update_internal(ApplyExecutionData *edata,
}
}
else
else
{
{
ConflictType type ;
TupleTableSlot * newslot = localslot ;
TupleTableSlot * newslot = localslot ;
/*
* Detecting whether the tuple was recently deleted or never existed
* is crucial to avoid misleading the user during conflict handling .
*/
if ( FindDeletedTupleInLocalRel ( localrel , localindexoid , remoteslot ,
& conflicttuple . xmin ,
& conflicttuple . origin ,
& conflicttuple . ts ) & &
conflicttuple . origin ! = replorigin_session_origin )
type = CT_UPDATE_DELETED ;
else
type = CT_UPDATE_MISSING ;
/* Store the new tuple for conflict reporting */
/* Store the new tuple for conflict reporting */
slot_store_data ( newslot , relmapentry , newtup ) ;
slot_store_data ( newslot , relmapentry , newtup ) ;
/*
/*
* The tuple to be updated could not be found . Do nothing except for
* The tuple to be updated could not be found or was deleted . Do
* emitting a log message .
* nothing except for emitting a log message .
*/
*/
ReportApplyConflict ( estate , relinfo , LOG , CT_UPDATE_MISSING ,
ReportApplyConflict ( estate , relinfo , LOG , type , remoteslot , newslot ,
remoteslot , newslot , list_make1 ( & conflicttuple ) ) ;
list_make1 ( & conflicttuple ) ) ;
}
}
/* Cleanup. */
/* Cleanup. */
@ -3142,6 +3162,112 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
return found ;
return found ;
}
}
/*
 * Determine whether the index can reliably locate the deleted tuple in the
 * local relation.
 *
 * An index may exclude deleted tuples if it was re-indexed or re-created
 * during change application. Therefore, an index is considered usable only if
 * the xmin of its catalog row (fetched through the INDEXRELID syscache)
 * precedes the conflict detection slot's xmin (conflict_detection_xmin). This
 * ensures that any tuples deleted prior to the index creation or re-indexing
 * are not relevant for conflict detection in the current apply worker.
 *
 * Note that indexes may also be excluded if they were modified by other DDL
 * operations, such as ALTER INDEX. However, this is acceptable, as the
 * likelihood of such DDL changes coinciding with the need to scan dead
 * tuples for update_deleted detection is low.
 *
 * 'localindexoid' is the OID of the candidate index and
 * 'conflict_detection_xmin' is the threshold to compare against.
 *
 * Returns true if the index's catalog row xmin precedes
 * 'conflict_detection_xmin', false otherwise. Raises an ERROR if the syscache
 * lookup for the index fails.
 */
static bool
IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
									TransactionId conflict_detection_xmin)
{
	HeapTuple	index_tuple;
	TransactionId index_xmin;

	index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));

	if (!HeapTupleIsValid(index_tuple)) /* should not happen */
		elog(ERROR, "cache lookup failed for index %u", localindexoid);

	/*
	 * No need to check for a frozen transaction ID, as
	 * TransactionIdPrecedes() manages it internally, treating it as falling
	 * behind the conflict_detection_xmin.
	 */
	index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);

	/* The xmin has been copied out, so the cache entry can be released now. */
	ReleaseSysCache(index_tuple);

	return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
}
/*
 * Look for a deleted tuple in 'localrel' whose values match the tuple
 * received from the publication side ('remoteslot').
 *
 * The lookup goes through the index 'localidxoid' when that OID is valid and
 * IsIndexUsableForFindingDeletedTuple() accepts it; otherwise it falls back
 * to a sequential scan of the relation.
 *
 * Returns true when such a deleted tuple is found, in which case the
 * deleting transaction's ID, its origin, and its commit timestamp are stored
 * in '*delete_xid', '*delete_origin', and '*delete_time' respectively.
 * Returns false without searching when the subscription does not retain dead
 * tuples or track_commit_timestamp is off.
 */
static bool
FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
						   TupleTableSlot *remoteslot,
						   TransactionId *delete_xid, RepOriginId *delete_origin,
						   TimestampTz *delete_time)
{
	ReplicationSlot *conflict_slot;
	TransactionId xmin_threshold;

	/*
	 * Nothing to look for unless dead tuples are retained and commit
	 * timestamp data is available.
	 */
	if (!(MySubscription->retaindeadtuples && track_commit_timestamp))
		return false;

	/*
	 * The threshold for "recently deleted" is the conflict detection slot's
	 * xmin rather than the result of GetOldestNonRemovableTransactionId().
	 * Tuples at or beyond this threshold are invisible to concurrent
	 * transactions, yet an update_deleted conflict is logged when one of them
	 * matches the remote update being applied.
	 *
	 * GetOldestNonRemovableTransactionId() may return something older than
	 * the slot's xmin, but for this purpose it is fine to report tuples
	 * deleted by transactions before slot.xmin as update_missing.
	 *
	 * The leader apply worker's oldest_nonremovable_xid would be the ideal
	 * value, but table synchronization and parallel apply workers cannot see
	 * it, so slot.xmin serves as the practical substitute.
	 */
	conflict_slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true);
	Assert(conflict_slot);

	/* Read the slot's xmin under its spinlock. */
	SpinLockAcquire(&conflict_slot->mutex);
	xmin_threshold = conflict_slot->data.xmin;
	SpinLockRelease(&conflict_slot->mutex);

	Assert(TransactionIdIsValid(xmin_threshold));

	/* Without a trustworthy index, scan the whole relation. */
	if (!OidIsValid(localidxoid) ||
		!IsIndexUsableForFindingDeletedTuple(localidxoid, xmin_threshold))
		return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
											   xmin_threshold, delete_xid,
											   delete_origin, delete_time);

	return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
											   remoteslot, xmin_threshold,
											   delete_xid, delete_origin,
											   delete_time);
}
/*
/*
* This handles insert , update , delete on a partitioned table .
* This handles insert , update , delete on a partitioned table .
*/
*/
@ -3260,18 +3386,35 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
remoteslot_part , & localslot ) ;
remoteslot_part , & localslot ) ;
if ( ! found )
if ( ! found )
{
{
ConflictType type ;
TupleTableSlot * newslot = localslot ;
TupleTableSlot * newslot = localslot ;
/*
* Detecting whether the tuple was recently deleted or
* never existed is crucial to avoid misleading the user
* during conflict handling .
*/
if ( FindDeletedTupleInLocalRel ( partrel ,
part_entry - > localindexoid ,
remoteslot_part ,
& conflicttuple . xmin ,
& conflicttuple . origin ,
& conflicttuple . ts ) & &
conflicttuple . origin ! = replorigin_session_origin )
type = CT_UPDATE_DELETED ;
else
type = CT_UPDATE_MISSING ;
/* Store the new tuple for conflict reporting */
/* Store the new tuple for conflict reporting */
slot_store_data ( newslot , part_entry , newtup ) ;
slot_store_data ( newslot , part_entry , newtup ) ;
/*
/*
* The tuple to be updated could not be found . Do nothing
* The tuple to be updated could not be found or was
* except for emitting a log message .
* deleted . Do nothing except for emitting a log message .
*/
*/
ReportApplyConflict ( estate , partrelinfo , LOG ,
ReportApplyConflict ( estate , partrelinfo , LOG ,
CT_UPDATE_MISSING , remoteslot_part ,
type , remoteslot_part , newslo t ,
newslot , list_make1 ( & conflicttuple ) ) ;
list_make1 ( & conflicttuple ) ) ;
return ;
return ;
}
}
@ -4172,8 +4315,8 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
{
{
/*
/*
* It is sufficient to manage non - removable transaction ID for a
* It is sufficient to manage non - removable transaction ID for a
* subscription by the main apply worker to detect conflicts reliably even
* subscription by the main apply worker to detect update_deleted reliably
* for table sync or parallel apply workers .
* even for table sync or parallel apply workers .
*/
*/
if ( ! am_leader_apply_worker ( ) )
if ( ! am_leader_apply_worker ( ) )
return false ;
return false ;
@ -4374,10 +4517,11 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* We expect the publisher and subscriber clocks to be in sync using time
* We expect the publisher and subscriber clocks to be in sync using time
* sync service like NTP . Otherwise , we will advance this worker ' s
* sync service like NTP . Otherwise , we will advance this worker ' s
* oldest_nonremovable_xid prematurely , leading to the removal of rows
* oldest_nonremovable_xid prematurely , leading to the removal of rows
* required to detect conflicts reliably . This check primarily addresses
* required to detect update_deleted reliably . This check primarily
* scenarios where the publisher ' s clock falls behind ; if the publisher ' s
* addresses scenarios where the publisher ' s clock falls behind ; if the
* clock is ahead , subsequent transactions will naturally bear later
* publisher ' s clock is ahead , subsequent transactions will naturally bear
* commit timestamps , conforming to the design outlined atop worker . c .
* later commit timestamps , conforming to the design outlined atop
* worker . c .
*
*
* XXX Consider waiting for the publisher ' s clock to catch up with the
* XXX Consider waiting for the publisher ' s clock to catch up with the
* subscriber ' s before proceeding to the next phase .
* subscriber ' s before proceeding to the next phase .