@ -106,12 +106,29 @@ typedef struct SubOpts
XLogRecPtr lsn ;
} SubOpts ;
static List * fetch_table_list ( WalReceiverConn * wrconn , List * publications ) ;
static void check_publications_origin ( WalReceiverConn * wrconn ,
List * publications , bool copydata ,
bool retain_dead_tuples , char * origin ,
Oid * subrel_local_oids , int subrel_count ,
char * subname ) ;
/*
* PublicationRelKind represents a relation included in a publication .
* It stores the schema - qualified relation name ( rv ) and its kind ( relkind ) .
*/
typedef struct PublicationRelKind
{
RangeVar * rv ;
char relkind ;
} PublicationRelKind ;
static List * fetch_relation_list ( WalReceiverConn * wrconn , List * publications ) ;
static void check_publications_origin_tables ( WalReceiverConn * wrconn ,
List * publications , bool copydata ,
bool retain_dead_tuples ,
char * origin ,
Oid * subrel_local_oids ,
int subrel_count , char * subname ) ;
static void check_publications_origin_sequences ( WalReceiverConn * wrconn ,
List * publications ,
bool copydata , char * origin ,
Oid * subrel_local_oids ,
int subrel_count ,
char * subname ) ;
static void check_pub_dead_tuple_retention ( WalReceiverConn * wrconn ) ;
static void check_duplicates_in_publist ( List * publist , Datum * datums ) ;
static List * merge_publications ( List * oldpublist , List * newpublist , bool addpub , const char * subname ) ;
@ -736,20 +753,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
recordDependencyOnOwner ( SubscriptionRelationId , subid , owner ) ;
/*
* A replication origin is currently created for all subscriptions ,
* including those that only contain sequences or are otherwise empty .
*
* XXX : While this is technically unnecessary , optimizing it would require
* additional logic to skip origin creation during DDL operations and
* apply workers initialization , and to handle origin creation dynamically
* when tables are added to the subscription . It is not clear whether
* preventing creation of origins is worth additional complexity .
*/
ReplicationOriginNameForLogicalRep ( subid , InvalidOid , originname , sizeof ( originname ) ) ;
replorigin_create ( originname ) ;
/*
* Connect to remote side to execute requested commands and fetch table
* info .
* and sequence info .
*/
if ( opts . connect )
{
char * err ;
WalReceiverConn * wrconn ;
List * tables ;
ListCell * lc ;
char table_state ;
bool must_use_password ;
/* Try to connect to the publisher. */
@ -764,10 +788,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
PG_TRY ( ) ;
{
bool has_tables = false ;
List * pubrels ;
char relation_state ;
check_publications ( wrconn , publications ) ;
check_publications_origin ( wrconn , publications , opts . copy_data ,
opts . retaindeadtuples , opts . origin ,
NULL , 0 , stmt - > subname ) ;
check_publications_origin_tables ( wrconn , publications ,
opts . copy_data ,
opts . retaindeadtuples , opts . origin ,
NULL , 0 , stmt - > subname ) ;
check_publications_origin_sequences ( wrconn , publications ,
opts . copy_data , opts . origin ,
NULL , 0 , stmt - > subname ) ;
if ( opts . retaindeadtuples )
check_pub_dead_tuple_retention ( wrconn ) ;
@ -776,25 +808,28 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* Set sync state based on if we were asked to do data copy or
* not .
*/
table _state = opts . copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY ;
relation _state = opts . copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY ;
/*
* Get the table list from publisher and build local table statu s
* info .
* Build local relation status info . Relations are for both table s
* and sequences from the publisher .
*/
tables = fetch_table_list ( wrconn , publications ) ;
foreach ( lc , tables )
pubrels = fetch_relation_list ( wrconn , publications ) ;
foreach_ptr ( PublicationRelKind , pubrelinfo , pubrels )
{
RangeVar * rv = ( RangeVar * ) lfirst ( lc ) ;
Oid relid ;
char relkind ;
RangeVar * rv = pubrelinfo - > rv ;
relid = RangeVarGetRelid ( rv , AccessShareLock , false ) ;
relkind = get_rel_relkind ( relid ) ;
/* Check for supported relkind. */
CheckSubscriptionRelkind ( get_rel_relkind ( relid ) ,
CheckSubscriptionRelkind ( relkind , pubrelinfo - > relkind ,
rv - > schemaname , rv - > relname ) ;
AddSubscriptionRelState ( subid , relid , table _state,
has_tables | = ( relkind ! = RELKIND_SEQUENCE ) ;
AddSubscriptionRelState ( subid , relid , relation _state,
InvalidXLogRecPtr , true ) ;
}
@ -802,6 +837,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* If requested , create permanent slot for the subscription . We
* won ' t use the initial snapshot for anything , so no need to
* export it .
*
* XXX : Similar to origins , it is not clear whether preventing the
* slot creation for empty and sequence - only subscriptions is
* worth additional complexity .
*/
if ( opts . create_slot )
{
@ -825,7 +864,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
* PENDING , to allow ALTER SUBSCRIPTION . . . REFRESH
* PUBLICATION to work .
*/
if ( opts . twophase & & ! opts . copy_data & & tables ! = NIL )
if ( opts . twophase & & ! opts . copy_data & & has_ tables)
twophase_enabled = true ;
walrcv_create_slot ( wrconn , opts . slot_name , false , twophase_enabled ,
@ -879,21 +918,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
List * validate_publications )
{
char * err ;
List * pubrel_names ;
List * pubrels = NIL ;
Oid * pubrel_local_oids ;
List * subrel_states ;
List * sub_remove_rels = NIL ;
Oid * subrel_local_oids ;
Oid * pubrel_local_oids ;
Oid * subseq_local_oids ;
int subrel_count ;
ListCell * lc ;
int off ;
int remove_rel_len ;
int subrel_count ;
int tbl_count = 0 ;
int seq_count = 0 ;
Relation rel = NULL ;
typedef struct SubRemoveRels
{
Oid relid ;
char state ;
} SubRemoveRels ;
SubRemoveRels * sub_remove_rels ;
WalReceiverConn * wrconn ;
bool must_use_password ;
@ -915,71 +957,84 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
if ( validate_publications )
check_publications ( wrconn , validate_publications ) ;
/* Get the table list from publisher. */
pubrel_name s = fetch_table _list ( wrconn , sub - > publications ) ;
/* Get the relation list from publisher. */
pubrels = fetch_relation _list ( wrconn , sub - > publications ) ;
/* Get local table list. */
subrel_states = GetSubscriptionRelations ( sub - > oid , false ) ;
/* Get local relation list. */
subrel_states = GetSubscriptionRelations ( sub - > oid , true , true , false ) ;
subrel_count = list_length ( subrel_states ) ;
/*
* Build qsorted array of local table oids for faster lookup . This can
* potentially contain all tables in the database so speed of lookup
* is important .
* Build qsorted arrays of local table oids and sequence oids for
* faster lookup . This can potentially contain all tables and
* sequences in the database so speed of lookup is important .
*
* We do not yet know the exact count of tables and sequences , so we
* allocate separate arrays for table OIDs and sequence OIDs based on
* the total number of relations ( subrel_count ) .
*/
subrel_local_oids = palloc ( subrel_count * sizeof ( Oid ) ) ;
off = 0 ;
subseq_local_oids = palloc ( subrel_count * sizeof ( Oid ) ) ;
foreach ( lc , subrel_states )
{
SubscriptionRelState * relstate = ( SubscriptionRelState * ) lfirst ( lc ) ;
subrel_local_oids [ off + + ] = relstate - > relid ;
if ( get_rel_relkind ( relstate - > relid ) = = RELKIND_SEQUENCE )
subseq_local_oids [ seq_count + + ] = relstate - > relid ;
else
subrel_local_oids [ tbl_count + + ] = relstate - > relid ;
}
qsort ( subrel_local_oids , subrel_count ,
sizeof ( Oid ) , oid_cmp ) ;
check_publications_origin ( wrconn , sub - > publications , copy_data ,
sub - > retaindeadtuples , sub - > origin ,
subrel_local_oids , subrel_count , sub - > name ) ;
qsort ( subrel_local_oids , tbl_count , sizeof ( Oid ) , oid_cmp ) ;
check_publications_origin_tables ( wrconn , sub - > publications , copy_data ,
sub - > retaindeadtuples , sub - > origin ,
subrel_local_oids , tbl_count ,
sub - > name ) ;
/*
* Rels that we want to remove from subscription and drop any slots
* and origins corresponding to them .
*/
sub_remove_rels = palloc ( subrel_count * sizeof ( SubRemoveRels ) ) ;
qsort ( subseq_local_oids , seq_count , sizeof ( Oid ) , oid_cmp ) ;
check_publications_origin_sequences ( wrconn , sub - > publications ,
copy_data , sub - > origin ,
subseq_local_oids , seq_count ,
sub - > name ) ;
/*
* Walk over the remote table s and try to match them to locally known
* table s. If the table is not known locally create a new state for
* it .
* Walk over the remote relation s and try to match them to locally
* known relation s. If the relation is not known locally create a new
* state for it .
*
* Also builds array of local oids of remote tables for the next step .
* Also builds array of local oids of remote relations for the next
* step .
*/
off = 0 ;
pubrel_local_oids = palloc ( list_length ( pubrel_name s ) * sizeof ( Oid ) ) ;
pubrel_local_oids = palloc ( list_length ( pubrels ) * sizeof ( Oid ) ) ;
foreach ( lc , pubrel_name s )
foreach_ptr ( Pub li cationRelKind , pubrelinfo , pubrel s )
{
RangeVar * rv = ( RangeVar * ) lfirst ( lc ) ;
RangeVar * rv = pubrelinfo - > rv ;
Oid relid ;
char relkind ;
relid = RangeVarGetRelid ( rv , AccessShareLock , false ) ;
relkind = get_rel_relkind ( relid ) ;
/* Check for supported relkind. */
CheckSubscriptionRelkind ( get_rel_relkind ( relid ) ,
CheckSubscriptionRelkind ( relkind , pubrelinfo - > relkind ,
rv - > schemaname , rv - > relname ) ;
pubrel_local_oids [ off + + ] = relid ;
if ( ! bsearch ( & relid , subrel_local_oids ,
subrel_count , sizeof ( Oid ) , oid_cmp ) )
tbl_count , sizeof ( Oid ) , oid_cmp ) & &
! bsearch ( & relid , subseq_local_oids ,
seq_count , sizeof ( Oid ) , oid_cmp ) )
{
AddSubscriptionRelState ( sub - > oid , relid ,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY ,
InvalidXLogRecPtr , true ) ;
ereport ( DEBUG1 ,
( errmsg_internal ( " table \" %s.%s \" added to subscription \" %s \" " ,
rv - > schemaname , rv - > relname , sub - > name ) ) ) ;
errmsg_internal ( " %s \" %s.%s \" added to subscription \" %s \" " ,
relkind = = RELKIND_SEQUENCE ? " sequence " : " table " ,
rv - > schemaname , rv - > relname , sub - > name ) ) ;
}
}
@ -987,19 +1042,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* Next remove state for tables we should not care about anymore using
* the data we collected above
*/
qsort ( pubrel_local_oids , list_length ( pubrel_names ) ,
sizeof ( Oid ) , oid_cmp ) ;
qsort ( pubrel_local_oids , list_length ( pubrels ) , sizeof ( Oid ) , oid_cmp ) ;
remove_rel_len = 0 ;
for ( off = 0 ; off < subrel_count ; off + + )
for ( off = 0 ; off < tbl_count ; off + + )
{
Oid relid = subrel_local_oids [ off ] ;
if ( ! bsearch ( & relid , pubrel_local_oids ,
list_length ( pubrel_name s ) , sizeof ( Oid ) , oid_cmp ) )
list_length ( pubrels ) , sizeof ( Oid ) , oid_cmp ) )
{
char state ;
XLogRecPtr statelsn ;
SubRemoveRels * remove_rel = palloc ( sizeof ( SubRemoveRels ) ) ;
/*
* Lock pg_subscription_rel with AccessExclusiveLock to
@ -1021,11 +1075,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
/* Last known rel state. */
state = GetSubscriptionRelState ( sub - > oid , relid , & statelsn ) ;
sub_remove_rels [ remove_rel_len ] . relid = relid ;
sub_remove_rels [ remove_rel_len + + ] . state = state ;
RemoveSubscriptionRel ( sub - > oid , relid ) ;
remove_rel - > relid = relid ;
remove_rel - > state = state ;
sub_remove_rels = lappend ( sub_remove_rels , remove_rel ) ;
logicalrep_worker_stop ( sub - > oid , relid ) ;
/*
@ -1064,10 +1120,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* to be at the end because otherwise if there is an error while doing
* the database operations we won ' t be able to rollback dropped slots .
*/
for ( off = 0 ; off < remove_rel_len ; off + + )
foreach_ptr ( SubRemoveRels , rel , sub_remove_rels )
{
if ( sub_remove_rels [ off ] . state ! = SUBREL_STATE_READY & &
sub_remove_rels [ off ] . state ! = SUBREL_STATE_SYNCDONE )
if ( rel - > state ! = SUBREL_STATE_READY & &
rel - > state ! = SUBREL_STATE_SYNCDONE )
{
char syncslotname [ NAMEDATALEN ] = { 0 } ;
@ -1081,11 +1137,39 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* dropped slots and fail . For these reasons , we allow
* missing_ok = true for the drop .
*/
ReplicationSlotNameForTablesync ( sub - > oid , sub_remove_rels [ off ] . relid ,
ReplicationSlotNameForTablesync ( sub - > oid , rel - > relid ,
syncslotname , sizeof ( syncslotname ) ) ;
ReplicationSlotDropAtPubNode ( wrconn , syncslotname , true ) ;
}
}
/*
* Next remove state for sequences we should not care about anymore
* using the data we collected above
*/
for ( off = 0 ; off < seq_count ; off + + )
{
Oid relid = subseq_local_oids [ off ] ;
if ( ! bsearch ( & relid , pubrel_local_oids ,
list_length ( pubrels ) , sizeof ( Oid ) , oid_cmp ) )
{
/*
* This locking ensures that the state of rels won ' t change
* till we are done with this refresh operation .
*/
if ( ! rel )
rel = table_open ( SubscriptionRelRelationId , AccessExclusiveLock ) ;
RemoveSubscriptionRel ( sub - > oid , relid ) ;
ereport ( DEBUG1 ,
errmsg_internal ( " sequence \" %s.%s \" removed from subscription \" %s \" " ,
get_namespace_name ( get_rel_namespace ( relid ) ) ,
get_rel_name ( relid ) ,
sub - > name ) ) ;
}
}
}
PG_FINALLY ( ) ;
{
@ -1097,6 +1181,58 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
table_close ( rel , NoLock ) ;
}
/*
* Marks all sequences with INIT state .
*/
static void
AlterSubscription_refresh_seq ( Subscription * sub )
{
char * err = NULL ;
WalReceiverConn * wrconn ;
bool must_use_password ;
/* Load the library providing us libpq calls. */
load_file ( " libpqwalreceiver " , false ) ;
/* Try to connect to the publisher. */
must_use_password = sub - > passwordrequired & & ! sub - > ownersuperuser ;
wrconn = walrcv_connect ( sub - > conninfo , true , true , must_use_password ,
sub - > name , & err ) ;
if ( ! wrconn )
ereport ( ERROR ,
errcode ( ERRCODE_CONNECTION_FAILURE ) ,
errmsg ( " subscription \" %s \" could not connect to the publisher: %s " ,
sub - > name , err ) ) ;
PG_TRY ( ) ;
{
List * subrel_states ;
check_publications_origin_sequences ( wrconn , sub - > publications , true ,
sub - > origin , NULL , 0 , sub - > name ) ;
/* Get local sequence list. */
subrel_states = GetSubscriptionRelations ( sub - > oid , false , true , false ) ;
foreach_ptr ( SubscriptionRelState , subrel , subrel_states )
{
Oid relid = subrel - > relid ;
UpdateSubscriptionRelState ( sub - > oid , relid , SUBREL_STATE_INIT ,
InvalidXLogRecPtr , false ) ;
ereport ( DEBUG1 ,
errmsg_internal ( " sequence \" %s.%s \" of subscription \" %s \" set to INIT state " ,
get_namespace_name ( get_rel_namespace ( relid ) ) ,
get_rel_name ( relid ) ,
sub - > name ) ) ;
}
}
PG_FINALLY ( ) ;
{
walrcv_disconnect ( wrconn ) ;
}
PG_END_TRY ( ) ;
}
/*
* Common checks for altering failover , two_phase , and retain_dead_tuples
* options .
@ -1733,6 +1869,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
break ;
}
case ALTER_SUBSCRIPTION_REFRESH_SEQUENCES :
{
if ( ! sub - > enabled )
ereport ( ERROR ,
errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " %s is not allowed for disabled subscriptions " ,
" ALTER SUBSCRIPTION ... REFRESH SEQUENCES " ) ) ;
AlterSubscription_refresh_seq ( sub ) ;
break ;
}
case ALTER_SUBSCRIPTION_SKIP :
{
parse_subscription_options ( pstate , stmt - > options , SUBOPT_LSN , & opts ) ;
@ -1824,9 +1973,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if ( retain_dead_tuples )
check_pub_dead_tuple_retention ( wrconn ) ;
check_publications_origin ( wrconn , sub - > publications , false ,
retain_dead_tuples , origin , NULL , 0 ,
sub - > name ) ;
check_publications_origin_tables ( wrconn , sub - > publications , false ,
retain_dead_tuples , origin , NULL , 0 ,
sub - > name ) ;
if ( update_failover | | update_two_phase )
walrcv_alter_slot ( wrconn , sub - > slotname ,
@ -2008,7 +2157,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* the apply and tablesync workers and they can ' t restart because of
* exclusive lock on the subscription .
*/
rstates = GetSubscriptionRelations ( subid , true ) ;
rstates = GetSubscriptionRelations ( subid , true , false , true ) ;
foreach ( lc , rstates )
{
SubscriptionRelState * rstate = ( SubscriptionRelState * ) lfirst ( lc ) ;
@ -2341,10 +2490,10 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
* - See comments atop worker . c for more details .
*/
static void
check_publications_origin ( WalReceiverConn * wrconn , List * publications ,
bool copydata , bool retain_dead_tuples ,
char * origin , Oid * subrel_local_oids ,
int subrel_count , char * subname )
check_publications_origin_tables ( WalReceiverConn * wrconn , List * publications ,
bool copydata , bool retain_dead_tuples ,
char * origin , Oid * subrel_local_oids ,
int subrel_count , char * subname )
{
WalRcvExecResult * res ;
StringInfoData cmd ;
@ -2421,7 +2570,7 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
errmsg ( " could not receive list of replicated tables from the publisher: %s " ,
res - > err ) ) ) ;
/* Process table s. */
/* Process publication s. */
slot = MakeSingleTupleTableSlot ( res - > tupledesc , & TTSOpsMinimalTuple ) ;
while ( tuplestore_gettupleslot ( res - > tuplestore , true , false , slot ) )
{
@ -2482,6 +2631,114 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
walrcv_clear_result ( res ) ;
}
/*
* This function is similar to check_publications_origin_tables and serves
* same purpose for sequences .
*/
static void
check_publications_origin_sequences ( WalReceiverConn * wrconn , List * publications ,
bool copydata , char * origin ,
Oid * subrel_local_oids , int subrel_count ,
char * subname )
{
WalRcvExecResult * res ;
StringInfoData cmd ;
TupleTableSlot * slot ;
Oid tableRow [ 1 ] = { TEXTOID } ;
List * publist = NIL ;
/*
* Enable sequence synchronization checks only when origin is ' none ' , to
* ensure that sequence data from other origins is not inadvertently
* copied .
*/
if ( ! copydata | | pg_strcasecmp ( origin , LOGICALREP_ORIGIN_NONE ) ! = 0 )
return ;
initStringInfo ( & cmd ) ;
appendStringInfoString ( & cmd ,
" SELECT DISTINCT P.pubname AS pubname \n "
" FROM pg_publication P, \n "
" LATERAL pg_get_publication_sequences(P.pubname) GPS \n "
" JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid), \n "
" pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace) \n "
" WHERE C.oid = GPS.relid AND P.pubname IN ( " ) ;
GetPublicationsStr ( publications , & cmd , true ) ;
appendStringInfoString ( & cmd , " ) \n " ) ;
/*
* In case of ALTER SUBSCRIPTION . . . REFRESH PUBLICATION ,
* subrel_local_oids contains the list of relations that are already
* present on the subscriber . This check should be skipped as these will
* not be re - synced .
*/
for ( int i = 0 ; i < subrel_count ; i + + )
{
Oid relid = subrel_local_oids [ i ] ;
char * schemaname = get_namespace_name ( get_rel_namespace ( relid ) ) ;
char * seqname = get_rel_name ( relid ) ;
appendStringInfo ( & cmd ,
" AND NOT (N.nspname = '%s' AND C.relname = '%s') \n " ,
schemaname , seqname ) ;
}
res = walrcv_exec ( wrconn , cmd . data , 1 , tableRow ) ;
pfree ( cmd . data ) ;
if ( res - > status ! = WALRCV_OK_TUPLES )
ereport ( ERROR ,
( errcode ( ERRCODE_CONNECTION_FAILURE ) ,
errmsg ( " could not receive list of replicated sequences from the publisher: %s " ,
res - > err ) ) ) ;
/* Process publications. */
slot = MakeSingleTupleTableSlot ( res - > tupledesc , & TTSOpsMinimalTuple ) ;
while ( tuplestore_gettupleslot ( res - > tuplestore , true , false , slot ) )
{
char * pubname ;
bool isnull ;
pubname = TextDatumGetCString ( slot_getattr ( slot , 1 , & isnull ) ) ;
Assert ( ! isnull ) ;
ExecClearTuple ( slot ) ;
publist = list_append_unique ( publist , makeString ( pubname ) ) ;
}
/*
* Log a warning if the publisher has subscribed to the same sequence from
* some other publisher . We cannot know the origin of sequences data
* during the initial sync .
*/
if ( publist )
{
StringInfo pubnames = makeStringInfo ( ) ;
StringInfo err_msg = makeStringInfo ( ) ;
StringInfo err_hint = makeStringInfo ( ) ;
/* Prepare the list of publication(s) for warning message. */
GetPublicationsStr ( publist , pubnames , false ) ;
appendStringInfo ( err_msg , _ ( " subscription \" %s \" requested copy_data with origin = NONE but might copy data that had a different origin " ) ,
subname ) ;
appendStringInfoString ( err_hint , _ ( " Verify that initial data copied from the publisher sequences did not come from other origins. " ) ) ;
ereport ( WARNING ,
errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg_internal ( " %s " , err_msg - > data ) ,
errdetail_plural ( " The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions. " ,
" The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions. " ,
list_length ( publist ) , pubnames - > data ) ,
errhint_internal ( " %s " , err_hint - > data ) ) ;
}
ExecDropSingleTupleTableSlot ( slot ) ;
walrcv_clear_result ( res ) ;
}
/*
* Determine whether the retain_dead_tuples can be enabled based on the
* publisher ' s status .
@ -2594,8 +2851,23 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
}
/*
* Get the list of tables which belong to specified publications on the
* publisher connection .
* Return true iff ' rv ' is a member of the list .
*/
static bool
list_member_rangevar ( const List * list , RangeVar * rv )
{
foreach_ptr ( PublicationRelKind , relinfo , list )
{
if ( equal ( relinfo - > rv , rv ) )
return true ;
}
return false ;
}
/*
* Get the list of tables and sequences which belong to specified publications
* on the publisher connection .
*
* Note that we don ' t support the case where the column list is different for
* the same table in different publications to avoid sending unwanted column
@ -2603,15 +2875,16 @@ CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
* list and row filter are specified for different publications .
*/
static List *
fetch_table _list ( WalReceiverConn * wrconn , List * publications )
fetch_relation _list ( WalReceiverConn * wrconn , List * publications )
{
WalRcvExecResult * res ;
StringInfoData cmd ;
TupleTableSlot * slot ;
Oid tableRow [ 3 ] = { TEXTOID , TEXTOID , InvalidOid } ;
List * table list = NIL ;
Oid tableRow [ 4 ] = { TEXTOID , TEXTOID , CHAR OID , InvalidOid } ;
List * relation list = NIL ;
int server_version = walrcv_server_version ( wrconn ) ;
bool check_columnlist = ( server_version > = 150000 ) ;
int column_count = check_columnlist ? 4 : 3 ;
StringInfo pub_names = makeStringInfo ( ) ;
initStringInfo ( & cmd ) ;
@ -2619,10 +2892,10 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
/* Build the pub_names comma-separated string. */
GetPublicationsStr ( publications , pub_names , true ) ;
/* Get the list of tables from the publisher. */
/* Get the list of relations from the publisher */
if ( server_version > = 160000 )
{
tableRow [ 2 ] = INT2VECTOROID ;
tableRow [ 3 ] = INT2VECTOROID ;
/*
* From version 16 , we allowed passing multiple publications to the
@ -2637,19 +2910,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
* to worry if different publications have specified them in a
* different order . See pub_collist_validate .
*/
appendStringInfo ( & cmd , " SELECT DISTINCT n.nspname, c.relname, gpt.attrs \n "
" FROM pg_class c \n "
appendStringInfo ( & cmd , " SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs \n "
" FROM pg_class c \n "
" JOIN pg_namespace n ON n.oid = c.relnamespace \n "
" JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).* \n "
" FROM pg_publication \n "
" WHERE pubname IN ( %s )) AS gpt \n "
" ON gpt.relid = c.oid \n " ,
pub_names - > data ) ;
/* From version 19, inclusion of sequences in the target is supported */
if ( server_version > = 190000 )
appendStringInfo ( & cmd ,
" UNION ALL \n "
" SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2 ( RELKIND_SEQUENCE ) " :: \" char \" AS relkind, NULL::int2vector AS attrs \n "
" FROM pg_catalog.pg_publication_sequences s \n "
" WHERE s.pubname IN ( %s ) " ,
pub_names - > data ) ;
}
else
{
tableRow [ 2 ] = NAMEARRAYOID ;
appendStringInfoString ( & cmd , " SELECT DISTINCT t.schemaname, t.tablename \n " ) ;
tableRow [ 3 ] = NAMEARRAYOID ;
appendStringInfoString ( & cmd , " SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2 ( RELKIND_RELATION ) " :: \" char \" AS relkind \n " ) ;
/* Get column lists for each relation if the publisher supports it */
if ( check_columnlist )
@ -2662,7 +2944,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
destroyStringInfo ( pub_names ) ;
res = walrcv_exec ( wrconn , cmd . data , check_columnlist ? 3 : 2 , tableRow ) ;
res = walrcv_exec ( wrconn , cmd . data , column_count , tableRow ) ;
pfree ( cmd . data ) ;
if ( res - > status ! = WALRCV_OK_TUPLES )
@ -2678,22 +2960,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
char * nspname ;
char * relname ;
bool isnull ;
RangeVar * rv ;
char relkind ;
PublicationRelKind * relinfo = palloc_object ( PublicationRelKind ) ;
nspname = TextDatumGetCString ( slot_getattr ( slot , 1 , & isnull ) ) ;
Assert ( ! isnull ) ;
relname = TextDatumGetCString ( slot_getattr ( slot , 2 , & isnull ) ) ;
Assert ( ! isnull ) ;
relkind = DatumGetChar ( slot_getattr ( slot , 3 , & isnull ) ) ;
Assert ( ! isnull ) ;
rv = makeRangeVar ( nspname , relname , - 1 ) ;
relinfo - > rv = makeRangeVar ( nspname , relname , - 1 ) ;
relinfo - > relkind = relkind ;
if ( check_columnlist & & list_member ( tablelist , rv ) )
if ( relkind ! = RELKIND_SEQUENCE & &
check_columnlist & &
list_member_rangevar ( relationlist , relinfo - > rv ) )
ereport ( ERROR ,
errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " cannot use different column lists for table \" %s.%s \" in different publications " ,
nspname , relname ) ) ;
else
table list = lappend ( tablelist , rv ) ;
relation list = lappend ( relationlist , relinfo ) ;
ExecClearTuple ( slot ) ;
}
@ -2701,7 +2989,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
walrcv_clear_result ( res ) ;
return table list;
return relation list;
}
/*