@ -92,6 +92,10 @@ typedef struct SubOpts
} SubOpts ;
static List * fetch_table_list ( WalReceiverConn * wrconn , List * publications ) ;
static void check_publications_origin ( WalReceiverConn * wrconn ,
List * publications , bool copydata ,
char * origin , Oid * subrel_local_oids ,
int subrel_count , char * subname ) ;
static void check_duplicates_in_publist ( List * publist , Datum * datums ) ;
static List * merge_publications ( List * oldpublist , List * newpublist , bool addpub , const char * subname ) ;
static void ReportSlotConnectionError ( List * rstates , Oid subid , char * slotname , char * err ) ;
@ -680,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
PG_TRY ( ) ;
{
check_publications ( wrconn , publications ) ;
check_publications_origin ( wrconn , publications , opts . copy_data ,
opts . origin , NULL , 0 , stmt - > subname ) ;
/*
* Set sync state based on if we were asked to do data copy or
@ -786,6 +792,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
ListCell * lc ;
int off ;
int remove_rel_len ;
int subrel_count ;
Relation rel = NULL ;
typedef struct SubRemoveRels
{
@ -815,13 +822,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
/* Get local table list. */
subrel_states = GetSubscriptionRelations ( sub - > oid , false ) ;
subrel_count = list_length ( subrel_states ) ;
/*
* Build qsorted array of local table oids for faster lookup . This can
* potentially contain all tables in the database so speed of lookup
* is important .
*/
subrel_local_oids = palloc ( list_length ( subrel_states ) * sizeof ( Oid ) ) ;
subrel_local_oids = palloc ( subrel_count * sizeof ( Oid ) ) ;
off = 0 ;
foreach ( lc , subrel_states )
{
@ -829,14 +837,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
subrel_local_oids [ off + + ] = relstate - > relid ;
}
qsort ( subrel_local_oids , list_length ( subrel_states ) ,
qsort ( subrel_local_oids , subrel_count ,
sizeof ( Oid ) , oid_cmp ) ;
check_publications_origin ( wrconn , sub - > publications , copy_data ,
sub - > origin , subrel_local_oids ,
subrel_count , sub - > name ) ;
/*
* Rels that we want to remove from subscription and drop any slots
* and origins corresponding to them .
*/
sub_remove_rels = palloc ( list_length ( subrel_states ) * sizeof ( SubRemoveRels ) ) ;
sub_remove_rels = palloc ( subrel_count * sizeof ( SubRemoveRels ) ) ;
/*
* Walk over the remote tables and try to match them to locally known
@ -862,7 +874,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
pubrel_local_oids [ off + + ] = relid ;
if ( ! bsearch ( & relid , subrel_local_oids ,
list_length ( subrel_states ) , sizeof ( Oid ) , oid_cmp ) )
subrel_count , sizeof ( Oid ) , oid_cmp ) )
{
AddSubscriptionRelState ( sub - > oid , relid ,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY ,
@ -881,7 +893,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
sizeof ( Oid ) , oid_cmp ) ;
remove_rel_len = 0 ;
for ( off = 0 ; off < list_length ( subrel_states ) ; off + + )
for ( off = 0 ; off < subrel_count ; off + + )
{
Oid relid = subrel_local_oids [ off ] ;
@ -1784,6 +1796,117 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
table_close ( rel , RowExclusiveLock ) ;
}
/*
* Check and log a warning if the publisher has subscribed to the same table
* from some other publisher . This check is required only if " copy_data = true "
* and " origin = none " for CREATE SUBSCRIPTION and
* ALTER SUBSCRIPTION . . . REFRESH statements to notify the user that data
* having origin might have been copied .
*
* This check need not be performed on the tables that are already added
* because incremental sync for those tables will happen through WAL and the
* origin of the data can be identified from the WAL records .
*
* subrel_local_oids contains the list of relation oids that are already
* present on the subscriber .
*/
static void
check_publications_origin ( WalReceiverConn * wrconn , List * publications ,
bool copydata , char * origin , Oid * subrel_local_oids ,
int subrel_count , char * subname )
{
WalRcvExecResult * res ;
StringInfoData cmd ;
TupleTableSlot * slot ;
Oid tableRow [ 1 ] = { TEXTOID } ;
List * publist = NIL ;
int i ;
if ( ! copydata | | ! origin | |
( pg_strcasecmp ( origin , LOGICALREP_ORIGIN_NONE ) ! = 0 ) )
return ;
initStringInfo ( & cmd ) ;
appendStringInfoString ( & cmd ,
" SELECT DISTINCT P.pubname AS pubname \n "
" FROM pg_publication P, \n "
" LATERAL pg_get_publication_tables(P.pubname) GPT \n "
" JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid), \n "
" pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) \n "
" WHERE C.oid = GPT.relid AND P.pubname IN ( " ) ;
get_publications_str ( publications , & cmd , true ) ;
appendStringInfoString ( & cmd , " ) \n " ) ;
/*
* In case of ALTER SUBSCRIPTION . . . REFRESH , subrel_local_oids contains
* the list of relation oids that are already present on the subscriber .
* This check should be skipped for these tables .
*/
for ( i = 0 ; i < subrel_count ; i + + )
{
Oid relid = subrel_local_oids [ i ] ;
char * schemaname = get_namespace_name ( get_rel_namespace ( relid ) ) ;
char * tablename = get_rel_name ( relid ) ;
appendStringInfo ( & cmd , " AND NOT (N.nspname = '%s' AND C.relname = '%s') \n " ,
schemaname , tablename ) ;
}
res = walrcv_exec ( wrconn , cmd . data , 1 , tableRow ) ;
pfree ( cmd . data ) ;
if ( res - > status ! = WALRCV_OK_TUPLES )
ereport ( ERROR ,
( errcode ( ERRCODE_CONNECTION_FAILURE ) ,
errmsg ( " could not receive list of replicated tables from the publisher: %s " ,
res - > err ) ) ) ;
/* Process tables. */
slot = MakeSingleTupleTableSlot ( res - > tupledesc , & TTSOpsMinimalTuple ) ;
while ( tuplestore_gettupleslot ( res - > tuplestore , true , false , slot ) )
{
char * pubname ;
bool isnull ;
pubname = TextDatumGetCString ( slot_getattr ( slot , 1 , & isnull ) ) ;
Assert ( ! isnull ) ;
ExecClearTuple ( slot ) ;
publist = list_append_unique ( publist , makeString ( pubname ) ) ;
}
/*
* Log a warning if the publisher has subscribed to the same table from
* some other publisher . We cannot know the origin of data during the
* initial sync . Data origins can be found only from the WAL by looking at
* the origin id .
*
* XXX : For simplicity , we don ' t check whether the table has any data or
* not . If the table doesn ' t have any data then we don ' t need to
* distinguish between data having origin and data not having origin so we
* can avoid logging a warning in that case .
*/
if ( publist )
{
StringInfo pubnames = makeStringInfo ( ) ;
/* Prepare the list of publication(s) for warning message. */
get_publications_str ( publist , pubnames , false ) ;
ereport ( WARNING ,
errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " subscription \" %s \" requested copy_data with origin = NONE but might copy data that had a different origin " ,
subname ) ,
errdetail_plural ( " Subscribed publication %s is subscribing to other publications. " ,
" Subscribed publications %s are subscribing to other publications. " ,
list_length ( publist ) , pubnames - > data ) ,
errhint ( " Verify that initial data copied from the publisher tables did not come from other origins. " ) ) ;
}
ExecDropSingleTupleTableSlot ( slot ) ;
walrcv_clear_result ( res ) ;
}
/*
* Get the list of tables which belong to specified publications on the
* publisher connection .