@ -396,8 +396,6 @@ static void stream_close_file(void);
static void send_feedback ( XLogRecPtr recvpos , bool force , bool requestReply ) ;
static void DisableSubscriptionAndExit ( void ) ;
static void apply_handle_commit_internal ( LogicalRepCommitData * commit_data ) ;
static void apply_handle_insert_internal ( ApplyExecutionData * edata ,
ResultRelInfo * relinfo ,
@ -4327,6 +4325,57 @@ stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
stream_stop_internal ( xid ) ;
}
/*
* Sets streaming options including replication slot name and origin start
* position . Workers need these options for logical replication .
*/
void
set_stream_options ( WalRcvStreamOptions * options ,
char * slotname ,
XLogRecPtr * origin_startpos )
{
int server_version ;
options - > logical = true ;
options - > startpoint = * origin_startpos ;
options - > slotname = slotname ;
server_version = walrcv_server_version ( LogRepWorkerWalRcvConn ) ;
options - > proto . logical . proto_version =
server_version > = 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
server_version > = 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
server_version > = 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
LOGICALREP_PROTO_VERSION_NUM ;
options - > proto . logical . publication_names = MySubscription - > publications ;
options - > proto . logical . binary = MySubscription - > binary ;
/*
* Assign the appropriate option value for streaming option according to
* the ' streaming ' mode and the publisher ' s ability to support that mode .
*/
if ( server_version > = 160000 & &
MySubscription - > stream = = LOGICALREP_STREAM_PARALLEL )
{
options - > proto . logical . streaming_str = " parallel " ;
MyLogicalRepWorker - > parallel_apply = true ;
}
else if ( server_version > = 140000 & &
MySubscription - > stream ! = LOGICALREP_STREAM_OFF )
{
options - > proto . logical . streaming_str = " on " ;
MyLogicalRepWorker - > parallel_apply = false ;
}
else
{
options - > proto . logical . streaming_str = NULL ;
MyLogicalRepWorker - > parallel_apply = false ;
}
options - > proto . logical . twophase = false ;
options - > proto . logical . origin = pstrdup ( MySubscription - > origin ) ;
}
/*
* Cleanup the memory for subxacts and reset the related variables .
*/
@ -4361,24 +4410,18 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
}
/*
* Execute the initial sync with error handling . Disable the subscription ,
* if it ' s required .
* Common function to run the apply loop with error handling . Disable the
* subscription , if necessary .
*
* Allocate the slot name in long - lived context on return . Note that we don ' t
* handle FATAL errors which are probably because of system resource error and
* are not repeatable .
* Note that we don ' t handle FATAL errors which are probably because
* of system resource error and are not repeatable .
*/
static void
start_table_sync ( XLogRecPtr * origin_startpos , char * * myslotname )
void
start_apply ( XLogRecPtr origin_startpos )
{
char * syncslotname = NULL ;
Assert ( am_tablesync_worker ( ) ) ;
PG_TRY ( ) ;
{
/* Call initial sync. */
syncslotname = LogicalRepSyncTableStart ( origin_startpos ) ;
LogicalRepApplyLoop ( origin_startpos ) ;
}
PG_CATCH ( ) ;
{
@ -4387,65 +4430,132 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
else
{
/*
* Report the worker failed during table synchronization . Abort
* the current transaction so that the stats message is sent in an
* Report the worker failed while applying changes . Abort the
* current transaction so that the stats message is sent in an
* idle state .
*/
AbortOutOfAnyTransaction ( ) ;
pgstat_report_subscription_error ( MySubscription - > oid , false ) ;
pgstat_report_subscription_error ( MySubscription - > oid , ! am_tablesync_worker ( ) ) ;
PG_RE_THROW ( ) ;
}
}
PG_END_TRY ( ) ;
/* allocate slot name in long-lived context */
* myslotname = MemoryContextStrdup ( ApplyContext , syncslotname ) ;
pfree ( syncslotname ) ;
}
/*
* Run the apply loop with error handling . Disable the subscription ,
* if necessary .
* Runs the leader apply worker .
*
* Note that we don ' t handle FATAL errors which are probably because
* of system resource error and are not repeatable .
* It sets up replication origin , streaming options and then starts streaming .
*/
static void
start_apply ( XLogRecPtr origin_startpos )
run_apply_worker ( )
{
PG_TRY ( ) ;
char originname [ NAMEDATALEN ] ;
XLogRecPtr origin_startpos = InvalidXLogRecPtr ;
char * slotname = NULL ;
WalRcvStreamOptions options ;
RepOriginId originid ;
TimeLineID startpointTLI ;
char * err ;
bool must_use_password ;
slotname = MySubscription - > slotname ;
/*
* This shouldn ' t happen if the subscription is enabled , but guard against
* DDL bugs or manual catalog changes . ( libpqwalreceiver will crash if
* slot is NULL . )
*/
if ( ! slotname )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " subscription has no replication slot set " ) ) ) ;
/* Setup replication origin tracking. */
ReplicationOriginNameForLogicalRep ( MySubscription - > oid , InvalidOid ,
originname , sizeof ( originname ) ) ;
StartTransactionCommand ( ) ;
originid = replorigin_by_name ( originname , true ) ;
if ( ! OidIsValid ( originid ) )
originid = replorigin_create ( originname ) ;
replorigin_session_setup ( originid , 0 ) ;
replorigin_session_origin = originid ;
origin_startpos = replorigin_session_get_progress ( false ) ;
/* Is the use of a password mandatory? */
must_use_password = MySubscription - > passwordrequired & &
! superuser_arg ( MySubscription - > owner ) ;
/* Note that the superuser_arg call can access the DB */
CommitTransactionCommand ( ) ;
LogRepWorkerWalRcvConn = walrcv_connect ( MySubscription - > conninfo , true ,
must_use_password ,
MySubscription - > name , & err ) ;
if ( LogRepWorkerWalRcvConn = = NULL )
ereport ( ERROR ,
( errcode ( ERRCODE_CONNECTION_FAILURE ) ,
errmsg ( " could not connect to the publisher: %s " , err ) ) ) ;
/*
* We don ' t really use the output identify_system for anything but it does
* some initializations on the upstream so let ' s still call it .
*/
( void ) walrcv_identify_system ( LogRepWorkerWalRcvConn , & startpointTLI ) ;
set_apply_error_context_origin ( originname ) ;
set_stream_options ( & options , slotname , & origin_startpos ) ;
/*
* Even when the two_phase mode is requested by the user , it remains as
* the tri - state PENDING until all tablesyncs have reached READY state .
* Only then , can it become ENABLED .
*
* Note : If the subscription has no tables then leave the state as
* PENDING , which allows ALTER SUBSCRIPTION . . . REFRESH PUBLICATION to
* work .
*/
if ( MySubscription - > twophasestate = = LOGICALREP_TWOPHASE_STATE_PENDING & &
AllTablesyncsReady ( ) )
{
LogicalRepApplyLoop ( origin_startpos ) ;
/* Start streaming with two_phase enabled */
options . proto . logical . twophase = true ;
walrcv_startstreaming ( LogRepWorkerWalRcvConn , & options ) ;
StartTransactionCommand ( ) ;
UpdateTwoPhaseState ( MySubscription - > oid , LOGICALREP_TWOPHASE_STATE_ENABLED ) ;
MySubscription - > twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED ;
CommitTransactionCommand ( ) ;
}
PG_CATCH ( ) ;
else
{
if ( MySubscription - > disableonerr )
DisableSubscriptionAndExit ( ) ;
else
{
/*
* Report the worker failed while applying changes . Abort the
* current transaction so that the stats message is sent in an
* idle state .
*/
AbortOutOfAnyTransaction ( ) ;
pgstat_report_subscription_error ( MySubscription - > oid , ! am_tablesync_worker ( ) ) ;
PG_RE_THROW ( ) ;
}
walrcv_startstreaming ( LogRepWorkerWalRcvConn , & options ) ;
}
PG_END_TRY ( ) ;
ereport ( DEBUG1 ,
( errmsg_internal ( " logical replication apply worker for subscription \" %s \" two_phase is %s " ,
MySubscription - > name ,
MySubscription - > twophasestate = = LOGICALREP_TWOPHASE_STATE_DISABLED ? " DISABLED " :
MySubscription - > twophasestate = = LOGICALREP_TWOPHASE_STATE_PENDING ? " PENDING " :
MySubscription - > twophasestate = = LOGICALREP_TWOPHASE_STATE_ENABLED ? " ENABLED " :
" ? " ) ) ) ;
/* Run the main loop. */
start_apply ( origin_startpos ) ;
}
/*
* Common initialization for leader apply worker and parallel apply worker .
* Common initialization for leader apply worker , parallel apply worker and
* tablesync worker .
*
* Initialize the database connection , in - memory subscription and necessary
* config options .
*/
void
InitializeApplyWorker ( void )
InitializeLogRep Worker ( void )
{
MemoryContext oldctx ;
@ -4518,22 +4628,15 @@ InitializeApplyWorker(void)
CommitTransactionCommand ( ) ;
}
/* Logical Replication Apply worker entry point */
/* Common function to setup the leader apply or tablesync worker. */
void
ApplyWorkerMain ( Datum main_arg )
SetupApplyOrSyncWorker ( int worker_slot )
{
int worker_slot = DatumGetInt32 ( main_arg ) ;
char originname [ NAMEDATALEN ] ;
XLogRecPtr origin_startpos = InvalidXLogRecPtr ;
char * myslotname = NULL ;
WalRcvStreamOptions options ;
int server_version ;
InitializingApplyWorker = true ;
/* Attach to slot */
logicalrep_worker_attach ( worker_slot ) ;
Assert ( am_tablesync_worker ( ) | | am_leader_apply_worker ( ) ) ;
/* Setup signal handling */
pqsignal ( SIGHUP , SignalHandlerForConfigReload ) ;
pqsignal ( SIGTERM , die ) ;
@ -4551,79 +4654,12 @@ ApplyWorkerMain(Datum main_arg)
/* Load the libpq-specific functions */
load_file ( " libpqwalreceiver " , false ) ;
InitializeApplyWorker ( ) ;
InitializingApplyWorker = false ;
InitializeLogRepWorker ( ) ;
/* Connect to the origin and start the replication. */
elog ( DEBUG1 , " connecting to publisher using connection string \" %s \" " ,
MySubscription - > conninfo ) ;
if ( am_tablesync_worker ( ) )
{
start_table_sync ( & origin_startpos , & myslotname ) ;
ReplicationOriginNameForLogicalRep ( MySubscription - > oid ,
MyLogicalRepWorker - > relid ,
originname ,
sizeof ( originname ) ) ;
set_apply_error_context_origin ( originname ) ;
}
else
{
/* This is the leader apply worker */
RepOriginId originid ;
TimeLineID startpointTLI ;
char * err ;
bool must_use_password ;
myslotname = MySubscription - > slotname ;
/*
* This shouldn ' t happen if the subscription is enabled , but guard
* against DDL bugs or manual catalog changes . ( libpqwalreceiver will
* crash if slot is NULL . )
*/
if ( ! myslotname )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " subscription has no replication slot set " ) ) ) ;
/* Setup replication origin tracking. */
StartTransactionCommand ( ) ;
ReplicationOriginNameForLogicalRep ( MySubscription - > oid , InvalidOid ,
originname , sizeof ( originname ) ) ;
originid = replorigin_by_name ( originname , true ) ;
if ( ! OidIsValid ( originid ) )
originid = replorigin_create ( originname ) ;
replorigin_session_setup ( originid , 0 ) ;
replorigin_session_origin = originid ;
origin_startpos = replorigin_session_get_progress ( false ) ;
/* Is the use of a password mandatory? */
must_use_password = MySubscription - > passwordrequired & &
! superuser_arg ( MySubscription - > owner ) ;
/* Note that the superuser_arg call can access the DB */
CommitTransactionCommand ( ) ;
LogRepWorkerWalRcvConn = walrcv_connect ( MySubscription - > conninfo , true ,
must_use_password ,
MySubscription - > name , & err ) ;
if ( LogRepWorkerWalRcvConn = = NULL )
ereport ( ERROR ,
( errcode ( ERRCODE_CONNECTION_FAILURE ) ,
errmsg ( " could not connect to the publisher: %s " , err ) ) ) ;
/*
* We don ' t really use the output identify_system for anything but it
* does some initializations on the upstream so let ' s still call it .
*/
( void ) walrcv_identify_system ( LogRepWorkerWalRcvConn , & startpointTLI ) ;
set_apply_error_context_origin ( originname ) ;
}
/*
* Setup callback for syscache so that we know when something changes in
* the subscription relation state .
@ -4631,91 +4667,21 @@ ApplyWorkerMain(Datum main_arg)
CacheRegisterSyscacheCallback ( SUBSCRIPTIONRELMAP ,
invalidate_syncing_table_states ,
( Datum ) 0 ) ;
}
/* Build logical replication streaming options. */
options . logical = true ;
options . startpoint = origin_startpos ;
options . slotname = myslotname ;
server_version = walrcv_server_version ( LogRepWorkerWalRcvConn ) ;
options . proto . logical . proto_version =
server_version > = 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
server_version > = 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
server_version > = 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
LOGICALREP_PROTO_VERSION_NUM ;
options . proto . logical . publication_names = MySubscription - > publications ;
options . proto . logical . binary = MySubscription - > binary ;
/*
* Assign the appropriate option value for streaming option according to
* the ' streaming ' mode and the publisher ' s ability to support that mode .
*/
if ( server_version > = 160000 & &
MySubscription - > stream = = LOGICALREP_STREAM_PARALLEL )
{
options . proto . logical . streaming_str = " parallel " ;
MyLogicalRepWorker - > parallel_apply = true ;
}
else if ( server_version > = 140000 & &
MySubscription - > stream ! = LOGICALREP_STREAM_OFF )
{
options . proto . logical . streaming_str = " on " ;
MyLogicalRepWorker - > parallel_apply = false ;
}
else
{
options . proto . logical . streaming_str = NULL ;
MyLogicalRepWorker - > parallel_apply = false ;
}
options . proto . logical . twophase = false ;
options . proto . logical . origin = pstrdup ( MySubscription - > origin ) ;
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain ( Datum main_arg )
{
int worker_slot = DatumGetInt32 ( main_arg ) ;
if ( ! am_tablesync_worker ( ) )
{
/*
* Even when the two_phase mode is requested by the user , it remains
* as the tri - state PENDING until all tablesyncs have reached READY
* state . Only then , can it become ENABLED .
*
* Note : If the subscription has no tables then leave the state as
* PENDING , which allows ALTER SUBSCRIPTION . . . REFRESH PUBLICATION to
* work .
*/
if ( MySubscription - > twophasestate = = LOGICALREP_TWOPHASE_STATE_PENDING & &
AllTablesyncsReady ( ) )
{
/* Start streaming with two_phase enabled */
options . proto . logical . twophase = true ;
walrcv_startstreaming ( LogRepWorkerWalRcvConn , & options ) ;
InitializingApplyWorker = true ;
StartTransactionCommand ( ) ;
UpdateTwoPhaseState ( MySubscription - > oid , LOGICALREP_TWOPHASE_STATE_ENABLED ) ;
MySubscription - > twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED ;
CommitTransactionCommand ( ) ;
}
else
{
walrcv_startstreaming ( LogRepWorkerWalRcvConn , & options ) ;
}
SetupApplyOrSyncWorker ( worker_slot ) ;
ereport ( DEBUG1 ,
( errmsg_internal ( " logical replication apply worker for subscription \" %s \" two_phase is %s " ,
MySubscription - > name ,
MySubscription - > twophasestate = = LOGICALREP_TWOPHASE_STATE_DISABLED ? " DISABLED " :
MySubscription - > twophasestate = = LOGICALREP_TWOPHASE_STATE_PENDING ? " PENDING " :
MySubscription - > twophasestate = = LOGICALREP_TWOPHASE_STATE_ENABLED ? " ENABLED " :
" ? " ) ) ) ;
}
else
{
/* Start normal logical streaming replication. */
walrcv_startstreaming ( LogRepWorkerWalRcvConn , & options ) ;
}
InitializingApplyWorker = false ;
/* Run the main loop. */
start_apply ( origin_startpos ) ;
run_apply_worker ( ) ;
proc_exit ( 0 ) ;
}
@ -4724,7 +4690,7 @@ ApplyWorkerMain(Datum main_arg)
* After error recovery , disable the subscription in a new transaction
* and exit cleanly .
*/
static void
void
DisableSubscriptionAndExit ( void )
{
/*