@ -16,6 +16,7 @@
# include "access/htup_details.h"
# include "access/table.h"
# include "access/twophase.h"
# include "access/xact.h"
# include "catalog/catalog.h"
# include "catalog/dependency.h"
@ -109,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn,
static void check_duplicates_in_publist ( List * publist , Datum * datums ) ;
static List * merge_publications ( List * oldpublist , List * newpublist , bool addpub , const char * subname ) ;
static void ReportSlotConnectionError ( List * rstates , Oid subid , char * slotname , char * err ) ;
static void CheckAlterSubOption ( Subscription * sub , const char * option ,
bool slot_needs_update , bool isTopLevel ) ;
/*
@ -259,21 +262,9 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts - > specified_opts | = SUBOPT_STREAMING ;
opts - > streaming = defGetStreamingMode ( defel ) ;
}
else if ( strcmp ( defel - > defname , " two_phase " ) = = 0 )
else if ( IsSet ( supported_opts , SUBOPT_TWOPHASE_COMMIT ) & &
strcmp ( defel - > defname , " two_phase " ) = = 0 )
{
/*
* Do not allow toggling of two_phase option . Doing so could cause
* missing of transactions and lead to an inconsistent replica .
* See comments atop worker . c
*
* Note : Unsupported twophase indicates that this call originated
* from AlterSubscription .
*/
if ( ! IsSet ( supported_opts , SUBOPT_TWOPHASE_COMMIT ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " unrecognized subscription parameter: \" %s \" " , defel - > defname ) ) ) ;
if ( IsSet ( opts - > specified_opts , SUBOPT_TWOPHASE_COMMIT ) )
errorConflictingDefElem ( defel , pstate ) ;
@ -1079,6 +1070,60 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
table_close ( rel , NoLock ) ;
}
/*
* Common checks for altering failover and two_phase options .
*/
static void
CheckAlterSubOption ( Subscription * sub , const char * option ,
bool slot_needs_update , bool isTopLevel )
{
/*
* The checks in this function are required only for failover and
* two_phase options .
*/
Assert ( strcmp ( option , " failover " ) = = 0 | |
strcmp ( option , " two_phase " ) = = 0 ) ;
/*
* Do not allow changing the option if the subscription is enabled . This
* is because both failover and two_phase options of the slot on the
* publisher cannot be modified if the slot is currently acquired by the
* existing walsender .
*
* Note that two_phase is enabled ( aka changed from ' false ' to ' true ' ) on
* the publisher by the existing walsender , so we could have allowed that
* even when the subscription is enabled . But we kept this restriction for
* the sake of consistency and simplicity .
*/
if ( sub - > enabled )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " cannot set %s for enabled subscription " ,
option ) ) ) ;
if ( slot_needs_update )
{
StringInfoData cmd ;
/*
* A valid slot must be associated with the subscription for us to
* modify any of the slot ' s properties .
*/
if ( ! sub - > slotname )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " cannot set %s for a subscription that does not have a slot name " ,
option ) ) ) ;
/* The changed option of the slot can't be rolled back. */
initStringInfo ( & cmd ) ;
appendStringInfo ( & cmd , " ALTER SUBSCRIPTION ... SET (%s) " , option ) ;
PreventInTransactionBlock ( isTopLevel , cmd . data ) ;
pfree ( cmd . data ) ;
}
}
/*
* Alter the existing subscription .
*/
@ -1094,6 +1139,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
HeapTuple tup ;
Oid subid ;
bool update_tuple = false ;
bool update_failover = false ;
bool update_two_phase = false ;
Subscription * sub ;
Form_pg_subscription form ;
bits32 supported_opts ;
@ -1145,7 +1192,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
{
supported_opts = ( SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR |
SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_ORIGIN ) ;
@ -1227,31 +1275,81 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces [ Anum_pg_subscription_subrunasowner - 1 ] = true ;
}
if ( IsSet ( opts . specified_opts , SUBOPT_FAILOVER ) )
if ( IsSet ( opts . specified_opts , SUBOPT_TWOPHASE_COMMIT ) )
{
if ( ! sub - > slotname )
/*
* We need to update both the slot and the subscription
* for the two_phase option . We can enable the two_phase
* option for a slot only once the initial data
* synchronization is done . This is to avoid missing some
* data as explained in comments atop worker . c .
*/
update_two_phase = ! opts . twophase ;
CheckAlterSubOption ( sub , " two_phase " , update_two_phase ,
isTopLevel ) ;
/*
* Modifying the two_phase slot option requires a slot
* lookup by slot name , so changing the slot name at the
* same time is not allowed .
*/
if ( update_two_phase & &
IsSet ( opts . specified_opts , SUBOPT_SLOT_NAME ) )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " slot_name and two_phase cannot be altered at the same time " ) ) ) ;
/*
* Note that workers may still survive even if the
* subscription has been disabled .
*
* Ensure workers have already been exited to avoid
* getting prepared transactions while we are disabling
* the two_phase option . Otherwise , the changes of an
* already prepared transaction can be replicated again
* along with its corresponding commit , leading to
* duplicate data or errors .
*/
if ( logicalrep_workers_find ( subid , true , true ) )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " cannot set %s for a subscription that does not have a slot name " ,
" failover " ) ) ) ;
errmsg ( " cannot alter two_phase when logical replication worker is still running " ) ,
errhint ( " Try again after some time. " ) ) ) ;
/*
* Do not allow changing the failover state if the
* subscription is enabled . This is because the failover
* state of the slot on the publisher cannot be modified
* if the slot is currently acquired by the apply worker .
* two_phase cannot be disabled if there are any
* uncommitted prepared transactions present otherwise it
* can lead to duplicate data or errors as explained in
* the comment above .
*/
if ( sub - > enabled )
if ( update_two_phase & &
sub - > twophasestate = = LOGICALREP_TWOPHASE_STATE_ENABLED & &
LookupGXactBySubid ( subid ) )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " cannot set %s for enabled subscription " ,
" failover " ) ) ) ;
errmsg ( " cannot disable two_phase when prepared transactions are present " ) ,
errhint ( " Resolve these transactions and try again. " ) ) ) ;
/* Change system catalog accordingly */
values [ Anum_pg_subscription_subtwophasestate - 1 ] =
CharGetDatum ( opts . twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
LOGICALREP_TWOPHASE_STATE_DISABLED ) ;
replaces [ Anum_pg_subscription_subtwophasestate - 1 ] = true ;
}
if ( IsSet ( opts . specified_opts , SUBOPT_FAILOVER ) )
{
/*
* The changed failover option of the slot can ' t be rolled
* back .
* Similar to the two_phase case above , we need to update
* the failover option for both the slot and the
* subscription .
*/
PreventInTransactionBlock ( isTopLevel , " ALTER SUBSCRIPTION ... SET (failover) " ) ;
update_failover = true ;
CheckAlterSubOption ( sub , " failover " , update_failover ,
isTopLevel ) ;
values [ Anum_pg_subscription_subfailover - 1 ] =
BoolGetDatum ( opts . failover ) ;
@ -1501,13 +1599,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
}
/*
* Try to acquire the connection necessary for altering slot .
* Try to acquire the connection necessary for altering the slot , if
* needed .
*
* This has to be at the end because otherwise if there is an error while
* doing the database operations we won ' t be able to rollback altered
* slot .
*/
if ( replaces [ Anum_pg_subscription_subfailover - 1 ] )
if ( update_failover | | update_two_phase )
{
bool must_use_password ;
char * err ;
@ -1528,7 +1627,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
PG_TRY ( ) ;
{
walrcv_alter_slot ( wrconn , sub - > slotname , opts . failover ) ;
walrcv_alter_slot ( wrconn , sub - > slotname ,
update_failover ? & opts . failover : NULL ,
update_two_phase ? & opts . twophase : NULL ) ;
}
PG_FINALLY ( ) ;
{
@ -1675,9 +1776,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* New workers won ' t be started because we hold an exclusive lock on the
* subscription till the end of the transaction .
*/
LWLockAcquire ( LogicalRepWorkerLock , LW_SHARED ) ;
subworkers = logicalrep_workers_find ( subid , false ) ;
LWLockRelease ( LogicalRepWorkerLock ) ;
subworkers = logicalrep_workers_find ( subid , false , true ) ;
foreach ( lc , subworkers )
{
LogicalRepWorker * w = ( LogicalRepWorker * ) lfirst ( lc ) ;