@ -58,7 +58,7 @@ static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
SetupWorkerPtrType setupWorkerPtr ) ;
static void _getObjectDescription ( PQExpBuffer buf , TocEntry * te ,
ArchiveHandle * AH ) ;
static void _printTocEntry ( ArchiveHandle * AH , TocEntry * te , bool isData , bool acl_pass ) ;
static void _printTocEntry ( ArchiveHandle * AH , TocEntry * te , bool isData ) ;
static char * replace_line_endings ( const char * str ) ;
static void _doSetFixedOutputState ( ArchiveHandle * AH ) ;
static void _doSetSessionAuth ( ArchiveHandle * AH , const char * user ) ;
@ -71,6 +71,7 @@ static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
static void processEncodingEntry ( ArchiveHandle * AH , TocEntry * te ) ;
static void processStdStringsEntry ( ArchiveHandle * AH , TocEntry * te ) ;
static teReqs _tocEntryRequired ( TocEntry * te , teSection curSection , RestoreOptions * ropt ) ;
static RestorePass _tocEntryRestorePass ( TocEntry * te ) ;
static bool _tocEntryIsACL ( TocEntry * te ) ;
static void _disableTriggersIfNecessary ( ArchiveHandle * AH , TocEntry * te ) ;
static void _enableTriggersIfNecessary ( ArchiveHandle * AH , TocEntry * te ) ;
@ -86,13 +87,18 @@ static OutputContext SaveOutput(ArchiveHandle *AH);
static void RestoreOutput ( ArchiveHandle * AH , OutputContext savedContext ) ;
static int restore_toc_entry ( ArchiveHandle * AH , TocEntry * te , bool is_parallel ) ;
static void restore_toc_entries_prefork ( ArchiveHandle * AH ) ;
static void restore_toc_entries_parallel ( ArchiveHandle * AH , ParallelState * pstate ,
static void restore_toc_entries_prefork ( ArchiveHandle * AH ,
TocEntry * pending_list ) ;
static void restore_toc_entries_parallel ( ArchiveHandle * AH ,
ParallelState * pstate ,
TocEntry * pending_list ) ;
static void restore_toc_entries_postfork ( ArchiveHandle * AH ,
TocEntry * pending_list ) ;
static void restore_toc_entries_postfork ( ArchiveHandle * AH , TocEntry * pending_list ) ;
static void par_list_header_init ( TocEntry * l ) ;
static void par_list_append ( TocEntry * l , TocEntry * te ) ;
static void par_list_remove ( TocEntry * te ) ;
static void move_to_ready_list ( TocEntry * pending_list , TocEntry * ready_list ,
RestorePass pass ) ;
static TocEntry * get_next_work_item ( ArchiveHandle * AH ,
TocEntry * ready_list ,
ParallelState * pstate ) ;
@ -625,20 +631,18 @@ RestoreArchive(Archive *AHX)
AH - > currSchema = NULL ;
}
/*
* In serial mode , we now process each non - ACL TOC entry .
*
* In parallel mode , turn control over to the parallel - restore logic .
*/
if ( parallel_mode )
{
/*
* In parallel mode , turn control over to the parallel - restore logic .
*/
ParallelState * pstate ;
TocEntry pending_list ;
par_list_header_init ( & pending_list ) ;
/* This runs PRE_DATA items and then disconnects from the database */
restore_toc_entries_prefork ( AH ) ;
restore_toc_entries_prefork ( AH , & pending_list ) ;
Assert ( AH - > connection = = NULL ) ;
/* ParallelBackupStart() will actually fork the processes */
@ -652,28 +656,51 @@ RestoreArchive(Archive *AHX)
}
else
{
/*
* In serial mode , process everything in three phases : normal items ,
* then ACLs , then matview refresh items . We might be able to skip
* one or both extra phases in some cases , eg data - only restores .
*/
bool haveACL = false ;
bool haveRefresh = false ;
for ( te = AH - > toc - > next ; te ! = AH - > toc ; te = te - > next )
( void ) restore_toc_entry ( AH , te , false ) ;
}
{
if ( ( te - > reqs & ( REQ_SCHEMA | REQ_DATA ) ) = = 0 )
continue ; /* ignore if not to be dumped at all */
/*
* Scan TOC again to output ownership commands and ACLs
*/
for ( te = AH - > toc - > next ; te ! = AH - > toc ; te = te - > next )
{
AH - > currentTE = te ;
switch ( _tocEntryRestorePass ( te ) )
{
case RESTORE_PASS_MAIN :
( void ) restore_toc_entry ( AH , te , false ) ;
break ;
case RESTORE_PASS_ACL :
haveACL = true ;
break ;
case RESTORE_PASS_REFRESH :
haveRefresh = true ;
break ;
}
}
/* Both schema and data objects might now have ownership/ACLs */
if ( ( te - > reqs & ( REQ_SCHEMA | REQ_DATA ) ) ! = 0 )
if ( haveACL )
{
/* Show namespace if available */
if ( te - > namespace )
ahlog ( AH , 1 , " setting owner and privileges for %s \" %s.%s \" \n " ,
te - > desc , te - > namespace , te - > tag ) ;
else
ahlog ( AH , 1 , " setting owner and privileges for %s \" %s \" \n " ,
te - > desc , te - > tag ) ;
_printTocEntry ( AH , te , false , true ) ;
for ( te = AH - > toc - > next ; te ! = AH - > toc ; te = te - > next )
{
if ( ( te - > reqs & ( REQ_SCHEMA | REQ_DATA ) ) ! = 0 & &
_tocEntryRestorePass ( te ) = = RESTORE_PASS_ACL )
( void ) restore_toc_entry ( AH , te , false ) ;
}
}
if ( haveRefresh )
{
for ( te = AH - > toc - > next ; te ! = AH - > toc ; te = te - > next )
{
if ( ( te - > reqs & ( REQ_SCHEMA | REQ_DATA ) ) ! = 0 & &
_tocEntryRestorePass ( te ) = = RESTORE_PASS_REFRESH )
( void ) restore_toc_entry ( AH , te , false ) ;
}
}
}
@ -720,10 +747,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
AH - > currentTE = te ;
/* Work out what, if anything, we want from this entry */
if ( _tocEntryIsACL ( te ) )
reqs = 0 ; /* ACLs are never restored here */
else
reqs = te - > reqs ;
reqs = te - > reqs ;
/*
* Ignore DATABASE entry unless we should create it . We must check this
@ -744,17 +768,19 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
defnDumped = false ;
if ( ( reqs & REQ_SCHEMA ) ! = 0 ) /* We want the schema */
/*
* If it has a schema component that we want , then process that
*/
if ( ( reqs & REQ_SCHEMA ) ! = 0 )
{
/* Show namespace if available */
/* Show namespace in log message i f available */
if ( te - > namespace )
ahlog ( AH , 1 , " creating %s \" %s.%s \" \n " ,
te - > desc , te - > namespace , te - > tag ) ;
else
ahlog ( AH , 1 , " creating %s \" %s \" \n " , te - > desc , te - > tag ) ;
_printTocEntry ( AH , te , false , false ) ;
_printTocEntry ( AH , te , false ) ;
defnDumped = true ;
if ( strcmp ( te - > desc , " TABLE " ) = = 0 )
@ -810,7 +836,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
}
/*
* If we have a data component , then process i t
* If it has a data component that we want , then process tha t
*/
if ( ( reqs & REQ_DATA ) ! = 0 )
{
@ -826,7 +852,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
*/
if ( AH - > PrintTocDataPtr ! = NULL )
{
_printTocEntry ( AH , te , true , false ) ;
_printTocEntry ( AH , te , true ) ;
if ( strcmp ( te - > desc , " BLOBS " ) = = 0 | |
strcmp ( te - > desc , " BLOB COMMENTS " ) = = 0 )
@ -914,7 +940,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
{
/* If we haven't already dumped the defn part, do so now */
ahlog ( AH , 1 , " executing %s %s \n " , te - > desc , te - > tag ) ;
_printTocEntry ( AH , te , false , false ) ;
_printTocEntry ( AH , te , false ) ;
}
}
@ -2943,8 +2969,30 @@ _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt)
return res ;
}
/*
* Identify which pass we should restore this TOC entry in .
*
* See notes with the RestorePass typedef in pg_backup_archiver . h .
*/
static RestorePass
_tocEntryRestorePass ( TocEntry * te )
{
/* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
if ( strcmp ( te - > desc , " ACL " ) = = 0 | |
strcmp ( te - > desc , " ACL LANGUAGE " ) = = 0 | |
strcmp ( te - > desc , " DEFAULT ACL " ) = = 0 )
return RESTORE_PASS_ACL ;
if ( strcmp ( te - > desc , " MATERIALIZED VIEW DATA " ) = = 0 )
return RESTORE_PASS_REFRESH ;
return RESTORE_PASS_MAIN ;
}
/*
* Identify TOC entries that are ACLs .
*
* Note : it seems worth duplicating some code here to avoid a hard - wired
* assumption that these are exactly the same entries that we restore during
* the RESTORE_PASS_ACL phase .
*/
static bool
_tocEntryIsACL ( TocEntry * te )
@ -3364,23 +3412,18 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
type ) ;
}
/*
* Emit the SQL commands to create the object represented by a TOC entry
*
* This now also includes issuing an ALTER OWNER command to restore the
* object ' s ownership , if wanted . But note that the object ' s permissions
* will remain at default , until the matching ACL TOC entry is restored .
*/
static void
_printTocEntry ( ArchiveHandle * AH , TocEntry * te , bool isData , bool acl_pass )
_printTocEntry ( ArchiveHandle * AH , TocEntry * te , bool isData )
{
RestoreOptions * ropt = AH - > public . ropt ;
/* ACLs are dumped only during acl pass */
if ( acl_pass )
{
if ( ! _tocEntryIsACL ( te ) )
return ;
}
else
{
if ( _tocEntryIsACL ( te ) )
return ;
}
/*
* Avoid dumping the public schema , as it will already be created . . .
* unless we are using - - clean mode ( and * not * - - create mode ) , in which
@ -3567,7 +3610,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData, bool acl_pass)
* If it ' s an ACL entry , it might contain SET SESSION AUTHORIZATION
* commands , so we can no longer assume we know the current auth setting .
*/
if ( acl_pass )
if ( _tocEntryIsACL ( te ) )
{
if ( AH - > currUser )
free ( AH - > currUser ) ;
@ -3597,6 +3640,9 @@ replace_line_endings(const char *str)
return result ;
}
/*
* Write the file header for a custom - format archive
*/
void
WriteHead ( ArchiveHandle * AH )
{
@ -3772,16 +3818,14 @@ dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
/*
* Main engine for parallel restore .
*
* Work is done in three phases .
* First we process all SECTION_PRE_DATA tocEntries , in a single connection ,
* just as for a standard restore . Second we process the remaining non - ACL
* steps in parallel worker children ( threads on Windows , processes on Unix ) ,
* each of which connects separately to the database . Finally we process all
* the ACL entries in a single connection ( that happens back in
* RestoreArchive ) .
* Parallel restore is done in three phases . In this first phase ,
* we ' ll process all SECTION_PRE_DATA TOC entries that are allowed to be
* processed in the RESTORE_PASS_MAIN pass . ( In practice , that ' s all
* PRE_DATA items other than ACLs . ) Entries we can ' t process now are
* added to the pending_list for later phases to deal with .
*/
static void
restore_toc_entries_prefork ( ArchiveHandle * AH )
restore_toc_entries_prefork ( ArchiveHandle * AH , TocEntry * pending_list )
{
bool skipped_some ;
TocEntry * next_work_item ;
@ -3799,23 +3843,31 @@ restore_toc_entries_prefork(ArchiveHandle *AH)
* about showing all the dependencies of SECTION_PRE_DATA items , so we do
* not risk trying to process them out - of - order .
*
* Stuff that we can ' t do immediately gets added to the pending_list .
* Note : we don ' t yet filter out entries that aren ' t going to be restored .
* They might participate in dependency chains connecting entries that
* should be restored , so we treat them as live until we actually process
* them .
*
* Note : as of 9.2 , it should be guaranteed that all PRE_DATA items appear
* before DATA items , and all DATA items before POST_DATA items . That is
* not certain to be true in older archives , though , so this loop is coded
* to not assume it .
*/
AH - > restorePass = RESTORE_PASS_MAIN ;
skipped_some = false ;
for ( next_work_item = AH - > toc - > next ; next_work_item ! = AH - > toc ; next_work_item = next_work_item - > next )
{
/* NB: process-or-continue logic must be the inverse of loop below */
bool do_now = true ;
if ( next_work_item - > section ! = SECTION_PRE_DATA )
{
/* DATA and POST_DATA items are just ignored for now */
if ( next_work_item - > section = = SECTION_DATA | |
next_work_item - > section = = SECTION_POST_DATA )
{
do_now = false ;
skipped_some = true ;
continue ;
}
else
{
@ -3826,18 +3878,35 @@ restore_toc_entries_prefork(ArchiveHandle *AH)
* comment ' s dependencies are satisfied , so skip it for now .
*/
if ( skipped_some )
continu e;
do_now = fals e;
}
}
ahlog ( AH , 1 , " processing item %d %s %s \n " ,
next_work_item - > dumpId ,
next_work_item - > desc , next_work_item - > tag ) ;
/*
* Also skip items that need to be forced into later passes . We need
* not set skipped_some in this case , since by assumption no main - pass
* items could depend on these .
*/
if ( _tocEntryRestorePass ( next_work_item ) ! = RESTORE_PASS_MAIN )
do_now = false ;
if ( do_now )
{
/* OK, restore the item and update its dependencies */
ahlog ( AH , 1 , " processing item %d %s %s \n " ,
next_work_item - > dumpId ,
next_work_item - > desc , next_work_item - > tag ) ;
( void ) restore_toc_entry ( AH , next_work_item , false ) ;
( void ) restore_toc_entry ( AH , next_work_item , false ) ;
/* there should be no touch of ready_list here, so pass NULL */
reduce_dependencies ( AH , next_work_item , NULL ) ;
/* there should be no touch of ready_list here, so pass NULL */
reduce_dependencies ( AH , next_work_item , NULL ) ;
}
else
{
/* Nope, so add it to pending_list */
par_list_append ( pending_list , next_work_item ) ;
}
}
/*
@ -3863,89 +3932,60 @@ restore_toc_entries_prefork(ArchiveHandle *AH)
/*
* Main engine for parallel restore .
*
* Work is done in three phases .
* First we process all SECTION_PRE_DATA tocEntries , in a single connection ,
* just as for a standard restore . This is done in restore_toc_entries_prefork ( ) .
* Second we process the remaining non - ACL steps in parallel worker children
* ( threads on Windows , processes on Unix ) , these fork off and set up their
* connections before we call restore_toc_entries_parallel_forked .
* Finally we process all the ACL entries in a single connection ( that happens
* back in RestoreArchive ) .
* Parallel restore is done in three phases . In this second phase ,
* we process entries by dispatching them to parallel worker children
* ( processes on Unix , threads on Windows ) , each of which connects
* separately to the database . Inter - entry dependencies are respected ,
* and so is the RestorePass multi - pass structure . When we can no longer
* make any entries ready to process , we exit . Normally , there will be
* nothing left to do ; but if there is , the third phase will mop up .
*/
static void
restore_toc_entries_parallel ( ArchiveHandle * AH , ParallelState * pstate ,
TocEntry * pending_list )
{
bool skipped_some ;
TocEntry ready_list ;
TocEntry * next_work_item ;
ahlog ( AH , 2 , " entering restore_toc_entries_parallel \n " ) ;
/*
* Initialize the lists of ready items , the list for pending items has
* already been initialized in the caller . After this setup , the pending
* list is everything that needs to be done but is blocked by one or more
* dependencies , while the ready list contains items that have no
* remaining dependencies . Note : we don ' t yet filter out entries that
* aren ' t going to be restored . They might participate in dependency
* chains connecting entries that should be restored , so we treat them as
* live until we actually process them .
* The pending_list contains all items that we need to restore . Move all
* items that are available to process immediately into the ready_list .
* After this setup , the pending list is everything that needs to be done
* but is blocked by one or more dependencies , while the ready list
* contains items that have no remaining dependencies and are OK to
* process in the current restore pass .
*/
par_list_header_init ( & ready_list ) ;
skipped_some = false ;
for ( next_work_item = AH - > toc - > next ; next_work_item ! = AH - > toc ; next_work_item = next_work_item - > next )
{
/* NB: process-or-continue logic must be the inverse of loop above */
if ( next_work_item - > section = = SECTION_PRE_DATA )
{
/* All PRE_DATA items were dealt with above */
continue ;
}
if ( next_work_item - > section = = SECTION_DATA | |
next_work_item - > section = = SECTION_POST_DATA )
{
/* set this flag at same point that previous loop did */
skipped_some = true ;
}
else
{
/* SECTION_NONE items must be processed if previous loop didn't */
if ( ! skipped_some )
continue ;
}
if ( next_work_item - > depCount > 0 )
par_list_append ( pending_list , next_work_item ) ;
else
par_list_append ( & ready_list , next_work_item ) ;
}
AH - > restorePass = RESTORE_PASS_MAIN ;
move_to_ready_list ( pending_list , & ready_list , AH - > restorePass ) ;
/*
* main parent loop
*
* Keep going until there is no worker still running AND there is no work
* left to be done .
* left to be done . Note invariant : at top of loop , there should always
* be at least one worker available to dispatch a job to .
*/
ahlog ( AH , 1 , " entering main parallel loop \n " ) ;
while ( ( next_work_item = get_next_work_item ( AH , & ready_list , pstate ) ) ! = NULL | |
! IsEveryWorkerIdle ( pstate ) )
for ( ; ; )
{
/* Look for an item ready to be dispatched to a worker */
next_work_item = get_next_work_item ( AH , & ready_list , pstate ) ;
if ( next_work_item ! = NULL )
{
/* If not to be restored, don't waste time launching a worker */
if ( ( next_work_item - > reqs & ( REQ_SCHEMA | REQ_DATA ) ) = = 0 | |
_tocEntryIsACL ( next_work_item ) )
if ( ( next_work_item - > reqs & ( REQ_SCHEMA | REQ_DATA ) ) = = 0 )
{
ahlog ( AH , 1 , " skipping item %d %s %s \n " ,
next_work_item - > dumpId ,
next_work_item - > desc , next_work_item - > tag ) ;
/* Drop it from ready_list, and update its dependencies */
par_list_remove ( next_work_item ) ;
reduce_dependencies ( AH , next_work_item , & ready_list ) ;
/* Loop around to see if anything else can be dispatched */
continue ;
}
@ -3953,14 +3993,34 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
next_work_item - > dumpId ,
next_work_item - > desc , next_work_item - > tag ) ;
/* Remove it from ready_list, and dispatch to some worker */
par_list_remove ( next_work_item ) ;
DispatchJobForTocEntry ( AH , pstate , next_work_item , ACT_RESTORE ,
mark_restore_job_done , & ready_list ) ;
}
else if ( IsEveryWorkerIdle ( pstate ) )
{
/*
* Nothing is ready and no worker is running , so we ' re done with
* the current pass or maybe with the whole process .
*/
if ( AH - > restorePass = = RESTORE_PASS_LAST )
break ; /* No more parallel processing is possible */
/* Advance to next restore pass */
AH - > restorePass + + ;
/* That probably allows some stuff to be made ready */
move_to_ready_list ( pending_list , & ready_list , AH - > restorePass ) ;
/* Loop around to see if anything's now ready */
continue ;
}
else
{
/* at least one child is working and we have nothing ready. */
/*
* We have nothing ready , but at least one child is working , so
* wait for some subjob to finish .
*/
}
/*
@ -3980,9 +4040,21 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS ) ;
}
/* There should now be nothing in ready_list. */
Assert ( ready_list . par_next = = & ready_list ) ;
ahlog ( AH , 1 , " finished main parallel loop \n " ) ;
}
/*
* Main engine for parallel restore .
*
* Parallel restore is done in three phases . In this third phase ,
* we mop up any remaining TOC entries by processing them serially .
* This phase normally should have nothing to do , but if we ' ve somehow
* gotten stuck due to circular dependencies or some such , this provides
* at least some chance of completing the restore successfully .
*/
static void
restore_toc_entries_postfork ( ArchiveHandle * AH , TocEntry * pending_list )
{
@ -4002,9 +4074,10 @@ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
_doSetFixedOutputState ( AH ) ;
/*
* Make sure there is no non - ACL work left due to , say , circular
* dependencies , or some other pathological condition . If so , do it in the
* single parent connection .
* Make sure there is no work left due to , say , circular dependencies , or
* some other pathological condition . If so , do it in the single parent
* connection . We don ' t sweat about RestorePass ordering ; it ' s likely we
* already violated that .
*/
for ( te = pending_list - > par_next ; te ! = pending_list ; te = te - > par_next )
{
@ -4012,8 +4085,6 @@ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
te - > dumpId , te - > desc , te - > tag ) ;
( void ) restore_toc_entry ( AH , te , false ) ;
}
/* The ACLs will be handled back in RestoreArchive. */
}
/*
@ -4072,6 +4143,36 @@ par_list_remove(TocEntry *te)
}
/*
* Move all immediately - ready items from pending_list to ready_list .
*
* Items are considered ready if they have no remaining dependencies and
* they belong in the current restore pass . ( See also reduce_dependencies ,
* which applies the same logic one - at - a - time . )
*/
static void
move_to_ready_list ( TocEntry * pending_list , TocEntry * ready_list ,
RestorePass pass )
{
TocEntry * te ;
TocEntry * next_te ;
for ( te = pending_list - > par_next ; te ! = pending_list ; te = next_te )
{
/* must save list link before possibly moving te to other list */
next_te = te - > par_next ;
if ( te - > depCount = = 0 & &
_tocEntryRestorePass ( te ) = = pass )
{
/* Remove it from pending_list ... */
par_list_remove ( te ) ;
/* ... and add to ready_list */
par_list_append ( ready_list , te ) ;
}
}
}
/*
* Find the next work item ( if any ) that is capable of being run now .
*
@ -4457,8 +4558,17 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
{
TocEntry * otherte = AH - > tocsByDumpId [ te - > revDeps [ i ] ] ;
Assert ( otherte - > depCount > 0 ) ;
otherte - > depCount - - ;
if ( otherte - > depCount = = 0 & & otherte - > par_prev ! = NULL )
/*
* It ' s ready if it has no remaining dependencies and it belongs in
* the current restore pass . However , don ' t move it if it has not yet
* been put into the pending list .
*/
if ( otherte - > depCount = = 0 & &
_tocEntryRestorePass ( otherte ) = = AH - > restorePass & &
otherte - > par_prev ! = NULL )
{
/* It must be in the pending list, so remove it ... */
par_list_remove ( otherte ) ;