@ -34,6 +34,7 @@
# include "nodes/makefuncs.h"
# include "nodes/makefuncs.h"
# include "replication/logicallauncher.h"
# include "replication/logicallauncher.h"
# include "replication/origin.h"
# include "replication/origin.h"
# include "replication/slot.h"
# include "replication/walreceiver.h"
# include "replication/walreceiver.h"
# include "replication/walsender.h"
# include "replication/walsender.h"
# include "replication/worker_internal.h"
# include "replication/worker_internal.h"
@ -46,6 +47,8 @@
# include "utils/syscache.h"
# include "utils/syscache.h"
static List * fetch_table_list ( WalReceiverConn * wrconn , List * publications ) ;
static List * fetch_table_list ( WalReceiverConn * wrconn , List * publications ) ;
static void ReportSlotConnectionError ( List * rstates , Oid subid , char * slotname , char * err ) ;
/*
/*
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands .
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands .
@ -566,107 +569,207 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
Oid * pubrel_local_oids ;
Oid * pubrel_local_oids ;
ListCell * lc ;
ListCell * lc ;
int off ;
int off ;
int remove_rel_len ;
Relation rel = NULL ;
typedef struct SubRemoveRels
{
Oid relid ;
char state ;
} SubRemoveRels ;
SubRemoveRels * sub_remove_rels ;
/* Load the library providing us libpq calls. */
/* Load the library providing us libpq calls. */
load_file ( " libpqwalreceiver " , false ) ;
load_file ( " libpqwalreceiver " , false ) ;
/* Try to connect to the publisher. */
PG_TRY ( ) ;
wrconn = walrcv_connect ( sub - > conninfo , true , sub - > name , & err ) ;
{
if ( ! wrconn )
/* Try to connect to the publisher. */
ereport ( ERROR ,
wrconn = walrcv_connect ( sub - > conninfo , true , sub - > name , & err ) ;
( errmsg ( " could not connect to the publisher: %s " , err ) ) ) ;
if ( ! wrconn )
ereport ( ERROR ,
/* Get the table list from publisher. */
( errmsg ( " could not connect to the publisher: %s " , err ) ) ) ;
pubrel_names = fetch_table_list ( wrconn , sub - > publications ) ;
/* We are done with the remote side, close connection . */
/* Get the table list from publisher . */
walrcv_disconnec t( wrconn ) ;
pubrel_names = fetch_table_lis t ( wrconn , sub - > publications ) ;
/* Get local table list. */
/* Get local table list. */
subrel_states = GetSubscriptionRelations ( sub - > oid ) ;
subrel_states = GetSubscriptionRelations ( sub - > oid ) ;
/*
/*
* Build qsorted array of local table oids for faster lookup . This can
* Build qsorted array of local table oids for faster lookup . This can
* potentially contain all tables in the database so speed of lookup is
* potentially contain all tables in the database so speed of lookup
* important .
* is important .
*/
*/
subrel_local_oids = palloc ( list_length ( subrel_states ) * sizeof ( Oid ) ) ;
subrel_local_oids = palloc ( list_length ( subrel_states ) * sizeof ( Oid ) ) ;
off = 0 ;
off = 0 ;
foreach ( lc , subrel_states )
foreach ( lc , subrel_states )
{
{
SubscriptionRelState * relstate = ( SubscriptionRelState * ) lfirst ( lc ) ;
SubscriptionRelState * relstate = ( SubscriptionRelState * ) lfirst ( lc ) ;
subrel_local_oids [ off + + ] = relstate - > relid ;
subrel_local_oids [ off + + ] = relstate - > relid ;
}
}
qsort ( subrel_local_oids , list_length ( subrel_states ) ,
qsort ( subrel_local_oids , list_length ( subrel_states ) ,
sizeof ( Oid ) , oid_cmp ) ;
sizeof ( Oid ) , oid_cmp ) ;
/*
* Rels that we want to remove from subscription and drop any slots
* and origins corresponding to them .
*/
sub_remove_rels = palloc ( list_length ( subrel_states ) * sizeof ( SubRemoveRels ) ) ;
/*
* Walk over the remote tables and try to match them to locally known
* tables . If the table is not known locally create a new state for
* it .
*
* Also builds array of local oids of remote tables for the next step .
*/
off = 0 ;
pubrel_local_oids = palloc ( list_length ( pubrel_names ) * sizeof ( Oid ) ) ;
foreach ( lc , pubrel_names )
{
RangeVar * rv = ( RangeVar * ) lfirst ( lc ) ;
Oid relid ;
/*
relid = RangeVarGetRelid ( rv , AccessShareLock , false ) ;
* Walk over the remote tables and try to match them to locally known
* tables . If the table is not known locally create a new state for it .
*
* Also builds array of local oids of remote tables for the next step .
*/
off = 0 ;
pubrel_local_oids = palloc ( list_length ( pubrel_names ) * sizeof ( Oid ) ) ;
foreach ( lc , pubrel_names )
/* Check for supported relkind. */
{
CheckSubscriptionRelkind ( get_rel_relkind ( relid ) ,
RangeVar * rv = ( RangeVar * ) lfirst ( lc ) ;
rv - > schemaname , rv - > relname ) ;
Oid relid ;
relid = RangeVarGetRelid ( rv , AccessShareLock , false ) ;
pubrel_local_oids [ off + + ] = relid ;
/* Check for supported relkind. */
if ( ! bsearch ( & relid , subrel_local_oids ,
CheckSubscriptionRelkind ( get_rel_relkind ( relid ) ,
list_length ( subrel_states ) , sizeof ( Oid ) , oid_cmp ) )
rv - > schemaname , rv - > relname ) ;
{
AddSubscriptionRelState ( sub - > oid , relid ,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY ,
InvalidXLogRecPtr ) ;
ereport ( DEBUG1 ,
( errmsg ( " table \" %s.%s \" added to subscription \" %s \" " ,
rv - > schemaname , rv - > relname , sub - > name ) ) ) ;
}
}
pubrel_local_oids [ off + + ] = relid ;
/*
* Next remove state for tables we should not care about anymore using
* the data we collected above
*/
qsort ( pubrel_local_oids , list_length ( pubrel_names ) ,
sizeof ( Oid ) , oid_cmp ) ;
if ( ! bsearch ( & relid , subrel_local_oids ,
remove_rel_len = 0 ;
list_length ( subrel_states ) , sizeof ( Oid ) , oid_cmp ) )
for ( off = 0 ; off < list_length ( subrel_states ) ; off + + )
{
{
AddSubscriptionRelState ( sub - > oid , relid ,
Oid relid = subrel_local_oids [ off ] ;
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY ,
InvalidXLogRecPtr ) ;
ereport ( DEBUG1 ,
( errmsg ( " table \" %s.%s \" added to subscription \" %s \" " ,
rv - > schemaname , rv - > relname , sub - > name ) ) ) ;
}
}
/*
if ( ! bsearch ( & relid , pubrel_local_oids ,
* Next remove state for tables we should not care about anymore using the
list_length ( pubrel_names ) , sizeof ( Oid ) , oid_cmp ) )
* data we collected above
{
*/
char state ;
qsort ( pubrel_local_oids , list_length ( pubrel_names ) ,
XLogRecPtr statelsn ;
sizeof ( Oid ) , oid_cmp ) ;
/*
* Lock pg_subscription_rel with AccessExclusiveLock to
* prevent any race conditions with the apply worker
* re - launching workers at the same time this code is trying
* to remove those tables .
*
* Even if new worker for this particular rel is restarted it
* won ' t be able to make any progress as we hold exclusive
* lock on subscription_rel till the transaction end . It will
* simply exit as there is no corresponding rel entry .
*
* This locking also ensures that the state of rels won ' t
* change till we are done with this refresh operation .
*/
if ( ! rel )
rel = table_open ( SubscriptionRelRelationId , AccessExclusiveLock ) ;
/* Last known rel state. */
state = GetSubscriptionRelState ( sub - > oid , relid , & statelsn ) ;
sub_remove_rels [ remove_rel_len ] . relid = relid ;
sub_remove_rels [ remove_rel_len + + ] . state = state ;
RemoveSubscriptionRel ( sub - > oid , relid ) ;
logicalrep_worker_stop ( sub - > oid , relid ) ;
/*
* For READY state , we would have already dropped the
* tablesync origin .
*/
if ( state ! = SUBREL_STATE_READY )
{
char originname [ NAMEDATALEN ] ;
/*
* Drop the tablesync ' s origin tracking if exists .
*
* It is possible that the origin is not yet created for
* tablesync worker , this can happen for the states before
* SUBREL_STATE_FINISHEDCOPY . The apply worker can also
* concurrently try to drop the origin and by this time
* the origin might be already removed . For these reasons ,
* passing missing_ok = true .
*/
ReplicationOriginNameForTablesync ( sub - > oid , relid , originname ) ;
replorigin_drop_by_name ( originname , true , false ) ;
}
for ( off = 0 ; off < list_length ( subrel_states ) ; off + + )
ereport ( DEBUG1 ,
{
( errmsg ( " table \" %s.%s \" removed from subscription \" %s \" " ,
Oid relid = subrel_local_oids [ off ] ;
get_namespace_name ( get_rel_namespace ( relid ) ) ,
get_rel_name ( relid ) ,
sub - > name ) ) ) ;
}
}
if ( ! bsearch ( & relid , pubrel_local_oids ,
/*
list_length ( pubrel_names ) , sizeof ( Oid ) , oid_cmp ) )
* Drop the tablesync slots associated with removed tables . This has
* to be at the end because otherwise if there is an error while doing
* the database operations we won ' t be able to rollback dropped slots .
*/
for ( off = 0 ; off < remove_rel_len ; off + + )
{
{
RemoveSubscriptionRel ( sub - > oid , relid ) ;
if ( sub_remove_rels [ off ] . state ! = SUBREL_STATE_READY & &
sub_remove_rels [ off ] . state ! = SUBREL_STATE_SYNCDONE )
logicalrep_worker_stop_at_commit ( sub - > oid , relid ) ;
{
char syncslotname [ NAMEDATALEN ] = { 0 } ;
ereport ( DEBUG1 ,
( errmsg ( " table \" %s.%s \" removed from subscription \" %s \" " ,
/*
get_namespace_name ( get_rel_namespace ( relid ) ) ,
* For READY / SYNCDONE states we know the tablesync slot has
get_rel_name ( relid ) ,
* already been dropped by the tablesync worker .
sub - > name ) ) ) ;
*
* For other states , there is no certainty , maybe the slot
* does not exist yet . Also , if we fail after removing some of
* the slots , next time , it will again try to drop already
* dropped slots and fail . For these reasons , we allow
* missing_ok = true for the drop .
*/
ReplicationSlotNameForTablesync ( sub - > oid , sub_remove_rels [ off ] . relid , syncslotname ) ;
ReplicationSlotDropAtPubNode ( wrconn , syncslotname , true ) ;
}
}
}
}
}
PG_FINALLY ( ) ;
{
if ( wrconn )
walrcv_disconnect ( wrconn ) ;
}
PG_END_TRY ( ) ;
if ( rel )
table_close ( rel , NoLock ) ;
}
}
/*
/*
* Alter the existing subscription .
* Alter the existing subscription .
*/
*/
ObjectAddress
ObjectAddress
AlterSubscription ( AlterSubscriptionStmt * stmt )
AlterSubscription ( AlterSubscriptionStmt * stmt , bool isTopLevel )
{
{
Relation rel ;
Relation rel ;
ObjectAddress myself ;
ObjectAddress myself ;
@ -848,6 +951,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
errmsg ( " ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions " ) ,
errmsg ( " ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions " ) ,
errhint ( " Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false). " ) ) ) ;
errhint ( " Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false). " ) ) ) ;
PreventInTransactionBlock ( isTopLevel , " ALTER SUBSCRIPTION with refresh " ) ;
/* Make sure refresh sees the new list of publications. */
/* Make sure refresh sees the new list of publications. */
sub - > publications = stmt - > publication ;
sub - > publications = stmt - > publication ;
@ -877,6 +982,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
NULL , NULL , /* no "binary" */
NULL , NULL , /* no "binary" */
NULL , NULL ) ; /* no "streaming" */
NULL , NULL ) ; /* no "streaming" */
PreventInTransactionBlock ( isTopLevel , " ALTER SUBSCRIPTION ... REFRESH " ) ;
AlterSubscription_refresh ( sub , copy_data ) ;
AlterSubscription_refresh ( sub , copy_data ) ;
break ;
break ;
@ -927,8 +1034,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char originname [ NAMEDATALEN ] ;
char originname [ NAMEDATALEN ] ;
char * err = NULL ;
char * err = NULL ;
WalReceiverConn * wrconn = NULL ;
WalReceiverConn * wrconn = NULL ;
StringInfoData cmd ;
Form_pg_subscription form ;
Form_pg_subscription form ;
List * rstates ;
/*
/*
* Lock pg_subscription with AccessExclusiveLock to ensure that the
* Lock pg_subscription with AccessExclusiveLock to ensure that the
@ -1041,6 +1148,36 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
}
}
list_free ( subworkers ) ;
list_free ( subworkers ) ;
/*
* Cleanup of tablesync replication origins .
*
* Any READY - state relations would already have dealt with clean - ups .
*
* Note that the state can ' t change because we have already stopped both
* the apply and tablesync workers and they can ' t restart because of
* exclusive lock on the subscription .
*/
rstates = GetSubscriptionNotReadyRelations ( subid ) ;
foreach ( lc , rstates )
{
SubscriptionRelState * rstate = ( SubscriptionRelState * ) lfirst ( lc ) ;
Oid relid = rstate - > relid ;
/* Only cleanup resources of tablesync workers */
if ( ! OidIsValid ( relid ) )
continue ;
/*
* Drop the tablesync ' s origin tracking if exists .
*
* It is possible that the origin is not yet created for tablesync
* worker so passing missing_ok = true . This can happen for the states
* before SUBREL_STATE_FINISHEDCOPY .
*/
ReplicationOriginNameForTablesync ( subid , relid , originname ) ;
replorigin_drop_by_name ( originname , true , false ) ;
}
/* Clean up dependencies */
/* Clean up dependencies */
deleteSharedDependencyRecordsFor ( SubscriptionRelationId , subid , 0 ) ;
deleteSharedDependencyRecordsFor ( SubscriptionRelationId , subid , 0 ) ;
@ -1055,30 +1192,110 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* If there is no slot associated with the subscription , we can finish
* If there is no slot associated with the subscription , we can finish
* here .
* here .
*/
*/
if ( ! slotname )
if ( ! slotname & & rstates = = NIL )
{
{
table_close ( rel , NoLock ) ;
table_close ( rel , NoLock ) ;
return ;
return ;
}
}
/*
/*
* Otherwise drop the replication slot at the publisher node using the
* Try to acquire the connection necessary for dropping slots .
* replication connection .
*
* Note : If the slotname is NONE / NULL then we allow the command to finish
* and users need to manually cleanup the apply and tablesync worker slots
* later .
*
* This has to be at the end because otherwise if there is an error while
* doing the database operations we won ' t be able to rollback dropped
* slot .
*/
*/
load_file ( " libpqwalreceiver " , false ) ;
load_file ( " libpqwalreceiver " , false ) ;
initStringInfo ( & cmd ) ;
appendStringInfo ( & cmd , " DROP_REPLICATION_SLOT %s WAIT " , quote_identifier ( slotname ) ) ;
wrconn = walrcv_connect ( conninfo , true , subname , & err ) ;
wrconn = walrcv_connect ( conninfo , true , subname , & err ) ;
if ( wrconn = = NULL )
if ( wrconn = = NULL )
ereport ( ERROR ,
{
( errmsg ( " could not connect to publisher when attempting to "
if ( ! slotname )
" drop the replication slot \" %s \" " , slotname ) ,
{
errdetail ( " The error was: %s " , err ) ,
/* be tidy */
/* translator: %s is an SQL ALTER command */
list_free ( rstates ) ;
errhint ( " Use %s to disassociate the subscription from the slot. " ,
table_close ( rel , NoLock ) ;
" ALTER SUBSCRIPTION ... SET (slot_name = NONE) " ) ) ) ;
return ;
}
else
{
ReportSlotConnectionError ( rstates , subid , slotname , err ) ;
}
}
PG_TRY ( ) ;
{
foreach ( lc , rstates )
{
SubscriptionRelState * rstate = ( SubscriptionRelState * ) lfirst ( lc ) ;
Oid relid = rstate - > relid ;
/* Only cleanup resources of tablesync workers */
if ( ! OidIsValid ( relid ) )
continue ;
/*
* Drop the tablesync slots associated with removed tables .
*
* For SYNCDONE / READY states , the tablesync slot is known to have
* already been dropped by the tablesync worker .
*
* For other states , there is no certainty , maybe the slot does
* not exist yet . Also , if we fail after removing some of the
* slots , next time , it will again try to drop already dropped
* slots and fail . For these reasons , we allow missing_ok = true
* for the drop .
*/
if ( rstate - > state ! = SUBREL_STATE_SYNCDONE )
{
char syncslotname [ NAMEDATALEN ] = { 0 } ;
ReplicationSlotNameForTablesync ( subid , relid , syncslotname ) ;
ReplicationSlotDropAtPubNode ( wrconn , syncslotname , true ) ;
}
}
list_free ( rstates ) ;
/*
* If there is a slot associated with the subscription , then drop the
* replication slot at the publisher .
*/
if ( slotname )
ReplicationSlotDropAtPubNode ( wrconn , slotname , false ) ;
}
PG_FINALLY ( ) ;
{
walrcv_disconnect ( wrconn ) ;
}
PG_END_TRY ( ) ;
table_close ( rel , NoLock ) ;
}
/*
* Drop the replication slot at the publisher node using the replication
* connection .
*
* missing_ok - if true then only issue a LOG message if the slot doesn ' t
* exist .
*/
void
ReplicationSlotDropAtPubNode ( WalReceiverConn * wrconn , char * slotname , bool missing_ok )
{
StringInfoData cmd ;
Assert ( wrconn ) ;
load_file ( " libpqwalreceiver " , false ) ;
initStringInfo ( & cmd ) ;
appendStringInfo ( & cmd , " DROP_REPLICATION_SLOT %s WAIT " , quote_identifier ( slotname ) ) ;
PG_TRY ( ) ;
PG_TRY ( ) ;
{
{
@ -1086,27 +1303,39 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
res = walrcv_exec ( wrconn , cmd . data , 0 , NULL ) ;
res = walrcv_exec ( wrconn , cmd . data , 0 , NULL ) ;
if ( res - > status ! = WALRCV_OK_COMMAND )
if ( res - > status = = WALRCV_OK_COMMAND )
ereport ( ERROR ,
{
/* NOTICE. Success. */
ereport ( NOTICE ,
( errmsg ( " dropped replication slot \" %s \" on publisher " ,
slotname ) ) ) ;
}
else if ( res - > status = = WALRCV_ERROR & &
missing_ok & &
res - > sqlstate = = ERRCODE_UNDEFINED_OBJECT )
{
/* LOG. Error, but missing_ok = true. */
ereport ( LOG ,
( errmsg ( " could not drop the replication slot \" %s \" on publisher " ,
( errmsg ( " could not drop the replication slot \" %s \" on publisher " ,
slotname ) ,
slotname ) ,
errdetail ( " The error was: %s " , res - > err ) ) ) ;
errdetail ( " The error was: %s " , res - > err ) ) ) ;
}
else
else
ereport ( NOTICE ,
{
( errmsg ( " dropped replication slot \" %s \" on publisher " ,
/* ERROR. */
slotname ) ) ) ;
ereport ( ERROR ,
( errmsg ( " could not drop the replication slot \" %s \" on publisher " ,
slotname ) ,
errdetail ( " The error was: %s " , res - > err ) ) ) ;
}
walrcv_clear_result ( res ) ;
walrcv_clear_result ( res ) ;
}
}
PG_FINALLY ( ) ;
PG_FINALLY ( ) ;
{
{
walrcv_disconnect ( wrconn ) ;
pfree ( cmd . data ) ;
}
}
PG_END_TRY ( ) ;
PG_END_TRY ( ) ;
pfree ( cmd . data ) ;
table_close ( rel , NoLock ) ;
}
}
/*
/*
@ -1275,3 +1504,45 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
return tablelist ;
return tablelist ;
}
}
/*
* This is to report the connection failure while dropping replication slots .
* Here , we report the WARNING for all tablesync slots so that user can drop
* them manually , if required .
*/
static void
ReportSlotConnectionError ( List * rstates , Oid subid , char * slotname , char * err )
{
ListCell * lc ;
foreach ( lc , rstates )
{
SubscriptionRelState * rstate = ( SubscriptionRelState * ) lfirst ( lc ) ;
Oid relid = rstate - > relid ;
/* Only cleanup resources of tablesync workers */
if ( ! OidIsValid ( relid ) )
continue ;
/*
* Caller needs to ensure that relstate doesn ' t change underneath us .
* See DropSubscription where we get the relstates .
*/
if ( rstate - > state ! = SUBREL_STATE_SYNCDONE )
{
char syncslotname [ NAMEDATALEN ] = { 0 } ;
ReplicationSlotNameForTablesync ( subid , relid , syncslotname ) ;
elog ( WARNING , " could not drop tablesync replication slot \" %s \" " ,
syncslotname ) ;
}
}
ereport ( ERROR ,
( errmsg ( " could not connect to publisher when attempting to "
" drop the replication slot \" %s \" " , slotname ) ,
errdetail ( " The error was: %s " , err ) ,
/* translator: %s is an SQL ALTER command */
errhint ( " Use %s to disassociate the subscription from the slot. " ,
" ALTER SUBSCRIPTION ... SET (slot_name = NONE) " ) ) ) ;
}