@ -63,13 +63,31 @@
# include "utils/builtins.h"
# include "utils/fmgroids.h"
# include "utils/pg_locale.h"
# include "utils/relmapper.h"
# include "utils/snapmgr.h"
# include "utils/syscache.h"
/*
* Create database strategy .
*
* CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
* copied block .
*
* CREATEDB_FILE_COPY will simply perform a file system level copy of the
* database and log a single record for each tablespace copied . To make this
* safe , it also triggers checkpoints before and after the operation .
*/
typedef enum CreateDBStrategy
{
CREATEDB_WAL_LOG ,
CREATEDB_FILE_COPY
} CreateDBStrategy ;
typedef struct
{
Oid src_dboid ; /* source (template) DB */
Oid dest_dboid ; /* DB we are trying to create */
CreateDBStrategy strategy ; /* create db strategy */
} createdb_failure_params ;
typedef struct
@ -78,6 +96,17 @@ typedef struct
Oid dest_tsoid ; /* tablespace we are trying to move to */
} movedb_failure_params ;
/*
* Information about a relation to be copied when creating a database .
*/
typedef struct CreateDBRelInfo
{
RelFileNode rnode ; /* physical relation identifier */
Oid reloid ; /* relation oid */
bool permanent ; /* relation is permanent or unlogged */
} CreateDBRelInfo ;
/* non-export function prototypes */
static void createdb_failure_callback ( int code , Datum arg ) ;
static void movedb ( const char * dbname , const char * tblspcname ) ;
@ -93,7 +122,546 @@ static bool have_createdb_privilege(void);
static void remove_dbtablespaces ( Oid db_id ) ;
static bool check_db_file_conflict ( Oid db_id ) ;
static int errdetail_busy_db ( int notherbackends , int npreparedxacts ) ;
static void CreateDatabaseUsingWalLog ( Oid src_dboid , Oid dboid , Oid src_tsid ,
Oid dst_tsid ) ;
static List * ScanSourceDatabasePgClass ( Oid srctbid , Oid srcdbid , char * srcpath ) ;
static List * ScanSourceDatabasePgClassPage ( Page page , Buffer buf , Oid tbid ,
Oid dbid , char * srcpath ,
List * rnodelist , Snapshot snapshot ) ;
static CreateDBRelInfo * ScanSourceDatabasePgClassTuple ( HeapTupleData * tuple ,
Oid tbid , Oid dbid ,
char * srcpath ) ;
static void CreateDirAndVersionFile ( char * dbpath , Oid dbid , Oid tsid ,
bool isRedo ) ;
static void CreateDatabaseUsingFileCopy ( Oid src_dboid , Oid dboid , Oid src_tsid ,
Oid dst_tsid ) ;
/*
* Create a new database using the WAL_LOG strategy .
*
* Each copied block is separately written to the write - ahead log .
*/
static void
CreateDatabaseUsingWalLog ( Oid src_dboid , Oid dst_dboid ,
Oid src_tsid , Oid dst_tsid )
{
char * srcpath ;
char * dstpath ;
List * rnodelist = NULL ;
ListCell * cell ;
LockRelId srcrelid ;
LockRelId dstrelid ;
RelFileNode srcrnode ;
RelFileNode dstrnode ;
CreateDBRelInfo * relinfo ;
/* Get source and destination database paths. */
srcpath = GetDatabasePath ( src_dboid , src_tsid ) ;
dstpath = GetDatabasePath ( dst_dboid , dst_tsid ) ;
/* Create database directory and write PG_VERSION file. */
CreateDirAndVersionFile ( dstpath , dst_dboid , dst_tsid , false ) ;
/* Copy relmap file from source database to the destination database. */
RelationMapCopy ( dst_dboid , dst_tsid , srcpath , dstpath ) ;
/* Get list of relfilenodes to copy from the source database. */
rnodelist = ScanSourceDatabasePgClass ( src_tsid , src_dboid , srcpath ) ;
Assert ( rnodelist ! = NIL ) ;
/*
* Database IDs will be the same for all relations so set them before
* entering the loop .
*/
srcrelid . dbId = src_dboid ;
dstrelid . dbId = dst_dboid ;
/* Loop over our list of relfilenodes and copy each one. */
foreach ( cell , rnodelist )
{
relinfo = lfirst ( cell ) ;
srcrnode = relinfo - > rnode ;
/*
* If the relation is from the source db ' s default tablespace then we
* need to create it in the destinations db ' s default tablespace .
* Otherwise , we need to create in the same tablespace as it is in the
* source database .
*/
if ( srcrnode . spcNode = = src_tsid )
dstrnode . spcNode = dst_tsid ;
else
dstrnode . spcNode = srcrnode . spcNode ;
dstrnode . dbNode = dst_dboid ;
dstrnode . relNode = srcrnode . relNode ;
/*
* Acquire locks on source and target relations before copying .
*
* We typically do not read relation data into shared_buffers without
* holding a relation lock . It ' s unclear what could go wrong if we
* skipped it in this case , because nobody can be modifying either
* the source or destination database at this point , and we have locks
* on both databases , too , but let ' s take the conservative route .
*/
dstrelid . relId = srcrelid . relId = relinfo - > reloid ;
LockRelationId ( & srcrelid , AccessShareLock ) ;
LockRelationId ( & dstrelid , AccessShareLock ) ;
/* Copy relation storage from source to the destination. */
CreateAndCopyRelationData ( srcrnode , dstrnode , relinfo - > permanent ) ;
/* Release the relation locks. */
UnlockRelationId ( & srcrelid , AccessShareLock ) ;
UnlockRelationId ( & dstrelid , AccessShareLock ) ;
}
list_free_deep ( rnodelist ) ;
}
/*
* Scan the pg_class table in the source database to identify the relations
* that need to be copied to the destination database .
*
* This is an exception to the usual rule that cross - database access is
* not possible . We can make it work here because we know that there are no
* connections to the source database and ( since there can ' t be prepared
* transactions touching that database ) no in - doubt tuples either . This
* means that we don ' t need to worry about pruning removing anything from
* under us , and we don ' t need to be too picky about our snapshot either .
* As long as it sees all previously - committed XIDs as committed and all
* aborted XIDs as aborted , we should be fine : nothing else is possible
* here .
*
* We can ' t rely on the relcache for anything here , because that only knows
* about the database to which we are connected , and can ' t handle access to
* other databases . That also means we can ' t rely on the heap scan
* infrastructure , which would be a bad idea anyway since it might try
* to do things like HOT pruning which we definitely can ' t do safely in
* a database to which we ' re not even connected .
*/
static List *
ScanSourceDatabasePgClass ( Oid tbid , Oid dbid , char * srcpath )
{
RelFileNode rnode ;
BlockNumber nblocks ;
BlockNumber blkno ;
Buffer buf ;
Oid relfilenode ;
Page page ;
List * rnodelist = NIL ;
LockRelId relid ;
Relation rel ;
Snapshot snapshot ;
BufferAccessStrategy bstrategy ;
/* Get pg_class relfilenode. */
relfilenode = RelationMapOidToFilenodeForDatabase ( srcpath ,
RelationRelationId ) ;
/* Don't read data into shared_buffers without holding a relation lock. */
relid . dbId = dbid ;
relid . relId = RelationRelationId ;
LockRelationId ( & relid , AccessShareLock ) ;
/* Prepare a RelFileNode for the pg_class relation. */
rnode . spcNode = tbid ;
rnode . dbNode = dbid ;
rnode . relNode = relfilenode ;
/*
* We can ' t use a real relcache entry for a relation in some other
* database , but since we ' re only going to access the fields related
* to physical storage , a fake one is good enough . If we didn ' t do this
* and used the smgr layer directly , we would have to worry about
* invalidations .
*/
rel = CreateFakeRelcacheEntry ( rnode ) ;
nblocks = smgrnblocks ( RelationGetSmgr ( rel ) , MAIN_FORKNUM ) ;
FreeFakeRelcacheEntry ( rel ) ;
/* Use a buffer access strategy since this is a bulk read operation. */
bstrategy = GetAccessStrategy ( BAS_BULKREAD ) ;
/*
* As explained in the function header comments , we need a snapshot that
* will see all committed transactions as committed , and our transaction
* snapshot - or the active snapshot - might not be new enough for that ,
* but the return value of GetLatestSnapshot ( ) should work fine .
*/
snapshot = GetLatestSnapshot ( ) ;
/* Process the relation block by block. */
for ( blkno = 0 ; blkno < nblocks ; blkno + + )
{
CHECK_FOR_INTERRUPTS ( ) ;
buf = ReadBufferWithoutRelcache ( rnode , MAIN_FORKNUM , blkno ,
RBM_NORMAL , bstrategy , false ) ;
LockBuffer ( buf , BUFFER_LOCK_SHARE ) ;
page = BufferGetPage ( buf ) ;
if ( PageIsNew ( page ) | | PageIsEmpty ( page ) )
{
UnlockReleaseBuffer ( buf ) ;
continue ;
}
/* Append relevant pg_class tuples for current page to rnodelist. */
rnodelist = ScanSourceDatabasePgClassPage ( page , buf , tbid , dbid ,
srcpath , rnodelist ,
snapshot ) ;
UnlockReleaseBuffer ( buf ) ;
}
/* Release relation lock. */
UnlockRelationId ( & relid , AccessShareLock ) ;
return rnodelist ;
}
/*
* Scan one page of the source database ' s pg_class relation and add relevant
* entries to rnodelist . The return value is the updated list .
*/
static List *
ScanSourceDatabasePgClassPage ( Page page , Buffer buf , Oid tbid , Oid dbid ,
char * srcpath , List * rnodelist ,
Snapshot snapshot )
{
BlockNumber blkno = BufferGetBlockNumber ( buf ) ;
OffsetNumber offnum ;
OffsetNumber maxoff ;
HeapTupleData tuple ;
maxoff = PageGetMaxOffsetNumber ( page ) ;
/* Loop over offsets. */
for ( offnum = FirstOffsetNumber ;
offnum < = maxoff ;
offnum = OffsetNumberNext ( offnum ) )
{
ItemId itemid ;
itemid = PageGetItemId ( page , offnum ) ;
/* Nothing to do if slot is empty or already dead. */
if ( ! ItemIdIsUsed ( itemid ) | | ItemIdIsDead ( itemid ) | |
ItemIdIsRedirected ( itemid ) )
continue ;
Assert ( ItemIdIsNormal ( itemid ) ) ;
ItemPointerSet ( & ( tuple . t_self ) , blkno , offnum ) ;
/* Initialize a HeapTupleData structure. */
tuple . t_data = ( HeapTupleHeader ) PageGetItem ( page , itemid ) ;
tuple . t_len = ItemIdGetLength ( itemid ) ;
tuple . t_tableOid = RelationRelationId ;
/* Skip tuples that are not visible to this snapshot. */
if ( HeapTupleSatisfiesVisibility ( & tuple , snapshot , buf ) )
{
CreateDBRelInfo * relinfo ;
/*
* ScanSourceDatabasePgClassTuple is in charge of constructing
* a CreateDBRelInfo object for this tuple , but can also decide
* that this tuple isn ' t something we need to copy . If we do need
* to copy the relation , add it to the list .
*/
relinfo = ScanSourceDatabasePgClassTuple ( & tuple , tbid , dbid ,
srcpath ) ;
if ( relinfo ! = NULL )
rnodelist = lappend ( rnodelist , relinfo ) ;
}
}
return rnodelist ;
}
/*
* Decide whether a certain pg_class tuple represents something that
* needs to be copied from the source database to the destination database ,
* and if so , construct a CreateDBRelInfo for it .
*
* Visbility checks are handled by the caller , so our job here is just
* to assess the data stored in the tuple .
*/
CreateDBRelInfo *
ScanSourceDatabasePgClassTuple ( HeapTupleData * tuple , Oid tbid , Oid dbid ,
char * srcpath )
{
CreateDBRelInfo * relinfo ;
Form_pg_class classForm ;
Oid relfilenode = InvalidOid ;
classForm = ( Form_pg_class ) GETSTRUCT ( tuple ) ;
/*
* Return NULL if this object does not need to be copied .
*
* Shared objects don ' t need to be copied , because they are shared .
* Objects without storage can ' t be copied , because there ' s nothing to
* copy . Temporary relations don ' t need to be copied either , because
* they are inaccessible outside of the session that created them ,
* which must be gone already , and couldn ' t connect to a different database
* if it still existed . autovacuum will eventually remove the pg_class
* entries as well .
*/
if ( classForm - > reltablespace = = GLOBALTABLESPACE_OID | |
! RELKIND_HAS_STORAGE ( classForm - > relkind ) | |
classForm - > relpersistence = = RELPERSISTENCE_TEMP )
return NULL ;
/*
* If relfilenode is valid then directly use it . Otherwise , consult the
* relmap .
*/
if ( OidIsValid ( classForm - > relfilenode ) )
relfilenode = classForm - > relfilenode ;
else
relfilenode = RelationMapOidToFilenodeForDatabase ( srcpath ,
classForm - > oid ) ;
/* We must have a valid relfilenode oid. */
if ( ! OidIsValid ( relfilenode ) )
elog ( ERROR , " relation with OID %u does not have a valid relfilenode " ,
classForm - > oid ) ;
/* Prepare a rel info element and add it to the list. */
relinfo = ( CreateDBRelInfo * ) palloc ( sizeof ( CreateDBRelInfo ) ) ;
if ( OidIsValid ( classForm - > reltablespace ) )
relinfo - > rnode . spcNode = classForm - > reltablespace ;
else
relinfo - > rnode . spcNode = tbid ;
relinfo - > rnode . dbNode = dbid ;
relinfo - > rnode . relNode = relfilenode ;
relinfo - > reloid = classForm - > oid ;
/* Temporary relations were rejected above. */
Assert ( classForm - > relpersistence ! = RELPERSISTENCE_TEMP ) ;
relinfo - > permanent =
( classForm - > relpersistence = = RELPERSISTENCE_PERMANENT ) ? true : false ;
return relinfo ;
}
/*
* Create database directory and write out the PG_VERSION file in the database
* path . If isRedo is true , it ' s okay for the database directory to exist
* already .
*/
static void
CreateDirAndVersionFile ( char * dbpath , Oid dbid , Oid tsid , bool isRedo )
{
int fd ;
int nbytes ;
char versionfile [ MAXPGPATH ] ;
char buf [ 16 ] ;
/*
* Prepare version data before starting a critical section .
*
* Note that we don ' t have to copy this from the source database ; there ' s
* only one legal value .
*/
sprintf ( buf , " %s \n " , PG_MAJORVERSION ) ;
nbytes = strlen ( PG_MAJORVERSION ) + 1 ;
/* If we are not in WAL replay then write the WAL. */
if ( ! isRedo )
{
xl_dbase_create_wal_log_rec xlrec ;
XLogRecPtr lsn ;
START_CRIT_SECTION ( ) ;
xlrec . db_id = dbid ;
xlrec . tablespace_id = tsid ;
XLogBeginInsert ( ) ;
XLogRegisterData ( ( char * ) ( & xlrec ) ,
sizeof ( xl_dbase_create_wal_log_rec ) ) ;
lsn = XLogInsert ( RM_DBASE_ID , XLOG_DBASE_CREATE_WAL_LOG ) ;
/* As always, WAL must hit the disk before the data update does. */
XLogFlush ( lsn ) ;
}
/* Create database directory. */
if ( MakePGDirectory ( dbpath ) < 0 )
{
/* Failure other than already exists or not in WAL replay? */
if ( errno ! = EEXIST | | ! isRedo )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not create directory \" %s \" : %m " , dbpath ) ) ) ;
}
/*
* Create PG_VERSION file in the database path . If the file already
* exists and we are in WAL replay then try again to open it in write
* mode .
*/
snprintf ( versionfile , sizeof ( versionfile ) , " %s/%s " , dbpath , " PG_VERSION " ) ;
fd = OpenTransientFile ( versionfile , O_WRONLY | O_CREAT | O_EXCL | PG_BINARY ) ;
if ( fd < 0 & & errno = = EEXIST & & isRedo )
fd = OpenTransientFile ( versionfile , O_WRONLY | O_TRUNC | PG_BINARY ) ;
if ( fd < 0 )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not create file \" %s \" : %m " , versionfile ) ) ) ;
/* Write PG_MAJORVERSION in the PG_VERSION file. */
pgstat_report_wait_start ( WAIT_EVENT_VERSION_FILE_WRITE ) ;
errno = 0 ;
if ( ( int ) write ( fd , buf , nbytes ) ! = nbytes )
{
/* If write didn't set errno, assume problem is no disk space. */
if ( errno = = 0 )
errno = ENOSPC ;
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not write to file \" %s \" : %m " , versionfile ) ) ) ;
}
pgstat_report_wait_end ( ) ;
/* Close the version file. */
CloseTransientFile ( fd ) ;
/* Critical section done. */
if ( ! isRedo )
END_CRIT_SECTION ( ) ;
}
/*
* Create a new database using the FILE_COPY strategy .
*
* Copy each tablespace at the filesystem level , and log a single WAL record
* for each tablespace copied . This requires a checkpoint before and after the
* copy , which may be expensive , but it does greatly reduce WAL generation
* if the copied database is large .
*/
static void
CreateDatabaseUsingFileCopy ( Oid src_dboid , Oid dst_dboid , Oid src_tsid ,
Oid dst_tsid )
{
TableScanDesc scan ;
Relation rel ;
HeapTuple tuple ;
/*
* Force a checkpoint before starting the copy . This will force all dirty
* buffers , including those of unlogged tables , out to disk , to ensure
* source database is up - to - date on disk for the copy .
* FlushDatabaseBuffers ( ) would suffice for that , but we also want to
* process any pending unlink requests . Otherwise , if a checkpoint
* happened while we ' re copying files , a file might be deleted just when
* we ' re about to copy it , causing the lstat ( ) call in copydir ( ) to fail
* with ENOENT .
*/
RequestCheckpoint ( CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL ) ;
/*
* Iterate through all tablespaces of the template database , and copy each
* one to the new database .
*/
rel = table_open ( TableSpaceRelationId , AccessShareLock ) ;
scan = table_beginscan_catalog ( rel , 0 , NULL ) ;
while ( ( tuple = heap_getnext ( scan , ForwardScanDirection ) ) ! = NULL )
{
Form_pg_tablespace spaceform = ( Form_pg_tablespace ) GETSTRUCT ( tuple ) ;
Oid srctablespace = spaceform - > oid ;
Oid dsttablespace ;
char * srcpath ;
char * dstpath ;
struct stat st ;
/* No need to copy global tablespace */
if ( srctablespace = = GLOBALTABLESPACE_OID )
continue ;
srcpath = GetDatabasePath ( src_dboid , srctablespace ) ;
if ( stat ( srcpath , & st ) < 0 | | ! S_ISDIR ( st . st_mode ) | |
directory_is_empty ( srcpath ) )
{
/* Assume we can ignore it */
pfree ( srcpath ) ;
continue ;
}
if ( srctablespace = = src_tsid )
dsttablespace = dst_tsid ;
else
dsttablespace = srctablespace ;
dstpath = GetDatabasePath ( dst_dboid , dsttablespace ) ;
/*
* Copy this subdirectory to the new location
*
* We don ' t need to copy subdirectories
*/
copydir ( srcpath , dstpath , false ) ;
/* Record the filesystem change in XLOG */
{
xl_dbase_create_file_copy_rec xlrec ;
xlrec . db_id = dst_dboid ;
xlrec . tablespace_id = dsttablespace ;
xlrec . src_db_id = src_dboid ;
xlrec . src_tablespace_id = srctablespace ;
XLogBeginInsert ( ) ;
XLogRegisterData ( ( char * ) & xlrec ,
sizeof ( xl_dbase_create_file_copy_rec ) ) ;
( void ) XLogInsert ( RM_DBASE_ID ,
XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE ) ;
}
}
table_endscan ( scan ) ;
table_close ( rel , AccessShareLock ) ;
/*
* We force a checkpoint before committing . This effectively means that
* committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
* replayed ( at least not in ordinary crash recovery ; we still have to
* make the XLOG entry for the benefit of PITR operations ) . This avoids
* two nasty scenarios :
*
* # 1 : When PITR is off , we don ' t XLOG the contents of newly created
* indexes ; therefore the drop - and - recreate - whole - directory behavior of
* DBASE_CREATE replay would lose such indexes .
*
* # 2 : Since we have to recopy the source database during DBASE_CREATE
* replay , we run the risk of copying changes in it that were committed
* after the original CREATE DATABASE command but before the system crash
* that led to the replay . This is at least unexpected and at worst could
* lead to inconsistencies , eg duplicate table names .
*
* ( Both of these were real bugs in releases 8.0 through 8.0 .3 . )
*
* In PITR replay , the first of these isn ' t an issue , and the second is
* only a risk if the CREATE DATABASE and subsequent template database
* change both occur while a base backup is being taken . There doesn ' t
* seem to be much we can do about that except document it as a
* limitation .
*
* See CreateDatabaseUsingWalLog ( ) for a less cheesy CREATE DATABASE
* strategy that avoids these problems .
*/
RequestCheckpoint ( CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT ) ;
}
/*
* CREATE DATABASE
@ -101,8 +669,6 @@ static int errdetail_busy_db(int notherbackends, int npreparedxacts);
Oid
createdb ( ParseState * pstate , const CreatedbStmt * stmt )
{
TableScanDesc scan ;
Relation rel ;
Oid src_dboid ;
Oid src_owner ;
int src_encoding = - 1 ;
@ -137,6 +703,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
DefElem * dallowconnections = NULL ;
DefElem * dconnlimit = NULL ;
DefElem * dcollversion = NULL ;
DefElem * dstrategy = NULL ;
char * dbname = stmt - > dbname ;
char * dbowner = NULL ;
const char * dbtemplate = NULL ;
@ -152,6 +719,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
char * dbcollversion = NULL ;
int notherbackends ;
int npreparedxacts ;
CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG ;
createdb_failure_params fparms ;
/* Extract options from the statement node tree */
@ -269,6 +837,12 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ) ,
errmsg ( " OIDs less than %u are reserved for system objects " , FirstNormalObjectId ) ) ;
}
else if ( strcmp ( defel - > defname , " strategy " ) = = 0 )
{
if ( dstrategy )
errorConflictingDefElem ( defel , pstate ) ;
dstrategy = defel ;
}
else
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
@ -413,6 +987,23 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
dbtemplate ) ) ) ;
}
/* Validate the database creation strategy. */
if ( dstrategy & & dstrategy - > arg )
{
char * strategy ;
strategy = defGetString ( dstrategy ) ;
if ( strcmp ( strategy , " wal_log " ) = = 0 )
dbstrategy = CREATEDB_WAL_LOG ;
else if ( strcmp ( strategy , " file_copy " ) = = 0 )
dbstrategy = CREATEDB_FILE_COPY ;
else
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " invalid create database strategy %s " , strategy ) ,
errhint ( " Valid strategies are \" wal_log \" , and \" file_copy \" . " ) ) ) ;
}
/* If encoding or locales are defaulted, use source's setting */
if ( encoding < 0 )
encoding = src_encoding ;
@ -753,17 +1344,18 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
InvokeObjectPostCreateHook ( DatabaseRelationId , dboid , 0 ) ;
/*
* Force a checkpoint before starting the copy . This will force all dirty
* buffers , including those of unlogged tables , out to disk , to ensure
* source database is up - to - date on disk for the copy .
* FlushDatabaseBuffers ( ) would suffice for that , but we also want to
* process any pending unlink requests . Otherwise , if a checkpoint
* happened while we ' re copying files , a file might be deleted just when
* we ' re about to copy it , causing the lstat ( ) call in copydir ( ) to fail
* with ENOENT .
* If we ' re going to be reading data for the to - be - created database
* into shared_buffers , take a lock on it . Nobody should know that this
* database exists yet , but it ' s good to maintain the invariant that a
* lock an AccessExclusiveLock on the database is sufficient to drop all
* of its buffers without worrying about more being read later .
*
* Note that we need to do this before entering the PG_ENSURE_ERROR_CLEANUP
* block below , because createdb_failure_callback expects this lock to
* be held already .
*/
RequestCheckpoint ( CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
| CHECKPOINT_FLUSH_ALL ) ;
if ( dbstrategy = = CREATEDB_WAL_LOG )
LockSharedObject ( DatabaseRelationId , dboid , 0 , AccessShareLock ) ;
/*
* Once we start copying subdirectories , we need to be able to clean ' em
@ -774,101 +1366,24 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
*/
fparms . src_dboid = src_dboid ;
fparms . dest_dboid = dboid ;
fparms . strategy = dbstrategy ;
PG_ENSURE_ERROR_CLEANUP ( createdb_failure_callback ,
PointerGetDatum ( & fparms ) ) ;
{
/*
* Iterate through all tablespaces of the template database , and copy
* each one to the new database .
* If the user has asked to create a database with WAL_LOG strategy
* then call CreateDatabaseUsingWalLog , which will copy the database
* at the block level and it will WAL log each copied block .
* Otherwise , call CreateDatabaseUsingFileCopy that will copy the
* database file by file .
*/
rel = table_open ( TableSpaceRelationId , AccessShareLock ) ;
scan = table_beginscan_catalog ( rel , 0 , NULL ) ;
while ( ( tuple = heap_getnext ( scan , ForwardScanDirection ) ) ! = NULL )
{
Form_pg_tablespace spaceform = ( Form_pg_tablespace ) GETSTRUCT ( tuple ) ;
Oid srctablespace = spaceform - > oid ;
Oid dsttablespace ;
char * srcpath ;
char * dstpath ;
struct stat st ;
/* No need to copy global tablespace */
if ( srctablespace = = GLOBALTABLESPACE_OID )
continue ;
srcpath = GetDatabasePath ( src_dboid , srctablespace ) ;
if ( stat ( srcpath , & st ) < 0 | | ! S_ISDIR ( st . st_mode ) | |
directory_is_empty ( srcpath ) )
{
/* Assume we can ignore it */
pfree ( srcpath ) ;
continue ;
}
if ( srctablespace = = src_deftablespace )
dsttablespace = dst_deftablespace ;
if ( dbstrategy = = CREATEDB_WAL_LOG )
CreateDatabaseUsingWalLog ( src_dboid , dboid , src_deftablespace ,
dst_deftablespace ) ;
else
dsttablespace = srctablespace ;
dstpath = GetDatabasePath ( dboid , dsttablespace ) ;
/*
* Copy this subdirectory to the new location
*
* We don ' t need to copy subdirectories
*/
copydir ( srcpath , dstpath , false ) ;
/* Record the filesystem change in XLOG */
{
xl_dbase_create_rec xlrec ;
xlrec . db_id = dboid ;
xlrec . tablespace_id = dsttablespace ;
xlrec . src_db_id = src_dboid ;
xlrec . src_tablespace_id = srctablespace ;
XLogBeginInsert ( ) ;
XLogRegisterData ( ( char * ) & xlrec , sizeof ( xl_dbase_create_rec ) ) ;
( void ) XLogInsert ( RM_DBASE_ID ,
XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE ) ;
}
}
table_endscan ( scan ) ;
table_close ( rel , AccessShareLock ) ;
/*
* We force a checkpoint before committing . This effectively means
* that committed XLOG_DBASE_CREATE operations will never need to be
* replayed ( at least not in ordinary crash recovery ; we still have to
* make the XLOG entry for the benefit of PITR operations ) . This
* avoids two nasty scenarios :
*
* # 1 : When PITR is off , we don ' t XLOG the contents of newly created
* indexes ; therefore the drop - and - recreate - whole - directory behavior
* of DBASE_CREATE replay would lose such indexes .
*
* # 2 : Since we have to recopy the source database during DBASE_CREATE
* replay , we run the risk of copying changes in it that were
* committed after the original CREATE DATABASE command but before the
* system crash that led to the replay . This is at least unexpected
* and at worst could lead to inconsistencies , eg duplicate table
* names .
*
* ( Both of these were real bugs in releases 8.0 through 8.0 .3 . )
*
* In PITR replay , the first of these isn ' t an issue , and the second
* is only a risk if the CREATE DATABASE and subsequent template
* database change both occur while a base backup is being taken .
* There doesn ' t seem to be much we can do about that except document
* it as a limitation .
*
* Perhaps if we ever implement CREATE DATABASE in a less cheesy way ,
* we can avoid this .
*/
RequestCheckpoint ( CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT ) ;
CreateDatabaseUsingFileCopy ( src_dboid , dboid , src_deftablespace ,
dst_deftablespace ) ;
/*
* Close pg_database , but keep lock till commit .
@ -954,6 +1469,25 @@ createdb_failure_callback(int code, Datum arg)
{
createdb_failure_params * fparms = ( createdb_failure_params * ) DatumGetPointer ( arg ) ;
/*
* If we were copying database at block levels then drop pages for the
* destination database that are in the shared buffer cache . And tell
* checkpointer to forget any pending fsync and unlink requests for files
* in the database . The reasoning behind doing this is same as explained
* in dropdb function . But unlike dropdb we don ' t need to call
* pgstat_drop_database because this database is still not created so
* there should not be any stat for this .
*/
if ( fparms - > strategy = = CREATEDB_WAL_LOG )
{
DropDatabaseBuffers ( fparms - > dest_dboid ) ;
ForgetDatabaseSyncRequests ( fparms - > dest_dboid ) ;
/* Release lock on the target database. */
UnlockSharedObject ( DatabaseRelationId , fparms - > dest_dboid , 0 ,
AccessShareLock ) ;
}
/*
* Release lock on source database before doing recursive remove . This is
* not essential but it seems desirable to release the lock as soon as
@ -1478,7 +2012,7 @@ movedb(const char *dbname, const char *tblspcname)
* Record the filesystem change in XLOG
*/
{
xl_dbase_create_rec xlrec ;
xl_dbase_create_file_copy_ rec xlrec ;
xlrec . db_id = db_id ;
xlrec . tablespace_id = dst_tblspcoid ;
@ -1486,10 +2020,11 @@ movedb(const char *dbname, const char *tblspcname)
xlrec . src_tablespace_id = src_tblspcoid ;
XLogBeginInsert ( ) ;
XLogRegisterData ( ( char * ) & xlrec , sizeof ( xl_dbase_create_rec ) ) ;
XLogRegisterData ( ( char * ) & xlrec ,
sizeof ( xl_dbase_create_file_copy_rec ) ) ;
( void ) XLogInsert ( RM_DBASE_ID ,
XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE ) ;
XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE ) ;
}
/*
@ -1525,9 +2060,10 @@ movedb(const char *dbname, const char *tblspcname)
/*
* Force another checkpoint here . As in CREATE DATABASE , this is to
* ensure that we don ' t have to replay a committed XLOG_DBASE_CREATE
* operation , which would cause us to lose any unlogged operations
* done in the new DB tablespace before the next checkpoint .
* ensure that we don ' t have to replay a committed
* XLOG_DBASE_CREATE_FILE_COPY operation , which would cause us to lose
* any unlogged operations done in the new DB tablespace before the
* next checkpoint .
*/
RequestCheckpoint ( CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT ) ;
@ -2478,9 +3014,10 @@ dbase_redo(XLogReaderState *record)
/* Backup blocks are not used in dbase records */
Assert ( ! XLogRecHasAnyBlockRefs ( record ) ) ;
if ( info = = XLOG_DBASE_CREATE )
if ( info = = XLOG_DBASE_CREATE_FILE_COPY )
{
xl_dbase_create_rec * xlrec = ( xl_dbase_create_rec * ) XLogRecGetData ( record ) ;
xl_dbase_create_file_copy_rec * xlrec =
( xl_dbase_create_file_copy_rec * ) XLogRecGetData ( record ) ;
char * src_path ;
char * dst_path ;
struct stat st ;
@ -2515,6 +3052,18 @@ dbase_redo(XLogReaderState *record)
*/
copydir ( src_path , dst_path , false ) ;
}
else if ( info = = XLOG_DBASE_CREATE_WAL_LOG )
{
xl_dbase_create_wal_log_rec * xlrec =
( xl_dbase_create_wal_log_rec * ) XLogRecGetData ( record ) ;
char * dbpath ;
dbpath = GetDatabasePath ( xlrec - > db_id , xlrec - > tablespace_id ) ;
/* Create the database directory with the version file. */
CreateDirAndVersionFile ( dbpath , xlrec - > db_id , xlrec - > tablespace_id ,
true ) ;
}
else if ( info = = XLOG_DBASE_DROP )
{
xl_dbase_drop_rec * xlrec = ( xl_dbase_drop_rec * ) XLogRecGetData ( record ) ;