@ -16,6 +16,7 @@
# include "utils/builtins.h"
# include "utils/lsyscache.h"
# include "catalog/pg_class.h"
# include "catalog/pg_database.h"
# include "commands/defrem.h"
# include "commands/sequence.h"
# include "access/table.h"
@ -24,8 +25,13 @@
# include "catalog/namespace.h"
# include "commands/event_trigger.h"
# include "common/pg_tde_utils.h"
# include "storage/lmgr.h"
# include "tcop/utility.h"
# include "utils/fmgroids.h"
# include "utils/syscache.h"
# include "pg_tde_event_capture.h"
# include "pg_tde_guc.h"
# include "access/pg_tde_tdemap.h"
# include "catalog/tde_principal_key.h"
# include "miscadmin.h"
# include "access/tableam.h"
@ -34,6 +40,7 @@
/* Global variable that gets set at ddl start and cleard out at ddl end*/
static TdeCreateEvent tdeCurrentCreateEvent = { . tid = { . value = 0 } } ;
static Oid get_db_oid ( const char * name ) ;
static void reset_current_tde_create_event ( void ) ;
static Oid get_tde_table_am_oid ( void ) ;
@ -354,3 +361,160 @@ get_tde_table_am_oid(void)
{
return get_table_am_oid ( " tde_heap " , false ) ;
}
static ProcessUtility_hook_type next_ProcessUtility_hook = NULL ;
/*
* Handles utility commands which we cannot handle in the event trigger .
*/
static void
pg_tde_proccess_utility ( PlannedStmt * pstmt ,
const char * queryString ,
bool readOnlyTree ,
ProcessUtilityContext context ,
ParamListInfo params ,
QueryEnvironment * queryEnv ,
DestReceiver * dest ,
QueryCompletion * qc )
{
Node * parsetree = pstmt - > utilityStmt ;
switch ( nodeTag ( parsetree ) )
{
case T_CreatedbStmt :
{
CreatedbStmt * stmt = castNode ( CreatedbStmt , parsetree ) ;
ListCell * option ;
char * dbtemplate = " template1 " ;
char * strategy = " wal_log " ;
foreach ( option , stmt - > options )
{
DefElem * defel = ( DefElem * ) lfirst ( option ) ;
if ( strcmp ( defel - > defname , " template " ) = = 0 )
dbtemplate = defGetString ( defel ) ;
else if ( strcmp ( defel - > defname , " strategy " ) = = 0 )
strategy = defGetString ( defel ) ;
}
if ( pg_strcasecmp ( strategy , " file_copy " ) = = 0 )
{
Oid dbOid = get_db_oid ( dbtemplate ) ;
if ( dbOid ! = InvalidOid )
{
int count = pg_tde_count_relations ( dbOid ) ;
if ( count > 0 )
ereport ( ERROR ,
errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " The FILE_COPY strategy cannot be used when there are encrypted objects in the template database: %d objects found " , count ) ,
errhint ( " Use the WAL_LOG strategy instead. " ) ) ;
}
}
}
break ;
default :
break ;
}
if ( next_ProcessUtility_hook )
( * next_ProcessUtility_hook ) ( pstmt , queryString , readOnlyTree ,
context , params , queryEnv ,
dest , qc ) ;
else
standard_ProcessUtility ( pstmt , queryString , readOnlyTree ,
context , params , queryEnv ,
dest , qc ) ;
}
void
TdeEventCaptureInit ( void )
{
next_ProcessUtility_hook = ProcessUtility_hook ;
ProcessUtility_hook = pg_tde_proccess_utility ;
}
/*
* A stripped down version of get_db_info ( ) from src / backend / commands / dbcommands . c
*/
static Oid
get_db_oid ( const char * name )
{
Oid resDbOid = InvalidOid ;
Relation relation ;
Assert ( name ) ;
relation = table_open ( DatabaseRelationId , AccessShareLock ) ;
/*
* Loop covers the rare case where the database is renamed before we can
* lock it . We try again just in case we can find a new one of the same
* name .
*/
for ( ; ; )
{
ScanKeyData scanKey ;
SysScanDesc scan ;
HeapTuple tuple ;
Oid dbOid ;
/*
* there ' s no syscache for database - indexed - by - name , so must do it the
* hard way
*/
ScanKeyInit ( & scanKey ,
Anum_pg_database_datname ,
BTEqualStrategyNumber , F_NAMEEQ ,
CStringGetDatum ( name ) ) ;
scan = systable_beginscan ( relation , DatabaseNameIndexId , true ,
NULL , 1 , & scanKey ) ;
tuple = systable_getnext ( scan ) ;
if ( ! HeapTupleIsValid ( tuple ) )
{
/* definitely no database of that name */
systable_endscan ( scan ) ;
break ;
}
dbOid = ( ( Form_pg_database ) GETSTRUCT ( tuple ) ) - > oid ;
systable_endscan ( scan ) ;
/*
* Now that we have a database OID , we can try to lock the DB .
*/
LockSharedObject ( DatabaseRelationId , dbOid , 0 , AccessExclusiveLock ) ;
/*
* And now , re - fetch the tuple by OID . If it ' s still there and still
* the same name , we win ; else , drop the lock and loop back to try
* again .
*/
tuple = SearchSysCache1 ( DATABASEOID , ObjectIdGetDatum ( dbOid ) ) ;
if ( HeapTupleIsValid ( tuple ) )
{
Form_pg_database dbform = ( Form_pg_database ) GETSTRUCT ( tuple ) ;
if ( strcmp ( name , NameStr ( dbform - > datname ) ) = = 0 )
{
resDbOid = dbOid ;
ReleaseSysCache ( tuple ) ;
break ;
}
/* can only get here if it was just renamed */
ReleaseSysCache ( tuple ) ;
}
UnlockSharedObject ( DatabaseRelationId , dbOid , 0 , AccessExclusiveLock ) ;
}
table_close ( relation , AccessShareLock ) ;
return resDbOid ;
}