@ -98,7 +98,8 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname,
* Caller is expected to have cleared ' opts ' .
*/
static void
parse_subscription_options ( List * stmt_options , bits32 supported_opts , SubOpts * opts )
parse_subscription_options ( ParseState * pstate , List * stmt_options ,
bits32 supported_opts , SubOpts * opts )
{
ListCell * lc ;
@ -137,9 +138,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp ( defel - > defname , " connect " ) = = 0 )
{
if ( IsSet ( opts - > specified_opts , SUBOPT_CONNECT ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
errorConflictingDefElem ( defel , pstate ) ;
opts - > specified_opts | = SUBOPT_CONNECT ;
opts - > connect = defGetBoolean ( defel ) ;
@ -148,9 +147,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp ( defel - > defname , " enabled " ) = = 0 )
{
if ( IsSet ( opts - > specified_opts , SUBOPT_ENABLED ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
errorConflictingDefElem ( defel , pstate ) ;
opts - > specified_opts | = SUBOPT_ENABLED ;
opts - > enabled = defGetBoolean ( defel ) ;
@ -159,9 +156,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp ( defel - > defname , " create_slot " ) = = 0 )
{
if ( IsSet ( opts - > specified_opts , SUBOPT_CREATE_SLOT ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
errorConflictingDefElem ( defel , pstate ) ;
opts - > specified_opts | = SUBOPT_CREATE_SLOT ;
opts - > create_slot = defGetBoolean ( defel ) ;
@ -170,9 +165,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp ( defel - > defname , " slot_name " ) = = 0 )
{
if ( IsSet ( opts - > specified_opts , SUBOPT_SLOT_NAME ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
errorConflictingDefElem ( defel , pstate ) ;
opts - > specified_opts | = SUBOPT_SLOT_NAME ;
opts - > slot_name = defGetString ( defel ) ;
@ -185,9 +178,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp ( defel - > defname , " copy_data " ) = = 0 )
{
if ( IsSet ( opts - > specified_opts , SUBOPT_COPY_DATA ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
errorConflictingDefElem ( defel , pstate ) ;
opts - > specified_opts | = SUBOPT_COPY_DATA ;
opts - > copy_data = defGetBoolean ( defel ) ;
@ -196,9 +187,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp ( defel - > defname , " synchronous_commit " ) = = 0 )
{
if ( IsSet ( opts - > specified_opts , SUBOPT_SYNCHRONOUS_COMMIT ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
errorConflictingDefElem ( defel , pstate ) ;
opts - > specified_opts | = SUBOPT_SYNCHRONOUS_COMMIT ;
opts - > synchronous_commit = defGetString ( defel ) ;
@ -212,9 +201,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp ( defel - > defname , " refresh " ) = = 0 )
{
if ( IsSet ( opts - > specified_opts , SUBOPT_REFRESH ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
errorConflictingDefElem ( defel , pstate ) ;
opts - > specified_opts | = SUBOPT_REFRESH ;
opts - > refresh = defGetBoolean ( defel ) ;
@ -223,9 +210,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp ( defel - > defname , " binary " ) = = 0 )
{
if ( IsSet ( opts - > specified_opts , SUBOPT_BINARY ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
errorConflictingDefElem ( defel , pstate ) ;
opts - > specified_opts | = SUBOPT_BINARY ;
opts - > binary = defGetBoolean ( defel ) ;
@ -234,9 +219,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp ( defel - > defname , " streaming " ) = = 0 )
{
if ( IsSet ( opts - > specified_opts , SUBOPT_STREAMING ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
errorConflictingDefElem ( defel , pstate ) ;
opts - > specified_opts | = SUBOPT_STREAMING ;
opts - > streaming = defGetBoolean ( defel ) ;
@ -257,9 +240,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
errmsg ( " unrecognized subscription parameter: \" %s \" " , defel - > defname ) ) ) ;
if ( IsSet ( opts - > specified_opts , SUBOPT_TWOPHASE_COMMIT ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
errorConflictingDefElem ( defel , pstate ) ;
opts - > specified_opts | = SUBOPT_TWOPHASE_COMMIT ;
opts - > twophase = defGetBoolean ( defel ) ;
@ -408,7 +389,8 @@ publicationListToArray(List *publist)
* Create new subscription .
*/
ObjectAddress
CreateSubscription ( CreateSubscriptionStmt * stmt , bool isTopLevel )
CreateSubscription ( ParseState * pstate , CreateSubscriptionStmt * stmt ,
bool isTopLevel )
{
Relation rel ;
ObjectAddress myself ;
@ -432,7 +414,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT ) ;
parse_subscription_options ( stmt - > options , supported_opts , & opts ) ;
parse_subscription_options ( pstate , stmt - > options , supported_opts , & opts ) ;
/*
* Since creating a replication slot is not transactional , rolling back
@ -853,7 +835,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
* Alter the existing subscription .
*/
ObjectAddress
AlterSubscription ( AlterSubscriptionStmt * stmt , bool isTopLevel )
AlterSubscription ( ParseState * pstate , AlterSubscriptionStmt * stmt ,
bool isTopLevel )
{
Relation rel ;
ObjectAddress myself ;
@ -906,7 +889,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING ) ;
parse_subscription_options ( stmt - > options , supported_opts , & opts ) ;
parse_subscription_options ( pstate , stmt - > options ,
supported_opts , & opts ) ;
if ( IsSet ( opts . specified_opts , SUBOPT_SLOT_NAME ) )
{
@ -957,7 +941,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
case ALTER_SUBSCRIPTION_ENABLED :
{
parse_subscription_options ( stmt - > options , SUBOPT_ENABLED , & opts ) ;
parse_subscription_options ( pstate , stmt - > options ,
SUBOPT_ENABLED , & opts ) ;
Assert ( IsSet ( opts . specified_opts , SUBOPT_ENABLED ) ) ;
if ( ! sub - > slotname & & opts . enabled )
@ -991,7 +976,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
case ALTER_SUBSCRIPTION_SET_PUBLICATION :
{
supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH ;
parse_subscription_options ( stmt - > options , supported_opts , & opts ) ;
parse_subscription_options ( pstate , stmt - > options ,
supported_opts , & opts ) ;
values [ Anum_pg_subscription_subpublications - 1 ] =
publicationListToArray ( stmt - > publication ) ;
@ -1040,7 +1026,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
if ( isadd )
supported_opts | = SUBOPT_COPY_DATA ;
parse_subscription_options ( stmt - > options , supported_opts , & opts ) ;
parse_subscription_options ( pstate , stmt - > options ,
supported_opts , & opts ) ;
publist = merge_publications ( sub - > publications , stmt - > publication , isadd , stmt - > subname ) ;
values [ Anum_pg_subscription_subpublications - 1 ] =
@ -1087,7 +1074,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions " ) ) ) ;
parse_subscription_options ( stmt - > options , SUBOPT_COPY_DATA , & opts ) ;
parse_subscription_options ( pstate , stmt - > options ,
SUBOPT_COPY_DATA , & opts ) ;
/*
* The subscription option " two_phase " requires that