@ -400,12 +400,15 @@ static void apply_handle_insert_internal(ApplyExecutionData *edata,
static void apply_handle_update_internal ( ApplyExecutionData * edata ,
static void apply_handle_update_internal ( ApplyExecutionData * edata ,
ResultRelInfo * relinfo ,
ResultRelInfo * relinfo ,
TupleTableSlot * remoteslot ,
TupleTableSlot * remoteslot ,
LogicalRepTupleData * newtup ) ;
LogicalRepTupleData * newtup ,
Oid localindexoid ) ;
static void apply_handle_delete_internal ( ApplyExecutionData * edata ,
static void apply_handle_delete_internal ( ApplyExecutionData * edata ,
ResultRelInfo * relinfo ,
ResultRelInfo * relinfo ,
TupleTableSlot * remoteslot ) ;
TupleTableSlot * remoteslot ,
Oid localindexoid ) ;
static bool FindReplTupleInLocalRel ( EState * estate , Relation localrel ,
static bool FindReplTupleInLocalRel ( EState * estate , Relation localrel ,
LogicalRepRelation * remoterel ,
LogicalRepRelation * remoterel ,
Oid localidxoid ,
TupleTableSlot * remoteslot ,
TupleTableSlot * remoteslot ,
TupleTableSlot * * localslot ) ;
TupleTableSlot * * localslot ) ;
static void apply_handle_tuple_routing ( ApplyExecutionData * edata ,
static void apply_handle_tuple_routing ( ApplyExecutionData * edata ,
@ -2350,24 +2353,6 @@ apply_handle_type(StringInfo s)
logicalrep_read_typ ( s , & typ ) ;
logicalrep_read_typ ( s , & typ ) ;
}
}
/*
* Get replica identity index or if it is not defined a primary key .
*
* If neither is defined , returns InvalidOid
*/
static Oid
GetRelationIdentityOrPK ( Relation rel )
{
Oid idxoid ;
idxoid = RelationGetReplicaIndex ( rel ) ;
if ( ! OidIsValid ( idxoid ) )
idxoid = RelationGetPrimaryKeyIndex ( rel ) ;
return idxoid ;
}
/*
/*
* Check that we ( the subscription owner ) have sufficient privileges on the
* Check that we ( the subscription owner ) have sufficient privileges on the
* target relation to perform the given operation .
* target relation to perform the given operation .
@ -2627,7 +2612,7 @@ apply_handle_update(StringInfo s)
remoteslot , & newtup , CMD_UPDATE ) ;
remoteslot , & newtup , CMD_UPDATE ) ;
else
else
apply_handle_update_internal ( edata , edata - > targetRelInfo ,
apply_handle_update_internal ( edata , edata - > targetRelInfo ,
remoteslot , & newtup ) ;
remoteslot , & newtup , rel - > localindexoid ) ;
finish_edata ( edata ) ;
finish_edata ( edata ) ;
@ -2648,7 +2633,8 @@ static void
apply_handle_update_internal ( ApplyExecutionData * edata ,
apply_handle_update_internal ( ApplyExecutionData * edata ,
ResultRelInfo * relinfo ,
ResultRelInfo * relinfo ,
TupleTableSlot * remoteslot ,
TupleTableSlot * remoteslot ,
LogicalRepTupleData * newtup )
LogicalRepTupleData * newtup ,
Oid localindexoid )
{
{
EState * estate = edata - > estate ;
EState * estate = edata - > estate ;
LogicalRepRelMapEntry * relmapentry = edata - > targetRel ;
LogicalRepRelMapEntry * relmapentry = edata - > targetRel ;
@ -2663,6 +2649,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
found = FindReplTupleInLocalRel ( estate , localrel ,
found = FindReplTupleInLocalRel ( estate , localrel ,
& relmapentry - > remoterel ,
& relmapentry - > remoterel ,
localindexoid ,
remoteslot , & localslot ) ;
remoteslot , & localslot ) ;
ExecClearTuple ( remoteslot ) ;
ExecClearTuple ( remoteslot ) ;
@ -2767,7 +2754,7 @@ apply_handle_delete(StringInfo s)
remoteslot , NULL , CMD_DELETE ) ;
remoteslot , NULL , CMD_DELETE ) ;
else
else
apply_handle_delete_internal ( edata , edata - > targetRelInfo ,
apply_handle_delete_internal ( edata , edata - > targetRelInfo ,
remoteslot ) ;
remoteslot , rel - > localindexoid ) ;
finish_edata ( edata ) ;
finish_edata ( edata ) ;
@ -2787,7 +2774,8 @@ apply_handle_delete(StringInfo s)
static void
static void
apply_handle_delete_internal ( ApplyExecutionData * edata ,
apply_handle_delete_internal ( ApplyExecutionData * edata ,
ResultRelInfo * relinfo ,
ResultRelInfo * relinfo ,
TupleTableSlot * remoteslot )
TupleTableSlot * remoteslot ,
Oid localindexoid )
{
{
EState * estate = edata - > estate ;
EState * estate = edata - > estate ;
Relation localrel = relinfo - > ri_RelationDesc ;
Relation localrel = relinfo - > ri_RelationDesc ;
@ -2799,7 +2787,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
EvalPlanQualInit ( & epqstate , estate , NULL , NIL , - 1 ) ;
EvalPlanQualInit ( & epqstate , estate , NULL , NIL , - 1 ) ;
ExecOpenIndices ( relinfo , false ) ;
ExecOpenIndices ( relinfo , false ) ;
found = FindReplTupleInLocalRel ( estate , localrel , remoterel ,
found = FindReplTupleInLocalRel ( estate , localrel , remoterel , localindexoid ,
remoteslot , & localslot ) ;
remoteslot , & localslot ) ;
/* If found delete it. */
/* If found delete it. */
@ -2833,17 +2821,17 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
/*
/*
* Try to find a tuple received from the publication side ( in ' remoteslot ' ) in
* Try to find a tuple received from the publication side ( in ' remoteslot ' ) in
* the corresponding local relation using either replica identity index ,
* the corresponding local relation using either replica identity index ,
* primary key or if needed , sequential scan .
* primary key , index or if needed , sequential scan .
*
*
* Local tuple , if found , is returned in ' * localslot ' .
* Local tuple , if found , is returned in ' * localslot ' .
*/
*/
static bool
static bool
FindReplTupleInLocalRel ( EState * estate , Relation localrel ,
FindReplTupleInLocalRel ( EState * estate , Relation localrel ,
LogicalRepRelation * remoterel ,
LogicalRepRelation * remoterel ,
Oid localidxoid ,
TupleTableSlot * remoteslot ,
TupleTableSlot * remoteslot ,
TupleTableSlot * * localslot )
TupleTableSlot * * localslot )
{
{
Oid idxoid ;
bool found ;
bool found ;
/*
/*
@ -2854,12 +2842,11 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
* localslot = table_slot_create ( localrel , & estate - > es_tupleTable ) ;
* localslot = table_slot_create ( localrel , & estate - > es_tupleTable ) ;
idxoid = GetRelationIdentityOrPK ( localrel ) ;
Assert ( OidIsValid ( localidxoid ) | |
Assert ( OidIsValid ( idxoid ) | |
( remoterel - > replident = = REPLICA_IDENTITY_FULL ) ) ;
( remoterel - > replident = = REPLICA_IDENTITY_FULL ) ) ;
if ( OidIsValid ( idxoid ) )
if ( OidIsValid ( local idxoid) )
found = RelationFindReplTupleByIndex ( localrel , idxoid ,
found = RelationFindReplTupleByIndex ( localrel , local idxoid,
LockTupleExclusive ,
LockTupleExclusive ,
remoteslot , * localslot ) ;
remoteslot , * localslot ) ;
else
else
@ -2960,7 +2947,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
case CMD_DELETE :
case CMD_DELETE :
apply_handle_delete_internal ( edata , partrelinfo ,
apply_handle_delete_internal ( edata , partrelinfo ,
remoteslot_part ) ;
remoteslot_part ,
part_entry - > localindexoid ) ;
break ;
break ;
case CMD_UPDATE :
case CMD_UPDATE :
@ -2980,6 +2968,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
/* Get the matching local tuple from the partition. */
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel ( estate , partrel ,
found = FindReplTupleInLocalRel ( estate , partrel ,
& part_entry - > remoterel ,
& part_entry - > remoterel ,
part_entry - > localindexoid ,
remoteslot_part , & localslot ) ;
remoteslot_part , & localslot ) ;
if ( ! found )
if ( ! found )
{
{
@ -3076,7 +3065,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
/* DELETE old tuple found in the old partition. */
/* DELETE old tuple found in the old partition. */
apply_handle_delete_internal ( edata , partrelinfo ,
apply_handle_delete_internal ( edata , partrelinfo ,
localslot ) ;
localslot ,
part_entry - > localindexoid ) ;
/* INSERT new tuple into the new partition. */
/* INSERT new tuple into the new partition. */