@ -213,6 +213,67 @@ logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
return - 1 ;
}
/*
* Check if replica identity matches and mark the updatable flag .
*
* We allow for stricter replica identity ( fewer columns ) on subscriber as
* that will not stop us from finding unique tuple . IE , if publisher has
* identity ( id , timestamp ) and subscriber just ( id ) this will not be a
* problem , but in the opposite scenario it will .
*
* We just mark the relation entry as not updatable here if the local
* replica identity is found to be insufficient for applying
* updates / deletes ( inserts don ' t care ! ) and leave it to
* check_relation_updatable ( ) to throw the actual error if needed .
*/
static void
logicalrep_rel_mark_updatable ( LogicalRepRelMapEntry * entry )
{
Bitmapset * idkey ;
LogicalRepRelation * remoterel = & entry - > remoterel ;
int i ;
entry - > updatable = true ;
idkey = RelationGetIndexAttrBitmap ( entry - > localrel ,
INDEX_ATTR_BITMAP_IDENTITY_KEY ) ;
/* fallback to PK if no replica identity */
if ( idkey = = NULL )
{
idkey = RelationGetIndexAttrBitmap ( entry - > localrel ,
INDEX_ATTR_BITMAP_PRIMARY_KEY ) ;
/*
* If no replica identity index and no PK , the published table must
* have replica identity FULL .
*/
if ( idkey = = NULL & & remoterel - > replident ! = REPLICA_IDENTITY_FULL )
entry - > updatable = false ;
}
i = - 1 ;
while ( ( i = bms_next_member ( idkey , i ) ) > = 0 )
{
int attnum = i + FirstLowInvalidHeapAttributeNumber ;
if ( ! AttrNumberIsForUserDefinedAttr ( attnum ) )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " logical replication target relation \" %s.%s \" uses "
" system columns in REPLICA IDENTITY index " ,
remoterel - > nspname , remoterel - > relname ) ) ) ;
attnum = AttrNumberGetAttrOffset ( attnum ) ;
if ( entry - > attrmap - > attnums [ attnum ] < 0 | |
! bms_is_member ( entry - > attrmap - > attnums [ attnum ] , remoterel - > attkeys ) )
{
entry - > updatable = false ;
break ;
}
}
}
/*
* Open the local relation associated with the remote one .
*
@ -272,7 +333,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
Oid relid ;
int found ;
Bitmapset * idkey ;
TupleDesc desc ;
MemoryContext oldctx ;
int i ;
@ -332,54 +392,10 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
remoterel - > nspname , remoterel - > relname ) ) ) ;
/*
* Check that replica identity matches . We allow for stricter replica
* identity ( fewer columns ) on subscriber as that will not stop us
* from finding unique tuple . IE , if publisher has identity
* ( id , timestamp ) and subscriber just ( id ) this will not be a problem ,
* but in the opposite scenario it will .
*
* Don ' t throw any error here just mark the relation entry as not
* updatable , as replica identity is only for updates and deletes but
* inserts can be replicated even without it .
* Set if the table ' s replica identity is enough to apply
* update / delete .
*/
entry - > updatable = true ;
idkey = RelationGetIndexAttrBitmap ( entry - > localrel ,
INDEX_ATTR_BITMAP_IDENTITY_KEY ) ;
/* fallback to PK if no replica identity */
if ( idkey = = NULL )
{
idkey = RelationGetIndexAttrBitmap ( entry - > localrel ,
INDEX_ATTR_BITMAP_PRIMARY_KEY ) ;
/*
* If no replica identity index and no PK , the published table
* must have replica identity FULL .
*/
if ( idkey = = NULL & & remoterel - > replident ! = REPLICA_IDENTITY_FULL )
entry - > updatable = false ;
}
i = - 1 ;
while ( ( i = bms_next_member ( idkey , i ) ) > = 0 )
{
int attnum = i + FirstLowInvalidHeapAttributeNumber ;
if ( ! AttrNumberIsForUserDefinedAttr ( attnum ) )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " logical replication target relation \" %s.%s \" uses "
" system columns in REPLICA IDENTITY index " ,
remoterel - > nspname , remoterel - > relname ) ) ) ;
attnum = AttrNumberGetAttrOffset ( attnum ) ;
if ( entry - > attrmap - > attnums [ attnum ] < 0 | |
! bms_is_member ( entry - > attrmap - > attnums [ attnum ] , remoterel - > attkeys ) )
{
entry - > updatable = false ;
break ;
}
}
logicalrep_rel_mark_updatable ( entry ) ;
entry - > localrelvalid = true ;
}
@ -619,7 +635,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
attrmap - > maplen * sizeof ( AttrNumber ) ) ;
}
entry - > updatable = root - > updatable ;
/* Set if the table's replica identity is enough to apply update/delete. */
logicalrep_rel_mark_updatable ( entry ) ;
entry - > localrelvalid = true ;