@ -53,6 +53,7 @@
# include "utils/syscache.h"
# include "utils/varlena.h"
/*
* Information used to validate the columns in the row filter expression . See
* contain_invalid_rfcolumn_walker for details .
@ -76,6 +77,7 @@ static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
AlterPublicationStmt * stmt ) ;
static void PublicationDropSchemas ( Oid pubid , List * schemas , bool missing_ok ) ;
static void
parse_publication_options ( ParseState * pstate ,
List * options ,
@ -125,7 +127,8 @@ parse_publication_options(ParseState *pstate,
if ( ! SplitIdentifierString ( publish , ' , ' , & publish_list ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " invalid list syntax for \" publish \" option " ) ) ) ;
errmsg ( " invalid list syntax in parameter \" %s \" " ,
" publish " ) ) ) ;
/* Process the option list. */
foreach ( lc , publish_list )
@ -143,7 +146,8 @@ parse_publication_options(ParseState *pstate,
else
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " unrecognized \" publish \" value: \" %s \" " , publish_opt ) ) ) ;
errmsg ( " unrecognized value for publication option \" %s \" : \" %s \" " ,
" publish " , publish_opt ) ) ) ;
}
}
else if ( strcmp ( defel - > defname , " publish_via_partition_root " ) = = 0 )
@ -444,10 +448,12 @@ contain_mutable_or_user_functions_checker(Oid func_id, void *context)
}
/*
* Check if the node contains any unallowed object . See
* Check if the node contains any disallowed object . Subroutine for
* check_simple_rowfilter_expr_walker .
*
* Returns the error detail message in errdetail_msg for unallowed expressions .
* If a disallowed object is found , * errdetail_msg is set to a ( possibly
* translated ) message to use as errdetail . If none , * errdetail_msg is not
* modified .
*/
static void
expr_allowed_in_node ( Node * node , ParseState * pstate , char * * errdetail_msg )
@ -678,10 +684,17 @@ TransformPubWhereClauses(List *tables, const char *queryString,
/*
* Check the publication column lists expression for all relations in the list .
* Given a list of tables that are going to be added to a publication ,
* verify that they fulfill the necessary preconditions , namely : no tables
* have a column list if any schema is published ; and partitioned tables do
* not have column lists if publish_via_partition_root is not set .
*
* ' publish_schema ' indicates that the publication contains any TABLES IN
* SCHEMA elements ( newly added in this command , or preexisting ) .
* ' pubviaroot ' is the value of publish_via_partition_root .
*/
static void
CheckPubRelationColumnList ( List * tables , const char * queryString ,
CheckPubRelationColumnList ( char * pubname , List * tables ,
bool publish_schema , bool pubviaroot )
{
ListCell * lc ;
@ -706,23 +719,24 @@ CheckPubRelationColumnList(List *tables, const char *queryString,
if ( publish_schema )
ereport ( ERROR ,
errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " cannot use publication column list for relation \" %s.%s \" " ,
errmsg ( " cannot use column list for relation \" %s.%s \" in publication \" %s \" " ,
get_namespace_name ( RelationGetNamespace ( pri - > relation ) ) ,
RelationGetRelationName ( pri - > relation ) ) ,
errdetail ( " Column list cannot be specified if any schema is part of the publication or specified in the list . " ) ) ;
RelationGetRelationName ( pri - > relation ) , pubname ) ,
errdetail ( " Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements . " ) ) ;
/*
* If the publication doesn ' t publish changes via the root partitioned
* table , the partition ' s column list will be used . So disallow using
* the column list on partitioned table in this case .
* a column list on the partitioned table in this case .
*/
if ( ! pubviaroot & &
pri - > relation - > rd_rel - > relkind = = RELKIND_PARTITIONED_TABLE )
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " cannot use publication column list for relation \" %s \" " ,
RelationGetRelationName ( pri - > relation ) ) ,
errdetail ( " Column list cannot be used for a partitioned table when %s is false. " ,
errmsg ( " cannot use column list for relation \" %s.%s \" in publication \" %s \" " ,
get_namespace_name ( RelationGetNamespace ( pri - > relation ) ) ,
RelationGetRelationName ( pri - > relation ) , pubname ) ,
errdetail ( " Column lists cannot be specified for partitioned tables when %s is false. " ,
" publish_via_partition_root " ) ) ) ;
}
}
@ -765,12 +779,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
puboid = GetSysCacheOid1 ( PUBLICATIONNAME , Anum_pg_publication_oid ,
CStringGetDatum ( stmt - > pubname ) ) ;
if ( OidIsValid ( puboid ) )
{
ereport ( ERROR ,
( errcode ( ERRCODE_DUPLICATE_OBJECT ) ,
errmsg ( " publication \" %s \" already exists " ,
stmt - > pubname ) ) ) ;
}
/* Form a tuple. */
memset ( values , 0 , sizeof ( values ) ) ;
@ -840,7 +852,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
TransformPubWhereClauses ( rels , pstate - > p_sourcetext ,
publish_via_partition_root ) ;
CheckPubRelationColumnList ( rels , pstate - > p_sourcetext ,
CheckPubRelationColumnList ( stmt - > pubname , rels ,
schemaidlist ! = NIL ,
publish_via_partition_root ) ;
@ -864,12 +876,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
InvokeObjectPostCreateHook ( PublicationRelationId , puboid , 0 ) ;
if ( wal_level ! = WAL_LEVEL_LOGICAL )
{
ereport ( WARNING ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " wal_level is insufficient to publish logical changes " ) ,
errhint ( " Set wal_level to logical before creating subscriptions. " ) ) ) ;
}
errhint ( " Set wal_level to \" logical \" before creating subscriptions. " ) ) ) ;
return myself ;
}
@ -1110,7 +1120,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
publish_schema | = is_schema_publication ( pubid ) ;
CheckPubRelationColumnList ( rels , queryString , publish_schema ,
CheckPubRelationColumnList ( stmt - > pubname , rels , publish_schema ,
pubform - > pubviaroot ) ;
PublicationAddTables ( pubid , rels , false , stmt ) ;
@ -1126,7 +1136,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
TransformPubWhereClauses ( rels , queryString , pubform - > pubviaroot ) ;
CheckPubRelationColumnList ( rels , queryString , publish_schema ,
CheckPubRelationColumnList ( stmt - > pubname , rels , publish_schema ,
pubform - > pubviaroot ) ;
/*
@ -1299,8 +1309,9 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
if ( ! heap_attisnull ( coltuple , Anum_pg_publication_rel_prattrs , NULL ) )
ereport ( ERROR ,
errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " cannot add schema to the publication " ) ,
errdetail ( " Schema cannot be added if any table that specifies column list is already part of the publication. " ) ) ;
errmsg ( " cannot add schema to publication \" %s \" " ,
stmt - > pubname ) ,
errdetail ( " Schemas cannot be added if any tables that specify a column list are already part of the publication. " ) ) ;
ReleaseSysCache ( coltuple ) ;
}