@ -49,6 +49,24 @@ typedef struct _outputContext
int gzOut ;
} OutputContext ;
/*
* State for tracking TocEntrys that are ready to process during a parallel
* restore . ( This used to be a list , and we still call it that , though now
* it ' s really an array so that we can apply qsort to it . )
*
* tes [ ] is sized large enough that we can ' t overrun it .
* The valid entries are indexed first_te . . last_te inclusive .
* We periodically sort the array to bring larger - by - dataLength entries to
* the front ; " sorted " is true if the valid entries are known sorted .
*/
typedef struct _parallelReadyList
{
TocEntry * * tes ; /* Ready-to-dump TocEntrys */
int first_te ; /* index of first valid entry in tes[] */
int last_te ; /* index of last valid entry in tes[] */
bool sorted ; /* are valid entries currently sorted? */
} ParallelReadyList ;
/* translator: this is a module name */
static const char * modulename = gettext_noop ( " archiver " ) ;
@ -95,13 +113,20 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH,
TocEntry * pending_list ) ;
static void restore_toc_entries_postfork ( ArchiveHandle * AH ,
TocEntry * pending_list ) ;
static void par_list_header_init ( TocEntry * l ) ;
static void par_list_append ( TocEntry * l , TocEntry * te ) ;
static void par_list_remove ( TocEntry * te ) ;
static void move_to_ready_list ( TocEntry * pending_list , TocEntry * ready_list ,
static void pending_list_header_init ( TocEntry * l ) ;
static void pending_list_append ( TocEntry * l , TocEntry * te ) ;
static void pending_list_remove ( TocEntry * te ) ;
static void ready_list_init ( ParallelReadyList * ready_list , int tocCount ) ;
static void ready_list_free ( ParallelReadyList * ready_list ) ;
static void ready_list_insert ( ParallelReadyList * ready_list , TocEntry * te ) ;
static void ready_list_remove ( ParallelReadyList * ready_list , int i ) ;
static void ready_list_sort ( ParallelReadyList * ready_list ) ;
static int TocEntrySizeCompare ( const void * p1 , const void * p2 ) ;
static void move_to_ready_list ( TocEntry * pending_list ,
ParallelReadyList * ready_list ,
RestorePass pass ) ;
static TocEntry * get_next_work_item ( ArchiveHandle * AH ,
TocEntry * ready_list ,
static TocEntry * pop _next_work_item( ArchiveHandle * AH ,
ParallelReadyList * ready_list ,
ParallelState * pstate ) ;
static void mark_dump_job_done ( ArchiveHandle * AH ,
TocEntry * te ,
@ -116,7 +141,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies ( ArchiveHandle * AH ) ;
static void identify_locking_dependencies ( ArchiveHandle * AH , TocEntry * te ) ;
static void reduce_dependencies ( ArchiveHandle * AH , TocEntry * te ,
TocEntry * ready_list ) ;
ParallelReadyList * ready_list ) ;
static void mark_create_done ( ArchiveHandle * AH , TocEntry * te ) ;
static void inhibit_data_for_failed_table ( ArchiveHandle * AH , TocEntry * te ) ;
@ -639,7 +664,11 @@ RestoreArchive(Archive *AHX)
ParallelState * pstate ;
TocEntry pending_list ;
par_list_header_init ( & pending_list ) ;
/* The archive format module may need some setup for this */
if ( AH - > PrepParallelRestorePtr )
AH - > PrepParallelRestorePtr ( AH ) ;
pending_list_header_init ( & pending_list ) ;
/* This runs PRE_DATA items and then disconnects from the database */
restore_toc_entries_prefork ( AH , & pending_list ) ;
@ -1039,10 +1068,14 @@ WriteData(Archive *AHX, const void *data, size_t dLen)
/*
* Create a new TOC entry . The TOC was designed as a TOC , but is now the
* repository for all metadata . But the name has stuck .
*
* The new entry is added to the Archive ' s TOC list . Most callers can ignore
* the result value because nothing else need be done , but a few want to
* manipulate the TOC entry further .
*/
/* Public */
void
TocEntry *
ArchiveEntry ( Archive * AHX ,
CatalogId catalogId , DumpId dumpId ,
const char * tag ,
@ -1100,9 +1133,12 @@ ArchiveEntry(Archive *AHX,
newToc - > hadDumper = dumpFn ? true : false ;
newToc - > formatData = NULL ;
newToc - > dataLength = 0 ;
if ( AH - > ArchiveEntryPtr ! = NULL )
AH - > ArchiveEntryPtr ( AH , newToc ) ;
return newToc ;
}
/* Public */
@ -2413,32 +2449,59 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
{
TocEntry * te ;
for ( te = AH - > toc - > next ; te ! = AH - > toc ; te = te - > next )
if ( pstate & & pstate - > numWorkers > 1 )
{
if ( ! te - > dataDumper )
continue ;
if ( ( te - > reqs & REQ_DATA ) = = 0 )
continue ;
/*
* In parallel mode , this code runs in the master process . We
* construct an array of candidate TEs , then sort it into decreasing
* size order , then dispatch each TE to a data - transfer worker . By
* dumping larger tables first , we avoid getting into a situation
* where we ' re down to one job and it ' s big , losing parallelism .
*/
TocEntry * * tes ;
int ntes ;
if ( pstate & & pstate - > numWorkers > 1 )
tes = ( TocEntry * * ) pg_malloc ( AH - > tocCount * sizeof ( TocEntry * ) ) ;
ntes = 0 ;
for ( te = AH - > toc - > next ; te ! = AH - > toc ; te = te - > next )
{
/*
* If we are in a parallel backup , then we are always the master
* process . Dispatch each data - transfer job to a worker .
*/
DispatchJobForTocEntry ( AH , pstate , te , ACT_DUMP ,
mark_dump_job_done , NULL ) ;
/* Consider only TEs with dataDumper functions ... */
if ( ! te - > dataDumper )
continue ;
/* ... and ignore ones not enabled for dump */
if ( ( te - > reqs & REQ_DATA ) = = 0 )
continue ;
tes [ ntes + + ] = te ;
}
else
WriteDataChunksForTocEntry ( AH , te ) ;
}
/*
* If parallel , wait for workers to finish .
*/
if ( pstate & & pstate - > numWorkers > 1 )
if ( ntes > 1 )
qsort ( ( void * ) tes , ntes , sizeof ( TocEntry * ) ,
TocEntrySizeCompare ) ;
for ( int i = 0 ; i < ntes ; i + + )
DispatchJobForTocEntry ( AH , pstate , tes [ i ] , ACT_DUMP ,
mark_dump_job_done , NULL ) ;
pg_free ( tes ) ;
/* Now wait for workers to finish. */
WaitForWorkers ( AH , pstate , WFW_ALL_IDLE ) ;
}
else
{
/* Non-parallel mode: just dump all candidate TEs sequentially. */
for ( te = AH - > toc - > next ; te ! = AH - > toc ; te = te - > next )
{
/* Must have same filter conditions as above */
if ( ! te - > dataDumper )
continue ;
if ( ( te - > reqs & REQ_DATA ) = = 0 )
continue ;
WriteDataChunksForTocEntry ( AH , te ) ;
}
}
}
@ -2690,6 +2753,7 @@ ReadToc(ArchiveHandle *AH)
te - > dependencies = NULL ;
te - > nDeps = 0 ;
}
te - > dataLength = 0 ;
if ( AH - > ReadExtraTocPtr )
AH - > ReadExtraTocPtr ( AH , te ) ;
@ -3996,7 +4060,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
else
{
/* Nope, so add it to pending_list */
par _list_append ( pending_list , next_work_item ) ;
pending _list_append ( pending_list , next_work_item ) ;
}
}
@ -4035,11 +4099,14 @@ static void
restore_toc_entries_parallel ( ArchiveHandle * AH , ParallelState * pstate ,
TocEntry * pending_list )
{
TocEntry ready_list ;
ParallelReadyList ready_list ;
TocEntry * next_work_item ;
ahlog ( AH , 2 , " entering restore_toc_entries_parallel \n " ) ;
/* Set up ready_list with enough room for all known TocEntrys */
ready_list_init ( & ready_list , AH - > tocCount ) ;
/*
* The pending_list contains all items that we need to restore . Move all
* items that are available to process immediately into the ready_list .
@ -4048,7 +4115,6 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
* contains items that have no remaining dependencies and are OK to
* process in the current restore pass .
*/
par_list_header_init ( & ready_list ) ;
AH - > restorePass = RESTORE_PASS_MAIN ;
move_to_ready_list ( pending_list , & ready_list , AH - > restorePass ) ;
@ -4064,7 +4130,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
for ( ; ; )
{
/* Look for an item ready to be dispatched to a worker */
next_work_item = get _next_work_item( AH , & ready_list , pstate ) ;
next_work_item = pop _next_work_item( AH , & ready_list , pstate ) ;
if ( next_work_item ! = NULL )
{
/* If not to be restored, don't waste time launching a worker */
@ -4073,8 +4139,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
ahlog ( AH , 1 , " skipping item %d %s %s \n " ,
next_work_item - > dumpId ,
next_work_item - > desc , next_work_item - > tag ) ;
/* Drop it from ready_list, and update its dependencies */
par_list_remove ( next_work_item ) ;
/* Update its dependencies as though we'd completed it */
reduce_dependencies ( AH , next_work_item , & ready_list ) ;
/* Loop around to see if anything else can be dispatched */
continue ;
@ -4084,9 +4149,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
next_work_item - > dumpId ,
next_work_item - > desc , next_work_item - > tag ) ;
/* Remove it from ready_list, and dispatch to some worker */
par_list_remove ( next_work_item ) ;
/* Dispatch to some worker */
DispatchJobForTocEntry ( AH , pstate , next_work_item , ACT_RESTORE ,
mark_restore_job_done , & ready_list ) ;
}
@ -4132,7 +4195,9 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
}
/* There should now be nothing in ready_list. */
Assert ( ready_list . par_next = = & ready_list ) ;
Assert ( ready_list . first_te > ready_list . last_te ) ;
ready_list_free ( & ready_list ) ;
ahlog ( AH , 1 , " finished main parallel loop \n " ) ;
}
@ -4170,7 +4235,7 @@ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
* connection . We don ' t sweat about RestorePass ordering ; it ' s likely we
* already violated that .
*/
for ( te = pending_list - > par _next ; te ! = pending_list ; te = te - > par _next )
for ( te = pending_list - > pending _next ; te ! = pending_list ; te = te - > pending _next )
{
ahlog ( AH , 1 , " processing missed item %d %s %s \n " ,
te - > dumpId , te - > desc , te - > tag ) ;
@ -4201,36 +4266,130 @@ has_lock_conflicts(TocEntry *te1, TocEntry *te2)
/*
* Initialize the header of a parallel - processing list .
* Initialize the header of the pending - items list .
*
* These are circular lists with a dummy TocEntry as header , just like the
* This is a circular list with a dummy TocEntry as header , just like the
* main TOC list ; but we use separate list links so that an entry can be in
* the main TOC list as well as in a parallel - processing list .
* the main TOC list as well as in the pending list .
*/
static void
pending_list_header_init ( TocEntry * l )
{
l - > pending_prev = l - > pending_next = l ;
}
/* Append te to the end of the pending-list headed by l */
static void
pending_list_append ( TocEntry * l , TocEntry * te )
{
te - > pending_prev = l - > pending_prev ;
l - > pending_prev - > pending_next = te ;
l - > pending_prev = te ;
te - > pending_next = l ;
}
/* Remove te from the pending-list */
static void
pending_list_remove ( TocEntry * te )
{
te - > pending_prev - > pending_next = te - > pending_next ;
te - > pending_next - > pending_prev = te - > pending_prev ;
te - > pending_prev = NULL ;
te - > pending_next = NULL ;
}
/*
* Initialize the ready_list with enough room for up to tocCount entries .
*/
static void
par_list_header_init ( TocEntry * l )
ready _list_init ( ParallelReadyList * ready_list , int tocCount )
{
l - > par_prev = l - > par_next = l ;
ready_list - > tes = ( TocEntry * * )
pg_malloc ( tocCount * sizeof ( TocEntry * ) ) ;
ready_list - > first_te = 0 ;
ready_list - > last_te = - 1 ;
ready_list - > sorted = false ;
}
/* Append te to the end of the parallel-processing list headed by l */
/*
* Free storage for a ready_list .
*/
static void
ready_list_free ( ParallelReadyList * ready_list )
{
pg_free ( ready_list - > tes ) ;
}
/* Add te to the ready_list */
static void
par_list_append ( TocEntry * l , TocEntry * te )
ready_list_insert ( ParallelReadyList * ready_list , TocEntry * te )
{
te - > par_prev = l - > par_prev ;
l - > par_prev - > par_next = te ;
l - > par_prev = te ;
te - > par_next = l ;
ready_list - > tes [ + + ready_list - > last_te ] = te ;
/* List is (probably) not sorted anymore. */
ready_list - > sorted = false ;
}
/* Remove the i'th entry in the ready_list */
static void
ready_list_remove ( ParallelReadyList * ready_list , int i )
{
int f = ready_list - > first_te ;
Assert ( i > = f & & i < = ready_list - > last_te ) ;
/*
* In the typical case where the item to be removed is the first ready
* entry , we need only increment first_te to remove it . Otherwise , move
* the entries before it to compact the list . ( This preserves sortedness ,
* if any . ) We could alternatively move the entries after i , but there
* are typically many more of those .
*/
if ( i > f )
{
TocEntry * * first_te_ptr = & ready_list - > tes [ f ] ;
memmove ( first_te_ptr + 1 , first_te_ptr , ( i - f ) * sizeof ( TocEntry * ) ) ;
}
ready_list - > first_te + + ;
}
/* Remove te from whatever parallel-processing list it's in */
/* Sort the ready_list into the desired order */
static void
par_list_remove ( TocEntry * te )
ready_list_sort ( ParallelReadyList * ready_list )
{
te - > par_prev - > par_next = te - > par_next ;
te - > par_next - > par_prev = te - > par_prev ;
te - > par_prev = NULL ;
te - > par_next = NULL ;
if ( ! ready_list - > sorted )
{
int n = ready_list - > last_te - ready_list - > first_te + 1 ;
if ( n > 1 )
qsort ( ready_list - > tes + ready_list - > first_te , n ,
sizeof ( TocEntry * ) ,
TocEntrySizeCompare ) ;
ready_list - > sorted = true ;
}
}
/* qsort comparator for sorting TocEntries by dataLength */
static int
TocEntrySizeCompare ( const void * p1 , const void * p2 )
{
const TocEntry * te1 = * ( const TocEntry * const * ) p1 ;
const TocEntry * te2 = * ( const TocEntry * const * ) p2 ;
/* Sort by decreasing dataLength */
if ( te1 - > dataLength > te2 - > dataLength )
return - 1 ;
if ( te1 - > dataLength < te2 - > dataLength )
return 1 ;
/* For equal dataLengths, sort by dumpId, just to be stable */
if ( te1 - > dumpId < te2 - > dumpId )
return - 1 ;
if ( te1 - > dumpId > te2 - > dumpId )
return 1 ;
return 0 ;
}
@ -4242,52 +4401,50 @@ par_list_remove(TocEntry *te)
* which applies the same logic one - at - a - time . )
*/
static void
move_to_ready_list ( TocEntry * pending_list , TocEntry * ready_list ,
move_to_ready_list ( TocEntry * pending_list ,
ParallelReadyList * ready_list ,
RestorePass pass )
{
TocEntry * te ;
TocEntry * next_te ;
for ( te = pending_list - > par _next ; te ! = pending_list ; te = next_te )
for ( te = pending_list - > pending _next ; te ! = pending_list ; te = next_te )
{
/* must save list link before possibly moving te to other list */
next_te = te - > par _next ;
/* must save list link before possibly removing te from list */
next_te = te - > pending _next ;
if ( te - > depCount = = 0 & &
_tocEntryRestorePass ( te ) = = pass )
{
/* Remove it from pending_list ... */
par _list_remove ( te ) ;
pending _list_remove ( te ) ;
/* ... and add to ready_list */
par_list_append ( ready_list , te ) ;
ready_list_insert ( ready_list , te ) ;
}
}
}
/*
* Find the next work item ( if any ) that is capable of being run now .
* Find the next work item ( if any ) that is capable of being run now ,
* and remove it from the ready_list .
*
* Returns the item , or NULL if nothing is runnable .
*
* To qualify , the item must have no remaining dependencies
* and no requirements for locks that are incompatible with
* items currently running . Items in the ready_list are known to have
* no remaining dependencies , but we have to check for lock conflicts .
*
* Note that the returned item has * not * been removed from ready_list .
* The caller must do that after successfully dispatching the item .
*
* pref_non_data is for an alternative selection algorithm that gives
* preference to non - data items if there is already a data load running .
* It is currently disabled .
*/
static TocEntry *
get _next_work_item( ArchiveHandle * AH , TocEntry * ready_list ,
pop _next_work_item( ArchiveHandle * AH , ParallelReadyList * ready_list ,
ParallelState * pstate )
{
bool pref_non_data = false ; /* or get from AH->ropt */
TocEntry * data_te = NULL ;
TocEntry * te ;
int i ,
k ;
int data_te_index = - 1 ;
/*
* Bogus heuristics for pref_non_data
@ -4296,7 +4453,7 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
{
int count = 0 ;
for ( k = 0 ; k < pstate - > numWorkers ; k + + )
for ( int k = 0 ; k < pstate - > numWorkers ; k + + )
{
TocEntry * running_te = pstate - > te [ k ] ;
@ -4308,11 +4465,17 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
pref_non_data = false ;
}
/*
* Sort the ready_list so that we ' ll tackle larger jobs first .
*/
ready_list_sort ( ready_list ) ;
/*
* Search the ready_list until we find a suitable item .
*/
for ( te = ready_list - > par_next ; te ! = ready_list ; te = te - > par_next )
for ( int i = ready_list - > first_te ; i < = ready_list - > last_te ; i + + )
{
TocEntry * te = ready_list - > tes [ i ] ;
bool conflicts = false ;
/*
@ -4320,9 +4483,9 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
* that a currently running item also needs lock on , or vice versa . If
* so , we don ' t want to schedule them together .
*/
for ( i = 0 ; i < pstate - > numWorkers ; i + + )
for ( int k = 0 ; k < pstate - > numWorkers ; k + + )
{
TocEntry * running_te = pstate - > te [ i ] ;
TocEntry * running_te = pstate - > te [ k ] ;
if ( running_te = = NULL )
continue ;
@ -4339,17 +4502,23 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
if ( pref_non_data & & te - > section = = SECTION_DATA )
{
if ( data_te = = NULL )
data_te = te ;
if ( data_te_index < 0 )
data_te_index = i ;
continue ;
}
/* passed all tests, so this item can run */
ready_list_remove ( ready_list , i ) ;
return te ;
}
if ( data_te ! = NULL )
if ( data_te_index > = 0 )
{
TocEntry * data_te = ready_list - > tes [ data_te_index ] ;
ready_list_remove ( ready_list , data_te_index ) ;
return data_te ;
}
ahlog ( AH , 2 , " no item ready \n " ) ;
return NULL ;
@ -4393,7 +4562,7 @@ mark_restore_job_done(ArchiveHandle *AH,
int status ,
void * callback_data )
{
TocEntry * ready_list = ( TocEntry * ) callback_data ;
ParallelReadyList * ready_list = ( ParallelReadyList * ) callback_data ;
ahlog ( AH , 1 , " finished item %d %s %s \n " ,
te - > dumpId , te - > desc , te - > tag ) ;
@ -4443,8 +4612,8 @@ fix_dependencies(ArchiveHandle *AH)
te - > depCount = te - > nDeps ;
te - > revDeps = NULL ;
te - > nRevDeps = 0 ;
te - > par _prev = NULL ;
te - > par _next = NULL ;
te - > pending _prev = NULL ;
te - > pending _next = NULL ;
}
/*
@ -4551,6 +4720,12 @@ fix_dependencies(ArchiveHandle *AH)
/*
* Change dependencies on table items to depend on table data items instead ,
* but only in POST_DATA items .
*
* Also , for any item having such dependency ( s ) , set its dataLength to the
* largest dataLength of the table data items it depends on . This ensures
* that parallel restore will prioritize larger jobs ( index builds , FK
* constraint checks , etc ) over smaller ones , avoiding situations where we
* end a restore with only one active job working on a large table .
*/
static void
repoint_table_dependencies ( ArchiveHandle * AH )
@ -4569,9 +4744,13 @@ repoint_table_dependencies(ArchiveHandle *AH)
if ( olddep < = AH - > maxDumpId & &
AH - > tableDataId [ olddep ] ! = 0 )
{
te - > dependencies [ i ] = AH - > tableDataId [ olddep ] ;
DumpId tabledataid = AH - > tableDataId [ olddep ] ;
TocEntry * tabledatate = AH - > tocsByDumpId [ tabledataid ] ;
te - > dependencies [ i ] = tabledataid ;
te - > dataLength = Max ( te - > dataLength , tabledatate - > dataLength ) ;
ahlog ( AH , 2 , " transferring dependency %d -> %d to %d \n " ,
te - > dumpId , olddep , AH - > tableDataId [ olddep ] ) ;
te - > dumpId , olddep , tabledataid ) ;
}
}
}
@ -4647,7 +4826,8 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
* becomes ready should be moved to the ready_list , if that ' s provided .
*/
static void
reduce_dependencies ( ArchiveHandle * AH , TocEntry * te , TocEntry * ready_list )
reduce_dependencies ( ArchiveHandle * AH , TocEntry * te ,
ParallelReadyList * ready_list )
{
int i ;
@ -4670,13 +4850,13 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
*/
if ( otherte - > depCount = = 0 & &
_tocEntryRestorePass ( otherte ) = = AH - > restorePass & &
otherte - > par _prev ! = NULL & &
otherte - > pending _prev ! = NULL & &
ready_list ! = NULL )
{
/* Remove it from pending list ... */
par _list_remove ( otherte ) ;
pending _list_remove ( otherte ) ;
/* ... and add to ready_list */
par_list_append ( ready_list , otherte ) ;
ready_list_insert ( ready_list , otherte ) ;
}
}
}