@ -60,7 +60,8 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
*/
static void
parse_subscription_options ( List * options , bool * connect , bool * enabled_given ,
bool * enabled , bool * create_slot , char * * slot_name ,
bool * enabled , bool * create_slot ,
bool * slot_name_given , char * * slot_name ,
bool * copy_data , char * * synchronous_commit )
{
ListCell * lc ;
@ -78,7 +79,10 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
if ( create_slot )
* create_slot = true ;
if ( slot_name )
{
* slot_name_given = false ;
* slot_name = NULL ;
}
if ( copy_data )
* copy_data = true ;
if ( synchronous_commit )
@ -141,12 +145,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
}
else if ( strcmp ( defel - > defname , " slot name " ) = = 0 & & slot_name )
{
if ( * slot_name )
if ( * slot_name_given )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
* slot_name_given = true ;
* slot_name = defGetString ( defel ) ;
/* Setting slot_name = NONE is treated as no slot name. */
if ( strcmp ( * slot_name , " none " ) = = 0 )
* slot_name = NULL ;
}
else if ( strcmp ( defel - > defname , " copy data " ) = = 0 & & copy_data )
{
@ -194,17 +203,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
if ( connect & & ! * connect )
{
/* Check for incompatible options from the user. */
if ( * enabled_given & & * enabled )
if ( enabled & & * enabled_given & & * enabled )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " noconnect and enabled are mutually exclusive options " ) ) ) ;
if ( create_slot_given & & * create_slot )
if ( create_slot & & create_slot _given & & * create_slot )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " noconnect and create slot are mutually exclusive options " ) ) ) ;
if ( copy_data_given & & * copy_data )
if ( copy_data & & copy_data _given & & * copy_data )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " noconnect and copy data are mutually exclusive options " ) ) ) ;
@ -214,6 +223,23 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
* create_slot = false ;
* copy_data = false ;
}
/*
* Do additional checking for disallowed combination when
* slot_name = NONE was used .
*/
if ( slot_name & & * slot_name_given & & ! * slot_name )
{
if ( enabled & & * enabled_given & & * enabled )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " slot_name = NONE and enabled are mutually exclusive options " ) ) ) ;
if ( create_slot & & create_slot_given & & * create_slot )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " slot_name = NONE and create slot are mutually exclusive options " ) ) ) ;
}
}
/*
@ -290,6 +316,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
char * synchronous_commit ;
char * conninfo ;
char * slotname ;
bool slotname_given ;
char originname [ NAMEDATALEN ] ;
bool create_slot ;
List * publications ;
@ -299,8 +326,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* Connection and publication should not be specified here .
*/
parse_subscription_options ( stmt - > options , & connect , & enabled_given ,
& enabled , & create_slot , & slotname , & copy_data ,
& synchronous_commit ) ;
& enabled , & create_slot , & slotname_given ,
& slotname , & copy_data , & s ynchronous_commit ) ;
/*
* Since creating a replication slot is not transactional , rolling back
@ -329,8 +356,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
stmt - > subname ) ) ) ;
}
if ( slotname = = NULL )
if ( ! slotname_given & & slotname = = NULL )
slotname = stmt - > subname ;
/* The default for synchronous_commit of subscriptions is off. */
if ( synchronous_commit = = NULL )
synchronous_commit = " off " ;
@ -355,8 +383,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
values [ Anum_pg_subscription_subenabled - 1 ] = BoolGetDatum ( enabled ) ;
values [ Anum_pg_subscription_subconninfo - 1 ] =
CStringGetTextDatum ( conninfo ) ;
values [ Anum_pg_subscription_subslotname - 1 ] =
DirectFunctionCall1 ( namein , CStringGetDatum ( slotname ) ) ;
if ( slotname )
values [ Anum_pg_subscription_subslotname - 1 ] =
DirectFunctionCall1 ( namein , CStringGetDatum ( slotname ) ) ;
else
nulls [ Anum_pg_subscription_subslotname - 1 ] = true ;
values [ Anum_pg_subscription_subsynccommit - 1 ] =
CStringGetTextDatum ( synchronous_commit ) ;
values [ Anum_pg_subscription_subpublications - 1 ] =
@ -426,6 +457,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
*/
if ( create_slot )
{
Assert ( slotname ) ;
walrcv_create_slot ( wrconn , slotname , false ,
CRS_NOEXPORT_SNAPSHOT , & lsn ) ;
ereport ( NOTICE ,
@ -578,6 +611,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
HeapTuple tup ;
Oid subid ;
bool update_tuple = false ;
Subscription * sub ;
rel = heap_open ( SubscriptionRelationId , RowExclusiveLock ) ;
@ -597,6 +631,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
stmt - > subname ) ;
subid = HeapTupleGetOid ( tup ) ;
sub = GetSubscription ( subid , false ) ;
/* Form a new tuple. */
memset ( values , 0 , sizeof ( values ) ) ;
@ -607,19 +642,29 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
{
case ALTER_SUBSCRIPTION_OPTIONS :
{
char * slot_name ;
char * slotname ;
bool slotname_given ;
char * synchronous_commit ;
parse_subscription_options ( stmt - > options , NULL , NULL , NULL ,
NULL , & slot_name , NULL ,
& synchronous_commit ) ;
NULL , & slotname_given , & slotname ,
NULL , & synchronous_commit ) ;
if ( slot_ name )
if ( slotname_given )
{
values [ Anum_pg_subscription_subslotname - 1 ] =
DirectFunctionCall1 ( namein , CStringGetDatum ( slot_name ) ) ;
if ( sub - > enabled & & ! slotname )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " cannot set slot_name = NONE for enabled subscription " ) ) ) ;
if ( slotname )
values [ Anum_pg_subscription_subslotname - 1 ] =
DirectFunctionCall1 ( namein , CStringGetDatum ( slotname ) ) ;
else
nulls [ Anum_pg_subscription_subslotname - 1 ] = true ;
replaces [ Anum_pg_subscription_subslotname - 1 ] = true ;
}
if ( synchronous_commit )
{
values [ Anum_pg_subscription_subsynccommit - 1 ] =
@ -638,9 +683,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
parse_subscription_options ( stmt - > options , NULL ,
& enabled_given , & enabled , NULL ,
NULL , NULL , NULL ) ;
NULL , NULL , NULL , NULL ) ;
Assert ( enabled_given ) ;
if ( ! sub - > slotname & & enabled )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " cannot enable subscription that does not have a slot name " ) ) ) ;
values [ Anum_pg_subscription_subenabled - 1 ] =
BoolGetDatum ( enabled ) ;
replaces [ Anum_pg_subscription_subenabled - 1 ] = true ;
@ -668,10 +718,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH :
{
bool copy_data ;
Subscription * sub = GetSubscription ( subid , false ) ;
parse_subscription_options ( stmt - > options , NULL , NULL , NULL ,
NULL , NULL , & copy_data , NULL ) ;
NULL , NULL , NULL , & copy_data ,
NULL ) ;
values [ Anum_pg_subscription_subpublications - 1 ] =
publicationListToArray ( stmt - > publication ) ;
@ -682,6 +732,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
/* Refresh if user asked us to. */
if ( stmt - > kind = = ALTER_SUBSCRIPTION_PUBLICATION_REFRESH )
{
if ( ! sub - > enabled )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions " ) ) ) ;
/* Make sure refresh sees the new list of publications. */
sub - > publications = stmt - > publication ;
@ -694,10 +749,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
case ALTER_SUBSCRIPTION_REFRESH :
{
bool copy_data ;
Subscription * sub = GetSubscription ( subid , false ) ;
if ( ! sub - > enabled )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions " ) ) ) ;
parse_subscription_options ( stmt - > options , NULL , NULL , NULL ,
NULL , NULL , & copy_data , NULL ) ;
NULL , NULL , NULL , & copy_data ,
NULL ) ;
AlterSubscription_refresh ( sub , copy_data ) ;
@ -750,15 +810,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
WalReceiverConn * wrconn = NULL ;
StringInfoData cmd ;
/*
* Since dropping a replication slot is not transactional , the replication
* slot stays dropped even if the transaction rolls back . So we cannot
* run DROP SUBSCRIPTION inside a transaction block if dropping the
* replication slot .
*/
if ( stmt - > drop_slot )
PreventTransactionChain ( isTopLevel , " DROP SUBSCRIPTION ... DROP SLOT " ) ;
/*
* Lock pg_subscription with AccessExclusiveLock to ensure
* that the launcher doesn ' t restart new worker during dropping
@ -817,8 +868,24 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* Get slotname */
datum = SysCacheGetAttr ( SUBSCRIPTIONOID , tup ,
Anum_pg_subscription_subslotname , & isnull ) ;
Assert ( ! isnull ) ;
slotname = pstrdup ( NameStr ( * DatumGetName ( datum ) ) ) ;
if ( ! isnull )
slotname = pstrdup ( NameStr ( * DatumGetName ( datum ) ) ) ;
else
slotname = NULL ;
/*
* Since dropping a replication slot is not transactional , the replication
* slot stays dropped even if the transaction rolls back . So we cannot
* run DROP SUBSCRIPTION inside a transaction block if dropping the
* replication slot .
*
* XXX The command name should really be something like " DROP SUBSCRIPTION
* of a subscription that is associated with a replication slot " , but we
* don ' t have the proper facilities for that .
*/
if ( slotname )
PreventTransactionChain ( isTopLevel , " DROP SUBSCRIPTION " ) ;
ObjectAddressSet ( myself , SubscriptionRelationId , subid ) ;
EventTriggerSQLDropAddObject ( & myself , true , true ) ;
@ -843,8 +910,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
if ( originid ! = InvalidRepOriginId )
replorigin_drop ( originid ) ;
/* If the user asked to not drop the slot, we are done mow. */
if ( ! stmt - > drop_slot )
/* If there is no slot associated with the subscription, we can finish here. */
if ( ! slotname )
{
heap_close ( rel , NoLock ) ;
return ;
@ -857,14 +924,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
load_file ( " libpqwalreceiver " , false ) ;
initStringInfo ( & cmd ) ;
appendStringInfo ( & cmd , " DROP_REPLICATION_SLOT \" %s \" " , slotname ) ;
appendStringInfo ( & cmd , " DROP_REPLICATION_SLOT %s " , quote_identifier ( slotname ) ) ;
wrconn = walrcv_connect ( conninfo , true , subname , & err ) ;
if ( wrconn = = NULL )
ereport ( ERROR ,
( errmsg ( " could not connect to publisher when attempting to "
" drop the replication slot \" %s \" " , slotname ) ,
errdetail ( " The error was: %s " , err ) ) ) ;
errdetail ( " The error was: %s " , err ) ,
errhint ( " Use ALTER SUBSCRIPTION ... WITH (slot_name = NONE) "
" to disassociate the subscription from the slot. " ) ) ) ;
PG_TRY ( ) ;
{