@ -375,6 +375,103 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
}
}
/*
* Add publication names from the list to a string .
*/
static void
get_publications_str ( List * publications , StringInfo dest , bool quote_literal )
{
ListCell * lc ;
bool first = true ;
Assert ( list_length ( publications ) > 0 ) ;
foreach ( lc , publications )
{
char * pubname = strVal ( lfirst ( lc ) ) ;
if ( first )
first = false ;
else
appendStringInfoString ( dest , " , " ) ;
if ( quote_literal )
appendStringInfoString ( dest , quote_literal_cstr ( pubname ) ) ;
else
{
appendStringInfoChar ( dest , ' " ' ) ;
appendStringInfoString ( dest , pubname ) ;
appendStringInfoChar ( dest , ' " ' ) ;
}
}
}
/*
* Check the specified publication ( s ) is ( are ) present in the publisher .
*/
static void
check_publications ( WalReceiverConn * wrconn , List * publications )
{
WalRcvExecResult * res ;
StringInfo cmd ;
TupleTableSlot * slot ;
List * publicationsCopy = NIL ;
Oid tableRow [ 1 ] = { TEXTOID } ;
cmd = makeStringInfo ( ) ;
appendStringInfoString ( cmd , " SELECT t.pubname FROM \n "
" pg_catalog.pg_publication t WHERE \n "
" t.pubname IN ( " ) ;
get_publications_str ( publications , cmd , true ) ;
appendStringInfoChar ( cmd , ' ) ' ) ;
res = walrcv_exec ( wrconn , cmd - > data , 1 , tableRow ) ;
pfree ( cmd - > data ) ;
pfree ( cmd ) ;
if ( res - > status ! = WALRCV_OK_TUPLES )
ereport ( ERROR ,
errmsg_plural ( " could not receive publication from the publisher: %s " ,
" could not receive list of publications from the publisher: %s " ,
list_length ( publications ) ,
res - > err ) ) ;
publicationsCopy = list_copy ( publications ) ;
/* Process publication(s). */
slot = MakeSingleTupleTableSlot ( res - > tupledesc , & TTSOpsMinimalTuple ) ;
while ( tuplestore_gettupleslot ( res - > tuplestore , true , false , slot ) )
{
char * pubname ;
bool isnull ;
pubname = TextDatumGetCString ( slot_getattr ( slot , 1 , & isnull ) ) ;
Assert ( ! isnull ) ;
/* Delete the publication present in publisher from the list. */
publicationsCopy = list_delete ( publicationsCopy , makeString ( pubname ) ) ;
ExecClearTuple ( slot ) ;
}
ExecDropSingleTupleTableSlot ( slot ) ;
walrcv_clear_result ( res ) ;
if ( list_length ( publicationsCopy ) )
{
/* Prepare the list of non-existent publication(s) for error message. */
StringInfo pubnames = makeStringInfo ( ) ;
get_publications_str ( publicationsCopy , pubnames , false ) ;
ereport ( WARNING ,
errcode ( ERRCODE_UNDEFINED_OBJECT ) ,
errmsg_plural ( " publication %s does not exist in the publisher " ,
" publications %s do not exist in the publisher " ,
list_length ( publicationsCopy ) ,
pubnames - > data ) ) ;
}
}
/*
* Auxiliary function to build a text array out of a list of String nodes .
*/
@ -555,6 +652,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
PG_TRY ( ) ;
{
check_publications ( wrconn , publications ) ;
/*
* Set sync state based on if we were asked to do data copy or
* not .
@ -650,7 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
}
static void
AlterSubscription_refresh ( Subscription * sub , bool copy_data )
AlterSubscription_refresh ( Subscription * sub , bool copy_data ,
List * validate_publications )
{
char * err ;
List * pubrel_names ;
@ -681,6 +781,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
PG_TRY ( ) ;
{
if ( validate_publications )
check_publications ( wrconn , validate_publications ) ;
/* Get the list of relations from publisher. */
pubrel_names = fetch_table_list ( wrconn , sub - > publications ) ;
pubrel_names = list_concat ( pubrel_names ,
@ -1048,7 +1151,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
/* Make sure refresh sees the new list of publications. */
sub - > publications = stmt - > publication ;
AlterSubscription_refresh ( sub , opts . copy_data ) ;
AlterSubscription_refresh ( sub , opts . copy_data ,
stmt - > publication ) ;
}
break ;
@ -1074,6 +1178,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
/* Refresh if user asked us to. */
if ( opts . refresh )
{
/* We only need to validate user specified publications. */
List * validate_publications = ( isadd ) ? stmt - > publication : NULL ;
if ( ! sub - > enabled )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
@ -1096,7 +1203,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
/* Refresh the new list of publications. */
sub - > publications = publist ;
AlterSubscription_refresh ( sub , opts . copy_data ) ;
AlterSubscription_refresh ( sub , opts . copy_data ,
validate_publications ) ;
}
break ;
@ -1138,7 +1246,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
PreventInTransactionBlock ( isTopLevel , " ALTER SUBSCRIPTION ... REFRESH " ) ;
AlterSubscription_refresh ( sub , opts . copy_data ) ;
AlterSubscription_refresh ( sub , opts . copy_data , NULL ) ;
break ;
}
@ -1659,28 +1767,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
StringInfoData cmd ;
TupleTableSlot * slot ;
Oid tableRow [ 2 ] = { TEXTOID , TEXTOID } ;
ListCell * lc ;
bool first ;
List * tablelist = NIL ;
Assert ( list_length ( publications ) > 0 ) ;
initStringInfo ( & cmd ) ;
appendStringInfoString ( & cmd , " SELECT DISTINCT t.schemaname, t.tablename \n "
" FROM pg_catalog.pg_publication_tables t \n "
" WHERE t.pubname IN ( " ) ;
first = true ;
foreach ( lc , publications )
{
char * pubname = strVal ( lfirst ( lc ) ) ;
if ( first )
first = false ;
else
appendStringInfoString ( & cmd , " , " ) ;
appendStringInfoString ( & cmd , quote_literal_cstr ( pubname ) ) ;
}
get_publications_str ( publications , & cmd , true ) ;
appendStringInfoChar ( & cmd , ' ) ' ) ;
res = walrcv_exec ( wrconn , cmd . data , 2 , tableRow ) ;