@ -288,7 +288,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker - > relstate ,
MyLogicalRepWorker - > relstate ,
MyLogicalRepWorker - > relstate_lsn ) ;
MyLogicalRepWorker - > relstate_lsn ) ;
walrcv_endstreaming ( wrc onn, & tli ) ;
walrcv_endstreaming ( LogRepWorkerWalRcvC onn, & tli ) ;
finish_sync_worker ( ) ;
finish_sync_worker ( ) ;
}
}
else
else
@ -584,7 +584,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
for ( ; ; )
for ( ; ; )
{
{
/* Try read the data. */
/* Try read the data. */
len = walrcv_receive ( wrc onn, & buf , & fd ) ;
len = walrcv_receive ( LogRepWorkerWalRcvC onn, & buf , & fd ) ;
CHECK_FOR_INTERRUPTS ( ) ;
CHECK_FOR_INTERRUPTS ( ) ;
@ -657,7 +657,8 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND c.relname = %s " ,
" AND c.relname = %s " ,
quote_literal_cstr ( nspname ) ,
quote_literal_cstr ( nspname ) ,
quote_literal_cstr ( relname ) ) ;
quote_literal_cstr ( relname ) ) ;
res = walrcv_exec ( wrconn , cmd . data , lengthof ( tableRow ) , tableRow ) ;
res = walrcv_exec ( LogRepWorkerWalRcvConn , cmd . data ,
lengthof ( tableRow ) , tableRow ) ;
if ( res - > status ! = WALRCV_OK_TUPLES )
if ( res - > status ! = WALRCV_OK_TUPLES )
ereport ( ERROR ,
ereport ( ERROR ,
@ -695,9 +696,11 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND a.attrelid = %u "
" AND a.attrelid = %u "
" ORDER BY a.attnum " ,
" ORDER BY a.attnum " ,
lrel - > remoteid ,
lrel - > remoteid ,
( walrcv_server_version ( wrconn ) > = 120000 ? " AND a.attgenerated = '' " : " " ) ,
( walrcv_server_version ( LogRepWorkerWalRcvConn ) > = 120000 ?
" AND a.attgenerated = '' " : " " ) ,
lrel - > remoteid ) ;
lrel - > remoteid ) ;
res = walrcv_exec ( wrconn , cmd . data , lengthof ( attrRow ) , attrRow ) ;
res = walrcv_exec ( LogRepWorkerWalRcvConn , cmd . data ,
lengthof ( attrRow ) , attrRow ) ;
if ( res - > status ! = WALRCV_OK_TUPLES )
if ( res - > status ! = WALRCV_OK_TUPLES )
ereport ( ERROR ,
ereport ( ERROR ,
@ -784,7 +787,7 @@ copy_table(Relation rel)
appendStringInfo ( & cmd , " FROM %s) TO STDOUT " ,
appendStringInfo ( & cmd , " FROM %s) TO STDOUT " ,
quote_qualified_identifier ( lrel . nspname , lrel . relname ) ) ;
quote_qualified_identifier ( lrel . nspname , lrel . relname ) ) ;
}
}
res = walrcv_exec ( wrc onn, cmd . data , 0 , NULL ) ;
res = walrcv_exec ( LogRepWorkerWalRcvC onn, cmd . data , 0 , NULL ) ;
pfree ( cmd . data ) ;
pfree ( cmd . data ) ;
if ( res - > status ! = WALRCV_OK_COPY_OUT )
if ( res - > status ! = WALRCV_OK_COPY_OUT )
ereport ( ERROR ,
ereport ( ERROR ,
@ -851,8 +854,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* application_name , so that it is different from the main apply worker ,
* application_name , so that it is different from the main apply worker ,
* so that synchronous replication can distinguish them .
* so that synchronous replication can distinguish them .
*/
*/
wrconn = walrcv_connect ( MySubscription - > conninfo , true , slotname , & err ) ;
LogRepWorkerWalRcvConn = walrcv_connect ( MySubscription - > conninfo , true ,
if ( wrconn = = NULL )
slotname , & err ) ;
if ( LogRepWorkerWalRcvConn = = NULL )
ereport ( ERROR ,
ereport ( ERROR ,
( errmsg ( " could not connect to the publisher: %s " , err ) ) ) ;
( errmsg ( " could not connect to the publisher: %s " , err ) ) ) ;
@ -897,7 +901,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* inside the transaction so that we can use the snapshot made
* inside the transaction so that we can use the snapshot made
* by the slot to get existing data .
* by the slot to get existing data .
*/
*/
res = walrcv_exec ( wrc onn,
res = walrcv_exec ( LogRepWorkerWalRcvC onn,
" BEGIN READ ONLY ISOLATION LEVEL "
" BEGIN READ ONLY ISOLATION LEVEL "
" REPEATABLE READ " , 0 , NULL ) ;
" REPEATABLE READ " , 0 , NULL ) ;
if ( res - > status ! = WALRCV_OK_COMMAND )
if ( res - > status ! = WALRCV_OK_COMMAND )
@ -914,14 +918,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* that is consistent with the lsn used by the slot to start
* that is consistent with the lsn used by the slot to start
* decoding .
* decoding .
*/
*/
walrcv_create_slot ( wrc onn, slotname , true ,
walrcv_create_slot ( LogRepWorkerWalRcvC onn, slotname , true ,
CRS_USE_SNAPSHOT , origin_startpos ) ;
CRS_USE_SNAPSHOT , origin_startpos ) ;
PushActiveSnapshot ( GetTransactionSnapshot ( ) ) ;
PushActiveSnapshot ( GetTransactionSnapshot ( ) ) ;
copy_table ( rel ) ;
copy_table ( rel ) ;
PopActiveSnapshot ( ) ;
PopActiveSnapshot ( ) ;
res = walrcv_exec ( wrc onn, " COMMIT " , 0 , NULL ) ;
res = walrcv_exec ( LogRepWorkerWalRcvC onn, " COMMIT " , 0 , NULL ) ;
if ( res - > status ! = WALRCV_OK_COMMAND )
if ( res - > status ! = WALRCV_OK_COMMAND )
ereport ( ERROR ,
ereport ( ERROR ,
( errmsg ( " table copy could not finish transaction on publisher " ) ,
( errmsg ( " table copy could not finish transaction on publisher " ) ,