@ -45,6 +45,14 @@
# include "utils/rel.h"
# include "utils/syscache.h"
/* Records association between publication and published table */
typedef struct
{
Oid relid ; /* OID of published table */
Oid pubid ; /* OID of publication that publishes this
* table . */
} published_rel ;
static void publication_translate_columns ( Relation targetrel , List * columns ,
int * natts , AttrNumber * * attrs ) ;
@ -172,42 +180,57 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
}
/*
* Filter out the partitions whose parent tables were also specified in
* the publication .
* Returns true if the ancestor is in the list of published relations .
* Otherwise , returns false .
*/
static List *
filter_partitions ( List * relids )
static bool
is_ancestor_member_tableinfos ( Oid ancestor , List * table_infos )
{
ListCell * lc ;
foreach ( lc , table_infos )
{
Oid relid = ( ( published_rel * ) lfirst ( lc ) ) - > relid ;
if ( relid = = ancestor )
return true ;
}
return false ;
}
/*
* Filter out the partitions whose parent tables are also present in the list .
*/
static void
filter_partitions ( List * table_infos )
{
List * result = NIL ;
ListCell * lc ;
ListCell * lc2 ;
foreach ( lc , relids )
foreach ( lc , table_info s)
{
bool skip = false ;
List * ancestors = NIL ;
Oid relid = lfirst_oid ( lc ) ;
ListCell * lc2 ;
published_rel * table_info = ( published_rel * ) lfirst ( lc ) ;
if ( get_rel_relispartition ( relid ) )
ancestors = get_partition_ancestors ( relid ) ;
if ( get_rel_relispartition ( table_info - > relid ) )
ancestors = get_partition_ancestors ( table_info - > relid ) ;
foreach ( lc2 , ancestors )
{
Oid ancestor = lfirst_oid ( lc2 ) ;
/* Check if the parent table exists in the published table list. */
if ( list_member_oid ( relids , ancestor ) )
if ( is_ancestor_member_tableinfos ( ancestor , table_infos ) )
{
skip = true ;
break ;
}
}
if ( ! skip )
result = lappend_oid ( result , relid ) ;
if ( skip )
table_infos = foreach_delete_current ( table_infos , lc ) ;
}
return result ;
}
/*
@ -1026,22 +1049,27 @@ GetPublicationByName(const char *pubname, bool missing_ok)
}
/*
* Returns information of tables in a publication .
* Get information of the tables in the given publication array .
*
* Returns pubid , relid , column list , row filter for each table .
*/
Datum
pg_get_publication_tables ( PG_FUNCTION_ARGS )
{
# define NUM_PUBLICATION_TABLES_ELEM 3
# define NUM_PUBLICATION_TABLES_ELEM 4
FuncCallContext * funcctx ;
char * pubname = text_to_cstring ( PG_GETARG_TEXT_PP ( 0 ) ) ;
Publication * publication ;
List * tables ;
List * table_infos = NIL ;
/* stuff done only on the first call of the function */
if ( SRF_IS_FIRSTCALL ( ) )
{
TupleDesc tupdesc ;
MemoryContext oldcontext ;
ArrayType * arr ;
Datum * elems ;
int nelems ,
i ;
bool viaroot = false ;
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT ( ) ;
@ -1049,68 +1077,108 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo ( funcctx - > multi_call_memory_ctx ) ;
publication = GetPublicationByName ( pubname , false ) ;
/*
* Publications support partitioned tables , although all changes are
* replicated using leaf partition identity and schema , so we only
* need those .
* Deconstruct the parameter into elements where each element is a
* publication name .
*/
if ( publication - > alltables )
{
tables = GetAllTablesPublicationRelations ( publication - > pubviaroot ) ;
}
else
arr = PG_GETARG_ARRAYTYPE_P ( 0 ) ;
deconstruct_array ( arr , TEXTOID , - 1 , false , TYPALIGN_INT ,
& elems , NULL , & nelems ) ;
/* Get Oids of tables from each publication. */
for ( i = 0 ; i < nelems ; i + + )
{
List * relids ,
* schemarelids ;
relids = GetPublicationRelations ( publication - > oid ,
publication - > pubviaroot ?
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF ) ;
schemarelids = GetAllSchemaPublicationRelations ( publication - > oid ,
publication - > pubviaroot ?
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF ) ;
tables = list_concat_unique_oid ( relids , schemarelids ) ;
Publication * pub_elem ;
List * pub_elem_tables = NIL ;
ListCell * lc ;
pub_elem = GetPublicationByName ( TextDatumGetCString ( elems [ i ] ) , false ) ;
/*
* If the publication publishes partition changes via their
* respective root partitioned tables , we must exclude partitions
* in favor of including the root partitioned tables . Otherwise ,
* the function could return both the child and parent tables
* which could cause data of the child table to be
* double - published on the subscriber side .
* Publications support partitioned tables . If
* publish_via_partition_root is false , all changes are replicated
* using leaf partition identity and schema , so we only need
* those . Otherwise , get the partitioned table itself .
*/
if ( publication - > pubviaroot )
tables = filter_partitions ( tables ) ;
if ( pub_elem - > alltables )
pub_elem_tables = GetAllTablesPublicationRelations ( pub_elem - > pubviaroot ) ;
else
{
List * relids ,
* schemarelids ;
relids = GetPublicationRelations ( pub_elem - > oid ,
pub_elem - > pubviaroot ?
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF ) ;
schemarelids = GetAllSchemaPublicationRelations ( pub_elem - > oid ,
pub_elem - > pubviaroot ?
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF ) ;
pub_elem_tables = list_concat_unique_oid ( relids , schemarelids ) ;
}
/*
* Record the published table and the corresponding publication so
* that we can get row filters and column lists later .
*
* When a table is published by multiple publications , to obtain
* all row filters and column lists , the structure related to this
* table will be recorded multiple times .
*/
foreach ( lc , pub_elem_tables )
{
published_rel * table_info = ( published_rel * ) palloc ( sizeof ( published_rel ) ) ;
table_info - > relid = lfirst_oid ( lc ) ;
table_info - > pubid = pub_elem - > oid ;
table_infos = lappend ( table_infos , table_info ) ;
}
/* At least one publication is using publish_via_partition_root. */
if ( pub_elem - > pubviaroot )
viaroot = true ;
}
/*
* If the publication publishes partition changes via their respective
* root partitioned tables , we must exclude partitions in favor of
* including the root partitioned tables . Otherwise , the function
* could return both the child and parent tables which could cause
* data of the child table to be double - published on the subscriber
* side .
*/
if ( viaroot )
filter_partitions ( table_infos ) ;
/* Construct a tuple descriptor for the result rows. */
tupdesc = CreateTemplateTupleDesc ( NUM_PUBLICATION_TABLES_ELEM ) ;
TupleDescInitEntry ( tupdesc , ( AttrNumber ) 1 , " relid " ,
TupleDescInitEntry ( tupdesc , ( AttrNumber ) 1 , " pub id" ,
OIDOID , - 1 , 0 ) ;
TupleDescInitEntry ( tupdesc , ( AttrNumber ) 2 , " attrs " ,
TupleDescInitEntry ( tupdesc , ( AttrNumber ) 2 , " relid " ,
OIDOID , - 1 , 0 ) ;
TupleDescInitEntry ( tupdesc , ( AttrNumber ) 3 , " attrs " ,
INT2VECTOROID , - 1 , 0 ) ;
TupleDescInitEntry ( tupdesc , ( AttrNumber ) 3 , " qual " ,
TupleDescInitEntry ( tupdesc , ( AttrNumber ) 4 , " qual " ,
PG_NODE_TREEOID , - 1 , 0 ) ;
funcctx - > tuple_desc = BlessTupleDesc ( tupdesc ) ;
funcctx - > user_fctx = ( void * ) tables ;
funcctx - > user_fctx = ( void * ) table_info s ;
MemoryContextSwitchTo ( oldcontext ) ;
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP ( ) ;
tables = ( List * ) funcctx - > user_fctx ;
table_info s = ( List * ) funcctx - > user_fctx ;
if ( funcctx - > call_cntr < list_length ( tables ) )
if ( funcctx - > call_cntr < list_length ( table_info s ) )
{
HeapTuple pubtuple = NULL ;
HeapTuple rettuple ;
Oid relid = list_nth_oid ( tables , funcctx - > call_cntr ) ;
Publication * pub ;
published_rel * table_info = ( published_rel * ) list_nth ( table_infos , funcctx - > call_cntr ) ;
Oid relid = table_info - > relid ;
Oid schemaid = get_rel_namespace ( relid ) ;
Datum values [ NUM_PUBLICATION_TABLES_ELEM ] = { 0 } ;
bool nulls [ NUM_PUBLICATION_TABLES_ELEM ] = { 0 } ;
@ -1119,42 +1187,43 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
* Form tuple with appropriate data .
*/
publication = GetPublicationByName ( pubname , false ) ;
pub = GetPublication ( table_info - > pubid ) ;
values [ 0 ] = ObjectIdGetDatum ( relid ) ;
values [ 0 ] = ObjectIdGetDatum ( pub - > oid ) ;
values [ 1 ] = ObjectIdGetDatum ( relid ) ;
/*
* We don ' t consider row filters or column lists for FOR ALL TABLES or
* FOR TABLES IN SCHEMA publications .
*/
if ( ! publication - > alltables & &
if ( ! pub - > alltables & &
! SearchSysCacheExists2 ( PUBLICATIONNAMESPACEMAP ,
ObjectIdGetDatum ( schemaid ) ,
ObjectIdGetDatum ( publication - > oid ) ) )
ObjectIdGetDatum ( pub - > oid ) ) )
pubtuple = SearchSysCacheCopy2 ( PUBLICATIONRELMAP ,
ObjectIdGetDatum ( relid ) ,
ObjectIdGetDatum ( publication - > oid ) ) ;
ObjectIdGetDatum ( pub - > oid ) ) ;
if ( HeapTupleIsValid ( pubtuple ) )
{
/* Lookup the column list attribute. */
values [ 1 ] = SysCacheGetAttr ( PUBLICATIONRELMAP , pubtuple ,
values [ 2 ] = SysCacheGetAttr ( PUBLICATIONRELMAP , pubtuple ,
Anum_pg_publication_rel_prattrs ,
& ( nulls [ 1 ] ) ) ;
& ( nulls [ 2 ] ) ) ;
/* Null indicates no filter. */
values [ 2 ] = SysCacheGetAttr ( PUBLICATIONRELMAP , pubtuple ,
values [ 3 ] = SysCacheGetAttr ( PUBLICATIONRELMAP , pubtuple ,
Anum_pg_publication_rel_prqual ,
& ( nulls [ 2 ] ) ) ;
& ( nulls [ 3 ] ) ) ;
}
else
{
nulls [ 1 ] = true ;
nulls [ 2 ] = true ;
nulls [ 3 ] = true ;
}
/* Show all columns when the column list is not specified. */
if ( nulls [ 1 ] = = true )
if ( nulls [ 2 ] )
{
Relation rel = table_open ( relid , AccessShareLock ) ;
int nattnums = 0 ;
@ -1176,8 +1245,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
if ( nattnums > 0 )
{
values [ 1 ] = PointerGetDatum ( buildint2vector ( attnums , nattnums ) ) ;
nulls [ 1 ] = false ;
values [ 2 ] = PointerGetDatum ( buildint2vector ( attnums , nattnums ) ) ;
nulls [ 2 ] = false ;
}
table_close ( rel , AccessShareLock ) ;