@ -16,6 +16,7 @@
# include "access/genam.h"
# include "access/genam.h"
# include "access/htup_details.h"
# include "access/htup_details.h"
# include "access/relation.h"
# include "access/table.h"
# include "access/table.h"
# include "access/xact.h"
# include "access/xact.h"
# include "catalog/catalog.h"
# include "catalog/catalog.h"
@ -67,15 +68,17 @@ typedef struct rf_context
} rf_context ;
} rf_context ;
static List * OpenRelIdList ( List * relids ) ;
static List * OpenRelIdList ( List * relids ) ;
static List * OpenTable List ( List * tables ) ;
static List * OpenRelation List ( List * rels , char objectType ) ;
static void CloseTable List ( List * rels ) ;
static void CloseRelation List ( List * rels ) ;
static void LockSchemaList ( List * schemalist ) ;
static void LockSchemaList ( List * schemalist ) ;
static void PublicationAddTable s ( Oid pubid , List * rels , bool if_not_exists ,
static void PublicationAddRelation s ( Oid pubid , List * rels , bool if_not_exists ,
AlterPublicationStmt * stmt ) ;
AlterPublicationStmt * stmt ) ;
static void PublicationDropTables ( Oid pubid , List * rels , bool missing_ok ) ;
static void PublicationDropRelations ( Oid pubid , List * rels , bool missing_ok ) ;
static void PublicationAddSchemas ( Oid pubid , List * schemas , bool if_not_exists ,
static void PublicationAddSchemas ( Oid pubid , List * schemas , char objectType ,
AlterPublicationStmt * stmt ) ;
bool if_not_exists , AlterPublicationStmt * stmt ) ;
static void PublicationDropSchemas ( Oid pubid , List * schemas , bool missing_ok ) ;
static void PublicationDropSchemas ( Oid pubid , List * schemas , char objectType ,
bool missing_ok ) ;
static void
static void
parse_publication_options ( ParseState * pstate ,
parse_publication_options ( ParseState * pstate ,
@ -95,6 +98,7 @@ parse_publication_options(ParseState *pstate,
pubactions - > pubupdate = true ;
pubactions - > pubupdate = true ;
pubactions - > pubdelete = true ;
pubactions - > pubdelete = true ;
pubactions - > pubtruncate = true ;
pubactions - > pubtruncate = true ;
pubactions - > pubsequence = true ;
* publish_via_partition_root = false ;
* publish_via_partition_root = false ;
/* Parse options */
/* Parse options */
@ -119,6 +123,7 @@ parse_publication_options(ParseState *pstate,
pubactions - > pubupdate = false ;
pubactions - > pubupdate = false ;
pubactions - > pubdelete = false ;
pubactions - > pubdelete = false ;
pubactions - > pubtruncate = false ;
pubactions - > pubtruncate = false ;
pubactions - > pubsequence = false ;
* publish_given = true ;
* publish_given = true ;
publish = defGetString ( defel ) ;
publish = defGetString ( defel ) ;
@ -141,6 +146,8 @@ parse_publication_options(ParseState *pstate,
pubactions - > pubdelete = true ;
pubactions - > pubdelete = true ;
else if ( strcmp ( publish_opt , " truncate " ) = = 0 )
else if ( strcmp ( publish_opt , " truncate " ) = = 0 )
pubactions - > pubtruncate = true ;
pubactions - > pubtruncate = true ;
else if ( strcmp ( publish_opt , " sequence " ) = = 0 )
pubactions - > pubsequence = true ;
else
else
ereport ( ERROR ,
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
@ -167,7 +174,9 @@ parse_publication_options(ParseState *pstate,
*/
*/
static void
static void
ObjectsInPublicationToOids ( List * pubobjspec_list , ParseState * pstate ,
ObjectsInPublicationToOids ( List * pubobjspec_list , ParseState * pstate ,
List * * rels , List * * schemas )
List * * tables , List * * sequences ,
List * * tables_schemas , List * * sequences_schemas ,
List * * schemas )
{
{
ListCell * cell ;
ListCell * cell ;
PublicationObjSpec * pubobj ;
PublicationObjSpec * pubobj ;
@ -185,12 +194,23 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
switch ( pubobj - > pubobjtype )
switch ( pubobj - > pubobjtype )
{
{
case PUBLICATIONOBJ_TABLE :
case PUBLICATIONOBJ_TABLE :
* rels = lappend ( * rels , pubobj - > pubtable ) ;
* tables = lappend ( * tables , pubobj - > pubtable ) ;
break ;
case PUBLICATIONOBJ_SEQUENCE :
* sequences = lappend ( * sequences , pubobj - > pubtable ) ;
break ;
break ;
case PUBLICATIONOBJ_TABLES_IN_SCHEMA :
case PUBLICATIONOBJ_TABLES_IN_SCHEMA :
schemaid = get_namespace_oid ( pubobj - > name , false ) ;
schemaid = get_namespace_oid ( pubobj - > name , false ) ;
/* Filter out duplicates if user specifies "sch1, sch1" */
/* Filter out duplicates if user specifies "sch1, sch1" */
* tables_schemas = list_append_unique_oid ( * tables_schemas , schemaid ) ;
* schemas = list_append_unique_oid ( * schemas , schemaid ) ;
break ;
case PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA :
schemaid = get_namespace_oid ( pubobj - > name , false ) ;
/* Filter out duplicates if user specifies "sch1, sch1" */
* sequences_schemas = list_append_unique_oid ( * sequences_schemas , schemaid ) ;
* schemas = list_append_unique_oid ( * schemas , schemaid ) ;
* schemas = list_append_unique_oid ( * schemas , schemaid ) ;
break ;
break ;
case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA :
case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA :
@ -204,6 +224,21 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
list_free ( search_path ) ;
list_free ( search_path ) ;
/* Filter out duplicates if user specifies "sch1, sch1" */
/* Filter out duplicates if user specifies "sch1, sch1" */
* tables_schemas = list_append_unique_oid ( * tables_schemas , schemaid ) ;
* schemas = list_append_unique_oid ( * schemas , schemaid ) ;
break ;
case PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA :
search_path = fetch_search_path ( false ) ;
if ( search_path = = NIL ) /* nothing valid in search_path? */
ereport ( ERROR ,
errcode ( ERRCODE_UNDEFINED_SCHEMA ) ,
errmsg ( " no schema has been selected for CURRENT_SCHEMA " ) ) ;
schemaid = linitial_oid ( search_path ) ;
list_free ( search_path ) ;
/* Filter out duplicates if user specifies "sch1, sch1" */
* sequences_schemas = list_append_unique_oid ( * sequences_schemas , schemaid ) ;
* schemas = list_append_unique_oid ( * schemas , schemaid ) ;
* schemas = list_append_unique_oid ( * schemas , schemaid ) ;
break ;
break ;
default :
default :
@ -240,6 +275,14 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist,
errdetail ( " Table \" %s \" in schema \" %s \" is already part of the publication, adding the same schema is not supported. " ,
errdetail ( " Table \" %s \" in schema \" %s \" is already part of the publication, adding the same schema is not supported. " ,
RelationGetRelationName ( rel ) ,
RelationGetRelationName ( rel ) ,
get_namespace_name ( relSchemaId ) ) ) ;
get_namespace_name ( relSchemaId ) ) ) ;
else if ( checkobjtype = = PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA )
ereport ( ERROR ,
errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " cannot add schema \" %s \" to publication " ,
get_namespace_name ( relSchemaId ) ) ,
errdetail ( " Sequence \" %s \" in schema \" %s \" is already part of the publication, adding the same schema is not supported. " ,
RelationGetRelationName ( rel ) ,
get_namespace_name ( relSchemaId ) ) ) ;
else if ( checkobjtype = = PUBLICATIONOBJ_TABLE )
else if ( checkobjtype = = PUBLICATIONOBJ_TABLE )
ereport ( ERROR ,
ereport ( ERROR ,
errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
@ -248,6 +291,14 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist,
RelationGetRelationName ( rel ) ) ,
RelationGetRelationName ( rel ) ) ,
errdetail ( " Table's schema \" %s \" is already part of the publication or part of the specified schema list. " ,
errdetail ( " Table's schema \" %s \" is already part of the publication or part of the specified schema list. " ,
get_namespace_name ( relSchemaId ) ) ) ;
get_namespace_name ( relSchemaId ) ) ) ;
else if ( checkobjtype = = PUBLICATIONOBJ_SEQUENCE )
ereport ( ERROR ,
errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " cannot add relation \" %s.%s \" to publication " ,
get_namespace_name ( relSchemaId ) ,
RelationGetRelationName ( rel ) ) ,
errdetail ( " Sequence's schema \" %s \" is already part of the publication or part of the specified schema list. " ,
get_namespace_name ( relSchemaId ) ) ) ;
}
}
}
}
}
}
@ -615,6 +666,7 @@ TransformPubWhereClauses(List *tables, const char *queryString,
ObjectAddress
ObjectAddress
CreatePublication ( ParseState * pstate , CreatePublicationStmt * stmt )
CreatePublication ( ParseState * pstate , CreatePublicationStmt * stmt )
{
{
ListCell * lc ;
Relation rel ;
Relation rel ;
ObjectAddress myself ;
ObjectAddress myself ;
Oid puboid ;
Oid puboid ;
@ -626,9 +678,25 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
bool publish_via_partition_root_given ;
bool publish_via_partition_root_given ;
bool publish_via_partition_root ;
bool publish_via_partition_root ;
AclResult aclresult ;
AclResult aclresult ;
List * relations = NIL ;
List * tables = NIL ;
List * sequences = NIL ;
List * tables_schemaidlist = NIL ;
List * sequences_schemaidlist = NIL ;
List * schemaidlist = NIL ;
List * schemaidlist = NIL ;
bool for_all_tables = false ;
bool for_all_sequences = false ;
/* Translate the list of object types (represented by strings) to bool flags. */
foreach ( lc , stmt - > for_all_objects )
{
char * val = strVal ( lfirst ( lc ) ) ;
if ( strcmp ( val , " tables " ) = = 0 )
for_all_tables = true ;
else if ( strcmp ( val , " sequences " ) = = 0 )
for_all_sequences = true ;
}
/* must have CREATE privilege on database */
/* must have CREATE privilege on database */
aclresult = pg_database_aclcheck ( MyDatabaseId , GetUserId ( ) , ACL_CREATE ) ;
aclresult = pg_database_aclcheck ( MyDatabaseId , GetUserId ( ) , ACL_CREATE ) ;
if ( aclresult ! = ACLCHECK_OK )
if ( aclresult ! = ACLCHECK_OK )
@ -636,7 +704,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
get_database_name ( MyDatabaseId ) ) ;
get_database_name ( MyDatabaseId ) ) ;
/* FOR ALL TABLES requires superuser */
/* FOR ALL TABLES requires superuser */
if ( stmt - > for_all_tables & & ! superuser ( ) )
if ( for_all_tables & & ! superuser ( ) )
ereport ( ERROR ,
ereport ( ERROR ,
( errcode ( ERRCODE_INSUFFICIENT_PRIVILEGE ) ,
( errcode ( ERRCODE_INSUFFICIENT_PRIVILEGE ) ,
errmsg ( " must be superuser to create FOR ALL TABLES publication " ) ) ) ;
errmsg ( " must be superuser to create FOR ALL TABLES publication " ) ) ) ;
@ -672,7 +740,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
Anum_pg_publication_oid ) ;
Anum_pg_publication_oid ) ;
values [ Anum_pg_publication_oid - 1 ] = ObjectIdGetDatum ( puboid ) ;
values [ Anum_pg_publication_oid - 1 ] = ObjectIdGetDatum ( puboid ) ;
values [ Anum_pg_publication_puballtables - 1 ] =
values [ Anum_pg_publication_puballtables - 1 ] =
BoolGetDatum ( stmt - > for_all_tables ) ;
BoolGetDatum ( for_all_tables ) ;
values [ Anum_pg_publication_puballsequences - 1 ] =
BoolGetDatum ( for_all_sequences ) ;
values [ Anum_pg_publication_pubinsert - 1 ] =
values [ Anum_pg_publication_pubinsert - 1 ] =
BoolGetDatum ( pubactions . pubinsert ) ;
BoolGetDatum ( pubactions . pubinsert ) ;
values [ Anum_pg_publication_pubupdate - 1 ] =
values [ Anum_pg_publication_pubupdate - 1 ] =
@ -681,6 +751,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
BoolGetDatum ( pubactions . pubdelete ) ;
BoolGetDatum ( pubactions . pubdelete ) ;
values [ Anum_pg_publication_pubtruncate - 1 ] =
values [ Anum_pg_publication_pubtruncate - 1 ] =
BoolGetDatum ( pubactions . pubtruncate ) ;
BoolGetDatum ( pubactions . pubtruncate ) ;
values [ Anum_pg_publication_pubsequence - 1 ] =
BoolGetDatum ( pubactions . pubsequence ) ;
values [ Anum_pg_publication_pubviaroot - 1 ] =
values [ Anum_pg_publication_pubviaroot - 1 ] =
BoolGetDatum ( publish_via_partition_root ) ;
BoolGetDatum ( publish_via_partition_root ) ;
@ -698,45 +770,88 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
CommandCounterIncrement ( ) ;
CommandCounterIncrement ( ) ;
/* Associate objects with the publication. */
/* Associate objects with the publication. */
if ( stmt - > for_all_tabl es)
if ( for_all_tables | | for_all_sequenc es)
{
{
/* Invalidate relcache so that publication info is rebuilt. */
/* Invalidate relcache so that publication info is rebuilt. */
CacheInvalidateRelcacheAll ( ) ;
CacheInvalidateRelcacheAll ( ) ;
}
}
else
/*
* If the publication might have either tables or sequences ( directly or
* through a schema ) , process that .
*/
if ( ! for_all_tables | | ! for_all_sequences )
{
{
ObjectsInPublicationToOids ( stmt - > pubobjects , pstate , & relations ,
ObjectsInPublicationToOids ( stmt - > pubobjects , pstate ,
& tables , & sequences ,
& tables_schemaidlist ,
& sequences_schemaidlist ,
& schemaidlist ) ;
& schemaidlist ) ;
/* FOR ALL TABLES IN SCHEMA requires superuser */
/* FOR ALL TABLES IN SCHEMA requires superuser */
if ( list_length ( schemaidlist ) > 0 & & ! superuser ( ) )
if ( list_length ( tables_ schemaidlist) > 0 & & ! superuser ( ) )
ereport ( ERROR ,
ereport ( ERROR ,
errcode ( ERRCODE_INSUFFICIENT_PRIVILEGE ) ,
errcode ( ERRCODE_INSUFFICIENT_PRIVILEGE ) ,
errmsg ( " must be superuser to create FOR ALL TABLES IN SCHEMA publication " ) ) ;
errmsg ( " must be superuser to create FOR ALL TABLES IN SCHEMA publication " ) ) ;
if ( list_length ( relations ) > 0 )
/* FOR ALL SEQUENCES IN SCHEMA requires superuser */
if ( list_length ( sequences_schemaidlist ) > 0 & & ! superuser ( ) )
ereport ( ERROR ,
errcode ( ERRCODE_INSUFFICIENT_PRIVILEGE ) ,
errmsg ( " must be superuser to create FOR ALL SEQUENCES IN SCHEMA publication " ) ) ;
/* tables added directly */
if ( list_length ( tables ) > 0 )
{
{
List * rels ;
List * rels ;
rels = OpenTableList ( relations ) ;
rels = OpenRelationList ( tables , PUB_OBJTYPE_TABLE ) ;
CheckObjSchemaNotAlreadyInPublication ( rels , schemaidlist ,
CheckObjSchemaNotAlreadyInPublication ( rels , tables_ schemaidlist,
PUBLICATIONOBJ_TABLE ) ;
PUBLICATIONOBJ_TABLE ) ;
TransformPubWhereClauses ( rels , pstate - > p_sourcetext ,
TransformPubWhereClauses ( rels , pstate - > p_sourcetext ,
publish_via_partition_root ) ;
publish_via_partition_root ) ;
PublicationAddTables ( puboid , rels , true , NULL ) ;
PublicationAddRelations ( puboid , rels , true , NULL ) ;
CloseTableList ( rels ) ;
CloseRelationList ( rels ) ;
}
/* sequences added directly */
if ( list_length ( sequences ) > 0 )
{
List * rels ;
rels = OpenRelationList ( sequences , PUB_OBJTYPE_SEQUENCE ) ;
CheckObjSchemaNotAlreadyInPublication ( rels , sequences_schemaidlist ,
PUBLICATIONOBJ_SEQUENCE ) ;
PublicationAddRelations ( puboid , rels , true , NULL ) ;
CloseRelationList ( rels ) ;
}
/* tables added through a schema */
if ( list_length ( tables_schemaidlist ) > 0 )
{
/*
* Schema lock is held until the publication is created to prevent
* concurrent schema deletion .
*/
LockSchemaList ( tables_schemaidlist ) ;
PublicationAddSchemas ( puboid ,
tables_schemaidlist , PUB_OBJTYPE_TABLE ,
true , NULL ) ;
}
}
if ( list_length ( schemaidlist ) > 0 )
/* sequences added through a schema */
if ( list_length ( sequences_schemaidlist ) > 0 )
{
{
/*
/*
* Schema lock is held until the publication is created to prevent
* Schema lock is held until the publication is created to prevent
* concurrent schema deletion .
* concurrent schema deletion .
*/
*/
LockSchemaList ( schemaidlist ) ;
LockSchemaList ( sequences_schemaidlist ) ;
PublicationAddSchemas ( puboid , schemaidlist , true , NULL ) ;
PublicationAddSchemas ( puboid ,
sequences_schemaidlist , PUB_OBJTYPE_SEQUENCE ,
true , NULL ) ;
}
}
}
}
@ -799,6 +914,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
AccessShareLock ) ;
AccessShareLock ) ;
root_relids = GetPublicationRelations ( pubform - > oid ,
root_relids = GetPublicationRelations ( pubform - > oid ,
PUB_OBJTYPE_TABLE ,
PUBLICATION_PART_ROOT ) ;
PUBLICATION_PART_ROOT ) ;
foreach ( lc , root_relids )
foreach ( lc , root_relids )
@ -857,6 +973,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
values [ Anum_pg_publication_pubtruncate - 1 ] = BoolGetDatum ( pubactions . pubtruncate ) ;
values [ Anum_pg_publication_pubtruncate - 1 ] = BoolGetDatum ( pubactions . pubtruncate ) ;
replaces [ Anum_pg_publication_pubtruncate - 1 ] = true ;
replaces [ Anum_pg_publication_pubtruncate - 1 ] = true ;
values [ Anum_pg_publication_pubsequence - 1 ] = BoolGetDatum ( pubactions . pubsequence ) ;
replaces [ Anum_pg_publication_pubsequence - 1 ] = true ;
}
}
if ( publish_via_partition_root_given )
if ( publish_via_partition_root_given )
@ -876,7 +995,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
pubform = ( Form_pg_publication ) GETSTRUCT ( tup ) ;
pubform = ( Form_pg_publication ) GETSTRUCT ( tup ) ;
/* Invalidate the relcache. */
/* Invalidate the relcache. */
if ( pubform - > puballtables )
if ( pubform - > puballtables | | pubform - > puballsequences )
{
{
CacheInvalidateRelcacheAll ( ) ;
CacheInvalidateRelcacheAll ( ) ;
}
}
@ -892,6 +1011,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
*/
*/
if ( root_relids = = NIL )
if ( root_relids = = NIL )
relids = GetPublicationRelations ( pubform - > oid ,
relids = GetPublicationRelations ( pubform - > oid ,
PUB_OBJTYPE_TABLE ,
PUBLICATION_PART_ALL ) ;
PUBLICATION_PART_ALL ) ;
else
else
{
{
@ -905,7 +1025,20 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
lfirst_oid ( lc ) ) ;
lfirst_oid ( lc ) ) ;
}
}
/* tables */
schemarelids = GetAllSchemaPublicationRelations ( pubform - > oid ,
PUB_OBJTYPE_TABLE ,
PUBLICATION_PART_ALL ) ;
relids = list_concat_unique_oid ( relids , schemarelids ) ;
/* sequences */
relids = list_concat_unique_oid ( relids ,
GetPublicationRelations ( pubform - > oid ,
PUB_OBJTYPE_SEQUENCE ,
PUBLICATION_PART_ALL ) ) ;
schemarelids = GetAllSchemaPublicationRelations ( pubform - > oid ,
schemarelids = GetAllSchemaPublicationRelations ( pubform - > oid ,
PUB_OBJTYPE_SEQUENCE ,
PUBLICATION_PART_ALL ) ;
PUBLICATION_PART_ALL ) ;
relids = list_concat_unique_oid ( relids , schemarelids ) ;
relids = list_concat_unique_oid ( relids , schemarelids ) ;
@ -960,7 +1093,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
if ( ! tables & & stmt - > action ! = AP_SetObjects )
if ( ! tables & & stmt - > action ! = AP_SetObjects )
return ;
return ;
rels = OpenTable List ( tables ) ;
rels = OpenRelation List ( tables , PUB_OBJTYPE_TABLE ) ;
if ( stmt - > action = = AP_AddObjects )
if ( stmt - > action = = AP_AddObjects )
{
{
@ -970,19 +1103,22 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
* Check if the relation is member of the existing schema in the
* Check if the relation is member of the existing schema in the
* publication or member of the schema list specified .
* publication or member of the schema list specified .
*/
*/
schemas = list_concat_copy ( schemaidlist , GetPublicationSchemas ( pubid ) ) ;
schemas = list_concat_copy ( schemaidlist ,
GetPublicationSchemas ( pubid ,
PUB_OBJTYPE_TABLE ) ) ;
CheckObjSchemaNotAlreadyInPublication ( rels , schemas ,
CheckObjSchemaNotAlreadyInPublication ( rels , schemas ,
PUBLICATIONOBJ_TABLE ) ;
PUBLICATIONOBJ_TABLE ) ;
TransformPubWhereClauses ( rels , queryString , pubform - > pubviaroot ) ;
TransformPubWhereClauses ( rels , queryString , pubform - > pubviaroot ) ;
PublicationAddTable s ( pubid , rels , false , stmt ) ;
PublicationAddRelation s ( pubid , rels , false , stmt ) ;
}
}
else if ( stmt - > action = = AP_DropObjects )
else if ( stmt - > action = = AP_DropObjects )
PublicationDropTable s ( pubid , rels , false ) ;
PublicationDropRelation s ( pubid , rels , false ) ;
else /* AP_SetObjects */
else /* AP_SetObjects */
{
{
List * oldrelids = GetPublicationRelations ( pubid ,
List * oldrelids = GetPublicationRelations ( pubid ,
PUB_OBJTYPE_TABLE ,
PUBLICATION_PART_ROOT ) ;
PUBLICATION_PART_ROOT ) ;
List * delrels = NIL ;
List * delrels = NIL ;
ListCell * oldlc ;
ListCell * oldlc ;
@ -1064,18 +1200,18 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
}
}
/* And drop them. */
/* And drop them. */
PublicationDropTable s ( pubid , delrels , true ) ;
PublicationDropRelation s ( pubid , delrels , true ) ;
/*
/*
* Don ' t bother calculating the difference for adding , we ' ll catch and
* Don ' t bother calculating the difference for adding , we ' ll catch and
* skip existing ones when doing catalog update .
* skip existing ones when doing catalog update .
*/
*/
PublicationAddTable s ( pubid , rels , true , stmt ) ;
PublicationAddRelation s ( pubid , rels , true , stmt ) ;
CloseTable List ( delrels ) ;
CloseRelation List ( delrels ) ;
}
}
CloseTable List ( rels ) ;
CloseRelation List ( rels ) ;
}
}
/*
/*
@ -1085,7 +1221,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
*/
*/
static void
static void
AlterPublicationSchemas ( AlterPublicationStmt * stmt ,
AlterPublicationSchemas ( AlterPublicationStmt * stmt ,
HeapTuple tup , List * schemaidlist )
HeapTuple tup , List * schemaidlist ,
char objectType )
{
{
Form_pg_publication pubform = ( Form_pg_publication ) GETSTRUCT ( tup ) ;
Form_pg_publication pubform = ( Form_pg_publication ) GETSTRUCT ( tup ) ;
@ -1107,20 +1244,20 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
List * rels ;
List * rels ;
List * reloids ;
List * reloids ;
reloids = GetPublicationRelations ( pubform - > oid , PUBLICATION_PART_ROOT ) ;
reloids = GetPublicationRelations ( pubform - > oid , objectType , PUBLICATION_PART_ROOT ) ;
rels = OpenRelIdList ( reloids ) ;
rels = OpenRelIdList ( reloids ) ;
CheckObjSchemaNotAlreadyInPublication ( rels , schemaidlist ,
CheckObjSchemaNotAlreadyInPublication ( rels , schemaidlist ,
PUBLICATIONOBJ_TABLES_IN_SCHEMA ) ;
PUBLICATIONOBJ_TABLES_IN_SCHEMA ) ;
CloseTable List ( rels ) ;
CloseRelation List ( rels ) ;
PublicationAddSchemas ( pubform - > oid , schemaidlist , false , stmt ) ;
PublicationAddSchemas ( pubform - > oid , schemaidlist , objectType , false , stmt ) ;
}
}
else if ( stmt - > action = = AP_DropObjects )
else if ( stmt - > action = = AP_DropObjects )
PublicationDropSchemas ( pubform - > oid , schemaidlist , false ) ;
PublicationDropSchemas ( pubform - > oid , schemaidlist , objectType , false ) ;
else /* AP_SetObjects */
else /* AP_SetObjects */
{
{
List * oldschemaids = GetPublicationSchemas ( pubform - > oid ) ;
List * oldschemaids = GetPublicationSchemas ( pubform - > oid , objectType ) ;
List * delschemas = NIL ;
List * delschemas = NIL ;
/* Identify which schemas should be dropped */
/* Identify which schemas should be dropped */
@ -1133,13 +1270,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
LockSchemaList ( delschemas ) ;
LockSchemaList ( delschemas ) ;
/* And drop them */
/* And drop them */
PublicationDropSchemas ( pubform - > oid , delschemas , true ) ;
PublicationDropSchemas ( pubform - > oid , delschemas , objectType , true ) ;
/*
/*
* Don ' t bother calculating the difference for adding , we ' ll catch and
* Don ' t bother calculating the difference for adding , we ' ll catch and
* skip existing ones when doing catalog update .
* skip existing ones when doing catalog update .
*/
*/
PublicationAddSchemas ( pubform - > oid , schemaidlist , true , stmt ) ;
PublicationAddSchemas ( pubform - > oid , schemaidlist , objectType , true , stmt ) ;
}
}
}
}
@ -1149,12 +1286,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
*/
*/
static void
static void
CheckAlterPublication ( AlterPublicationStmt * stmt , HeapTuple tup ,
CheckAlterPublication ( AlterPublicationStmt * stmt , HeapTuple tup ,
List * tables , List * schemaidlist )
List * tables , List * tables_schemaidlist ,
List * sequences , List * sequences_schemaidlist )
{
{
Form_pg_publication pubform = ( Form_pg_publication ) GETSTRUCT ( tup ) ;
Form_pg_publication pubform = ( Form_pg_publication ) GETSTRUCT ( tup ) ;
if ( ( stmt - > action = = AP_AddObjects | | stmt - > action = = AP_SetObjects ) & &
if ( ( stmt - > action = = AP_AddObjects | | stmt - > action = = AP_SetObjects ) & &
schemaidlist & & ! superuser ( ) )
( tables_ schemaidlist | | sequences_schemaidlist ) & & ! superuser ( ) )
ereport ( ERROR ,
ereport ( ERROR ,
( errcode ( ERRCODE_INSUFFICIENT_PRIVILEGE ) ,
( errcode ( ERRCODE_INSUFFICIENT_PRIVILEGE ) ,
errmsg ( " must be superuser to add or set schemas " ) ) ) ;
errmsg ( " must be superuser to add or set schemas " ) ) ) ;
@ -1163,13 +1301,24 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
* Check that user is allowed to manipulate the publication tables in
* Check that user is allowed to manipulate the publication tables in
* schema
* schema
*/
*/
if ( schemaidlist & & pubform - > puballtables )
if ( tables_ schemaidlist & & pubform - > puballtables )
ereport ( ERROR ,
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " publication \" %s \" is defined as FOR ALL TABLES " ,
errmsg ( " publication \" %s \" is defined as FOR ALL TABLES " ,
NameStr ( pubform - > pubname ) ) ,
NameStr ( pubform - > pubname ) ) ,
errdetail ( " Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications. " ) ) ) ;
errdetail ( " Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications. " ) ) ) ;
/*
* Check that user is allowed to manipulate the publication sequences in
* schema
*/
if ( sequences_schemaidlist & & pubform - > puballsequences )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " publication \" %s \" is defined as FOR ALL SEQUENCES " ,
NameStr ( pubform - > pubname ) ) ,
errdetail ( " Sequences from schema cannot be added to, dropped from, or set on FOR ALL SEQUENCES publications. " ) ) ) ;
/* Check that user is allowed to manipulate the publication tables. */
/* Check that user is allowed to manipulate the publication tables. */
if ( tables & & pubform - > puballtables )
if ( tables & & pubform - > puballtables )
ereport ( ERROR ,
ereport ( ERROR ,
@ -1177,6 +1326,107 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
errmsg ( " publication \" %s \" is defined as FOR ALL TABLES " ,
errmsg ( " publication \" %s \" is defined as FOR ALL TABLES " ,
NameStr ( pubform - > pubname ) ) ,
NameStr ( pubform - > pubname ) ) ,
errdetail ( " Tables cannot be added to or dropped from FOR ALL TABLES publications. " ) ) ) ;
errdetail ( " Tables cannot be added to or dropped from FOR ALL TABLES publications. " ) ) ) ;
/* Check that user is allowed to manipulate the publication tables. */
if ( sequences & & pubform - > puballsequences )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " publication \" %s \" is defined as FOR ALL SEQUENCES " ,
NameStr ( pubform - > pubname ) ) ,
errdetail ( " Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications. " ) ) ) ;
}
/*
* Add or remove sequence to / from publication .
*/
static void
AlterPublicationSequences ( AlterPublicationStmt * stmt , HeapTuple tup ,
List * sequences , List * schemaidlist )
{
List * rels = NIL ;
Form_pg_publication pubform = ( Form_pg_publication ) GETSTRUCT ( tup ) ;
Oid pubid = pubform - > oid ;
/*
* It is quite possible that for the SET case user has not specified any
* tables in which case we need to remove all the existing tables .
*/
if ( ! sequences & & stmt - > action ! = AP_SetObjects )
return ;
rels = OpenRelationList ( sequences , PUB_OBJTYPE_SEQUENCE ) ;
if ( stmt - > action = = AP_AddObjects )
{
List * schemas = NIL ;
/*
* Check if the relation is member of the existing schema in the
* publication or member of the schema list specified .
*/
schemas = list_concat_copy ( schemaidlist ,
GetPublicationSchemas ( pubid ,
PUB_OBJTYPE_SEQUENCE ) ) ;
CheckObjSchemaNotAlreadyInPublication ( rels , schemas ,
PUBLICATIONOBJ_SEQUENCE ) ;
PublicationAddRelations ( pubid , rels , false , stmt ) ;
}
else if ( stmt - > action = = AP_DropObjects )
PublicationDropRelations ( pubid , rels , false ) ;
else /* DEFELEM_SET */
{
List * oldrelids = GetPublicationRelations ( pubid ,
PUB_OBJTYPE_SEQUENCE ,
PUBLICATION_PART_ROOT ) ;
List * delrels = NIL ;
ListCell * oldlc ;
CheckObjSchemaNotAlreadyInPublication ( rels , schemaidlist ,
PUBLICATIONOBJ_SEQUENCE ) ;
/* Calculate which relations to drop. */
foreach ( oldlc , oldrelids )
{
Oid oldrelid = lfirst_oid ( oldlc ) ;
ListCell * newlc ;
PublicationRelInfo * oldrel ;
bool found = false ;
foreach ( newlc , rels )
{
PublicationRelInfo * newpubrel ;
newpubrel = ( PublicationRelInfo * ) lfirst ( newlc ) ;
if ( RelationGetRelid ( newpubrel - > relation ) = = oldrelid )
{
found = true ;
break ;
}
}
/* Not yet in the list, open it and add to the list */
if ( ! found )
{
oldrel = palloc ( sizeof ( PublicationRelInfo ) ) ;
oldrel - > whereClause = NULL ;
oldrel - > relation = table_open ( oldrelid ,
ShareUpdateExclusiveLock ) ;
delrels = lappend ( delrels , oldrel ) ;
}
}
/* And drop them. */
PublicationDropRelations ( pubid , delrels , true ) ;
/*
* Don ' t bother calculating the difference for adding , we ' ll catch and
* skip existing ones when doing catalog update .
*/
PublicationAddRelations ( pubid , rels , true , stmt ) ;
CloseRelationList ( delrels ) ;
}
CloseRelationList ( rels ) ;
}
}
/*
/*
@ -1214,14 +1464,22 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
AlterPublicationOptions ( pstate , stmt , rel , tup ) ;
AlterPublicationOptions ( pstate , stmt , rel , tup ) ;
else
else
{
{
List * relations = NIL ;
List * tables = NIL ;
List * sequences = NIL ;
List * tables_schemaidlist = NIL ;
List * sequences_schemaidlist = NIL ;
List * schemaidlist = NIL ;
List * schemaidlist = NIL ;
Oid pubid = pubform - > oid ;
Oid pubid = pubform - > oid ;
ObjectsInPublicationToOids ( stmt - > pubobjects , pstate , & relations ,
ObjectsInPublicationToOids ( stmt - > pubobjects , pstate ,
& tables , & sequences ,
& tables_schemaidlist ,
& sequences_schemaidlist ,
& schemaidlist ) ;
& schemaidlist ) ;
CheckAlterPublication ( stmt , tup , relations , schemaidlist ) ;
CheckAlterPublication ( stmt , tup ,
tables , tables_schemaidlist ,
sequences , sequences_schemaidlist ) ;
heap_freetuple ( tup ) ;
heap_freetuple ( tup ) ;
@ -1249,9 +1507,16 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
errmsg ( " publication \" %s \" does not exist " ,
errmsg ( " publication \" %s \" does not exist " ,
stmt - > pubname ) ) ;
stmt - > pubname ) ) ;
AlterPublicationTables ( stmt , tup , relations , schemaidlist ,
AlterPublicationTables ( stmt , tup , tables , tables_ schemaidlist,
pstate - > p_sourcetext ) ;
pstate - > p_sourcetext ) ;
AlterPublicationSchemas ( stmt , tup , schemaidlist ) ;
AlterPublicationSequences ( stmt , tup , sequences , sequences_schemaidlist ) ;
AlterPublicationSchemas ( stmt , tup , tables_schemaidlist ,
PUB_OBJTYPE_TABLE ) ;
AlterPublicationSchemas ( stmt , tup , sequences_schemaidlist ,
PUB_OBJTYPE_SEQUENCE ) ;
}
}
/* Cleanup. */
/* Cleanup. */
@ -1319,7 +1584,7 @@ RemovePublicationById(Oid pubid)
pubform = ( Form_pg_publication ) GETSTRUCT ( tup ) ;
pubform = ( Form_pg_publication ) GETSTRUCT ( tup ) ;
/* Invalidate relcache so that publication info is rebuilt. */
/* Invalidate relcache so that publication info is rebuilt. */
if ( pubform - > puballtables )
if ( pubform - > puballtables | | pubform - > puballsequences )
CacheInvalidateRelcacheAll ( ) ;
CacheInvalidateRelcacheAll ( ) ;
CatalogTupleDelete ( rel , & tup - > t_self ) ;
CatalogTupleDelete ( rel , & tup - > t_self ) ;
@ -1355,6 +1620,7 @@ RemovePublicationSchemaById(Oid psoid)
* partitions .
* partitions .
*/
*/
schemaRels = GetSchemaPublicationRelations ( pubsch - > pnnspid ,
schemaRels = GetSchemaPublicationRelations ( pubsch - > pnnspid ,
pubsch - > pntype ,
PUBLICATION_PART_ALL ) ;
PUBLICATION_PART_ALL ) ;
InvalidatePublicationRels ( schemaRels ) ;
InvalidatePublicationRels ( schemaRels ) ;
@ -1397,29 +1663,45 @@ OpenRelIdList(List *relids)
* add them to a publication .
* add them to a publication .
*/
*/
static List *
static List *
OpenTable List ( List * tables )
OpenRelation List ( List * rels , char objectType )
{
{
List * relids = NIL ;
List * relids = NIL ;
List * rel s = NIL ;
List * result = NIL ;
ListCell * lc ;
ListCell * lc ;
List * relids_with_rf = NIL ;
List * relids_with_rf = NIL ;
/*
/*
* Open , share - lock , and check all the explicitly - specified relations
* Open , share - lock , and check all the explicitly - specified relations
*/
*/
foreach ( lc , table s)
foreach ( lc , rel s)
{
{
PublicationTable * t = lfirst_node ( PublicationTable , lc ) ;
PublicationTable * t = lfirst_node ( PublicationTable , lc ) ;
bool recurse = t - > relation - > inh ;
bool recurse = t - > relation - > inh ;
Relation rel ;
Relation rel ;
Oid myrelid ;
Oid myrelid ;
PublicationRelInfo * pub_rel ;
PublicationRelInfo * pub_rel ;
char myrelkind ;
/* Allow query cancel in case this takes a long time */
/* Allow query cancel in case this takes a long time */
CHECK_FOR_INTERRUPTS ( ) ;
CHECK_FOR_INTERRUPTS ( ) ;
rel = table_openrv ( t - > relation , ShareUpdateExclusiveLock ) ;
rel = table_openrv ( t - > relation , ShareUpdateExclusiveLock ) ;
myrelid = RelationGetRelid ( rel ) ;
myrelid = RelationGetRelid ( rel ) ;
myrelkind = get_rel_relkind ( myrelid ) ;
/*
* Make sure the relkind matches the expected object type . This may
* happen e . g . when adding a sequence using ADD TABLE or a table
* using ADD SEQUENCE ) .
*
* XXX We let through unsupported object types ( views etc . ) . Those
* will be caught later in check_publication_add_relation .
*/
if ( pub_get_object_type_for_relkind ( myrelkind ) ! = PUB_OBJTYPE_UNSUPPORTED & &
pub_get_object_type_for_relkind ( myrelkind ) ! = objectType )
ereport ( ERROR ,
errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " object type does not match type expected by command " ) ) ;
/*
/*
* Filter out duplicates if user specifies " foo, foo " .
* Filter out duplicates if user specifies " foo, foo " .
@ -1444,7 +1726,7 @@ OpenTableList(List *tables)
pub_rel = palloc ( sizeof ( PublicationRelInfo ) ) ;
pub_rel = palloc ( sizeof ( PublicationRelInfo ) ) ;
pub_rel - > relation = rel ;
pub_rel - > relation = rel ;
pub_rel - > whereClause = t - > whereClause ;
pub_rel - > whereClause = t - > whereClause ;
rel s = lappend ( rel s , pub_rel ) ;
result = lappend ( result , pub_rel ) ;
relids = lappend_oid ( relids , myrelid ) ;
relids = lappend_oid ( relids , myrelid ) ;
if ( t - > whereClause )
if ( t - > whereClause )
@ -1498,7 +1780,7 @@ OpenTableList(List *tables)
pub_rel - > relation = rel ;
pub_rel - > relation = rel ;
/* child inherits WHERE clause from parent */
/* child inherits WHERE clause from parent */
pub_rel - > whereClause = t - > whereClause ;
pub_rel - > whereClause = t - > whereClause ;
rel s = lappend ( rel s , pub_rel ) ;
result = lappend ( result , pub_rel ) ;
relids = lappend_oid ( relids , childrelid ) ;
relids = lappend_oid ( relids , childrelid ) ;
if ( t - > whereClause )
if ( t - > whereClause )
@ -1510,14 +1792,14 @@ OpenTableList(List *tables)
list_free ( relids ) ;
list_free ( relids ) ;
list_free ( relids_with_rf ) ;
list_free ( relids_with_rf ) ;
return rel s ;
return result ;
}
}
/*
/*
* Close all relations in the list .
* Close all relations in the list .
*/
*/
static void
static void
CloseTable List ( List * rels )
CloseRelation List ( List * rels )
{
{
ListCell * lc ;
ListCell * lc ;
@ -1565,12 +1847,12 @@ LockSchemaList(List *schemalist)
* Add listed tables to the publication .
* Add listed tables to the publication .
*/
*/
static void
static void
PublicationAddTable s ( Oid pubid , List * rels , bool if_not_exists ,
PublicationAddRelation s ( Oid pubid , List * rels , bool if_not_exists ,
AlterPublicationStmt * stmt )
AlterPublicationStmt * stmt )
{
{
ListCell * lc ;
ListCell * lc ;
Assert ( ! stmt | | ! stmt - > for_all_table s ) ;
Assert ( ! stmt | | ! stmt - > for_all_object s ) ;
foreach ( lc , rels )
foreach ( lc , rels )
{
{
@ -1599,7 +1881,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
* Remove listed tables from the publication .
* Remove listed tables from the publication .
*/
*/
static void
static void
PublicationDropTable s ( Oid pubid , List * rels , bool missing_ok )
PublicationDropRelation s ( Oid pubid , List * rels , bool missing_ok )
{
{
ObjectAddress obj ;
ObjectAddress obj ;
ListCell * lc ;
ListCell * lc ;
@ -1639,19 +1921,19 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
* Add listed schemas to the publication .
* Add listed schemas to the publication .
*/
*/
static void
static void
PublicationAddSchemas ( Oid pubid , List * schemas , bool if_not_exists ,
PublicationAddSchemas ( Oid pubid , List * schemas , char objectType ,
AlterPublicationStmt * stmt )
bool if_not_exists , AlterPublicationStmt * stmt )
{
{
ListCell * lc ;
ListCell * lc ;
Assert ( ! stmt | | ! stmt - > for_all_table s ) ;
Assert ( ! stmt | | ! stmt - > for_all_object s ) ;
foreach ( lc , schemas )
foreach ( lc , schemas )
{
{
Oid schemaid = lfirst_oid ( lc ) ;
Oid schemaid = lfirst_oid ( lc ) ;
ObjectAddress obj ;
ObjectAddress obj ;
obj = publication_add_schema ( pubid , schemaid , if_not_exists ) ;
obj = publication_add_schema ( pubid , schemaid , objectType , if_not_exists ) ;
if ( stmt )
if ( stmt )
{
{
EventTriggerCollectSimpleCommand ( obj , InvalidObjectAddress ,
EventTriggerCollectSimpleCommand ( obj , InvalidObjectAddress ,
@ -1667,7 +1949,7 @@ PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
* Remove listed schemas from the publication .
* Remove listed schemas from the publication .
*/
*/
static void
static void
PublicationDropSchemas ( Oid pubid , List * schemas , bool missing_ok )
PublicationDropSchemas ( Oid pubid , List * schemas , char objectType , bool missing_ok )
{
{
ObjectAddress obj ;
ObjectAddress obj ;
ListCell * lc ;
ListCell * lc ;
@ -1677,10 +1959,11 @@ PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
{
{
Oid schemaid = lfirst_oid ( lc ) ;
Oid schemaid = lfirst_oid ( lc ) ;
psid = GetSysCacheOid2 ( PUBLICATIONNAMESPACEMAP ,
psid = GetSysCacheOid3 ( PUBLICATIONNAMESPACEMAP ,
Anum_pg_publication_namespace_oid ,
Anum_pg_publication_namespace_oid ,
ObjectIdGetDatum ( schemaid ) ,
ObjectIdGetDatum ( schemaid ) ,
ObjectIdGetDatum ( pubid ) ) ;
ObjectIdGetDatum ( pubid ) ,
CharGetDatum ( objectType ) ) ;
if ( ! OidIsValid ( psid ) )
if ( ! OidIsValid ( psid ) )
{
{
if ( missing_ok )
if ( missing_ok )
@ -1735,6 +2018,13 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
NameStr ( form - > pubname ) ) ,
NameStr ( form - > pubname ) ) ,
errhint ( " The owner of a FOR ALL TABLES publication must be a superuser. " ) ) ) ;
errhint ( " The owner of a FOR ALL TABLES publication must be a superuser. " ) ) ) ;
if ( form - > puballsequences & & ! superuser_arg ( newOwnerId ) )
ereport ( ERROR ,
( errcode ( ERRCODE_INSUFFICIENT_PRIVILEGE ) ,
errmsg ( " permission denied to change owner of publication \" %s \" " ,
NameStr ( form - > pubname ) ) ,
errhint ( " The owner of a FOR ALL SEQUENCES publication must be a superuser. " ) ) ) ;
if ( ! superuser_arg ( newOwnerId ) & & is_schema_publication ( form - > oid ) )
if ( ! superuser_arg ( newOwnerId ) & & is_schema_publication ( form - > oid ) )
ereport ( ERROR ,
ereport ( ERROR ,
( errcode ( ERRCODE_INSUFFICIENT_PRIVILEGE ) ,
( errcode ( ERRCODE_INSUFFICIENT_PRIVILEGE ) ,