@ -336,21 +336,36 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
}
/*
* Check if all columns referenced in the REPLICA IDENTITY are covered by
* the column list .
* Check for invalid columns in the publication table definition .
*
* Returns true if any replica identity column is not covered by column list .
* This function evaluates two conditions :
*
* 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
* by the column list . If any column is missing , * invalid_column_list is set
* to true .
* 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
* are published either by listing them in the column list or by enabling
* publish_generated_columns option . If any unpublished generated column is
* found , * invalid_gen_col is set to true .
*
* Returns true if any of the above conditions are not met .
*/
bool
pub_collist_contains_invalid_column ( Oid pubid , Relation relation , List * ancestors ,
bool pubviaroot )
pub_contains_invalid_column ( Oid pubid , Relation relation , List * ancestors ,
bool pubviaroot , bool pubgencols ,
bool * invalid_column_list ,
bool * invalid_gen_col )
{
HeapTuple tuple ;
Oid relid = RelationGetRelid ( relation ) ;
Oid publish_as_relid = RelationGetRelid ( relation ) ;
bool result = false ;
Datum datum ;
bool isnull ;
Bitmapset * idattrs ;
Bitmapset * columns = NULL ;
TupleDesc desc = RelationGetDescr ( relation ) ;
Publication * pub ;
int x ;
* invalid_column_list = false ;
* invalid_gen_col = false ;
/*
* For a partition , if pubviaroot is true , find the topmost ancestor that
@ -368,80 +383,91 @@ pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestor
publish_as_relid = relid ;
}
tuple = SearchSysCache2 ( PUBLICATIONRELMAP ,
ObjectIdGetDatum ( publish_as_relid ) ,
ObjectIdGetDatum ( pubid ) ) ;
/* Fetch the column list */
pub = GetPublication ( pubid ) ;
check_and_fetch_column_list ( pub , publish_as_relid , NULL , & columns ) ;
if ( ! HeapTupleIsValid ( tuple ) )
return false ;
if ( relation - > rd_rel - > relreplident = = REPLICA_IDENTITY_FULL )
{
/* With REPLICA IDENTITY FULL, no column list is allowed. */
* invalid_column_list = ( columns ! = NULL ) ;
datum = SysCacheGetAttr ( PUBLICATIONRELMAP , tuple ,
Anum_pg_publication_rel_prattrs ,
& isnull ) ;
/*
* As we don ' t allow a column list with REPLICA IDENTITY FULL , the
* publish_generated_columns option must be set to true if the table
* has any stored generated columns .
*/
if ( ! pubgencols & &
relation - > rd_att - > constr & &
relation - > rd_att - > constr - > has_generated_stored )
* invalid_gen_col = true ;
if ( ! isnull )
{
int x ;
Bitmapset * idattrs ;
Bitmapset * columns = NULL ;
if ( * invalid_gen_col & & * invalid_column_list )
return true ;
}
/* With REPLICA IDENTITY FULL, no column list is allowed. */
if ( relation - > rd_rel - > relreplident = = REPLICA_IDENTITY_FULL )
result = true ;
/* Remember columns that are part of the REPLICA IDENTITY */
idattrs = RelationGetIndexAttrBitmap ( relation ,
INDEX_ATTR_BITMAP_IDENTITY_KEY ) ;
/* Transform the column list datum to a bitmapset. */
columns = pub_collist_to_bitmapset ( NULL , datum , NULL ) ;
/*
* Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
* ( to handle system columns the usual way ) , while column list does not
* use offset , so we can ' t do bms_is_subset ( ) . Instead , we have to loop
* over the idattrs and check all of them are in the list .
*/
x = - 1 ;
while ( ( x = bms_next_member ( idattrs , x ) ) > = 0 )
{
AttrNumber attnum = ( x + FirstLowInvalidHeapAttributeNumber ) ;
Form_pg_attribute att = TupleDescAttr ( desc , attnum - 1 ) ;
/* Remember columns that are part of the REPLICA IDENTITY */
idattrs = RelationGetIndexAttrBitmap ( relation ,
INDEX_ATTR_BITMAP_IDENTITY_KEY ) ;
if ( columns = = NULL )
{
/*
* The publish_generated_columns option must be set to true if the
* REPLICA IDENTITY contains any stored generated column .
*/
if ( ! pubgencols & & att - > attgenerated )
{
* invalid_gen_col = true ;
break ;
}
/* Skip validating the column list since it is not defined */
continue ;
}
/*
* Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
* offset ( to handle system columns the usual way ) , while column list
* does not use offset , so we can ' t do bms_is_subset ( ) . Instead , we
* have to loop over the idattrs and check all of them are in the
* list .
* If pubviaroot is true , we are validating the column list of th e
* parent table , but the bitmap contains the replica identity
* information of the child table . The parent / child attnums may not
* match , so translate them to the parent - get the attname from the
* chi ld , and look it up in the paren t .
*/
x = - 1 ;
while ( ( x = bms_next_member ( idattrs , x ) ) > = 0 )
if ( pubviaroot )
{
AttrNumber attnum = ( x + FirstLowInvalidHeapAttributeNumber ) ;
/* attribute name in the child table */
char * colname = get_attname ( relid , attnum , false ) ;
/*
* If pubviaroot is true , we are validating the column list of the
* parent table , but the bitmap contains the replica identity
* information of the child table . The parent / child attnums may
* not match , so translate them to the parent - get the attname
* from the child , and look it up in the parent .
* Determine the attnum for the attribute name in parent ( we are
* using the column list defined on the parent ) .
*/
if ( pubviaroot )
{
/* attribute name in the child table */
char * colname = get_attname ( relid , attnum , false ) ;
/*
* Determine the attnum for the attribute name in parent ( we
* are using the column list defined on the parent ) .
*/
attnum = get_attnum ( publish_as_relid , colname ) ;
}
/* replica identity column, not covered by the column list */
if ( ! bms_is_member ( attnum , columns ) )
{
result = true ;
break ;
}
attnum = get_attnum ( publish_as_relid , colname ) ;
}
bms_free ( idattrs ) ;
bms_free ( columns ) ;
/* replica identity column, not covered by the column list */
* invalid_column_list | = ! bms_is_member ( attnum , columns ) ;
if ( * invalid_column_list & & * invalid_gen_col )
break ;
}
ReleaseSysCache ( tuple ) ;
bms_free ( columns ) ;
bms_free ( idattrs ) ;
return result ;
return * invalid_column_list | | * invalid_gen_col ;
}
/* check_functions_in_node callback */