@ -34,6 +34,7 @@
# include "compress_io.h"
# include "dumputils.h"
# include "fe_utils/string_utils.h"
# include "lib/binaryheap.h"
# include "lib/stringinfo.h"
# include "libpq/libpq-fs.h"
# include "parallel.h"
@ -44,24 +45,6 @@
# define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
# define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
/*
* State for tracking TocEntrys that are ready to process during a parallel
* restore . ( This used to be a list , and we still call it that , though now
* it ' s really an array so that we can apply qsort to it . )
*
* tes [ ] is sized large enough that we can ' t overrun it .
* The valid entries are indexed first_te . . last_te inclusive .
* We periodically sort the array to bring larger - by - dataLength entries to
* the front ; " sorted " is true if the valid entries are known sorted .
*/
typedef struct _parallelReadyList
{
TocEntry * * tes ; /* Ready-to-dump TocEntrys */
int first_te ; /* index of first valid entry in tes[] */
int last_te ; /* index of last valid entry in tes[] */
bool sorted ; /* are valid entries currently sorted? */
} ParallelReadyList ;
static ArchiveHandle * _allocAH ( const char * FileSpec , const ArchiveFormat fmt ,
const pg_compress_specification compression_spec ,
@ -111,16 +94,12 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH,
static void pending_list_header_init ( TocEntry * l ) ;
static void pending_list_append ( TocEntry * l , TocEntry * te ) ;
static void pending_list_remove ( TocEntry * te ) ;
static void ready_list_init ( ParallelReadyList * ready_list , int tocCount ) ;
static void ready_list_free ( ParallelReadyList * ready_list ) ;
static void ready_list_insert ( ParallelReadyList * ready_list , TocEntry * te ) ;
static void ready_list_remove ( ParallelReadyList * ready_list , int i ) ;
static void ready_list_sort ( ParallelReadyList * ready_list ) ;
static int TocEntrySizeCompare ( const void * p1 , const void * p2 ) ;
static void move_to_ready_list ( TocEntry * pending_list ,
ParallelReadyList * ready_list ,
static int TocEntrySizeCompareQsort ( const void * p1 , const void * p2 ) ;
static int TocEntrySizeCompareBinaryheap ( void * p1 , void * p2 , void * arg ) ;
static void move_to_ready_heap ( TocEntry * pending_list ,
binaryheap * ready_heap ,
RestorePass pass ) ;
static TocEntry * pop_next_work_item ( ParallelReadyList * ready_list ,
static TocEntry * pop_next_work_item ( binaryheap * ready_heap ,
ParallelState * pstate ) ;
static void mark_dump_job_done ( ArchiveHandle * AH ,
TocEntry * te ,
@ -135,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies ( ArchiveHandle * AH ) ;
static void identify_locking_dependencies ( ArchiveHandle * AH , TocEntry * te ) ;
static void reduce_dependencies ( ArchiveHandle * AH , TocEntry * te ,
ParallelReadyList * ready_list ) ;
binaryheap * ready_heap ) ;
static void mark_create_done ( ArchiveHandle * AH , TocEntry * te ) ;
static void inhibit_data_for_failed_table ( ArchiveHandle * AH , TocEntry * te ) ;
@ -2384,7 +2363,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
}
if ( ntes > 1 )
qsort ( tes , ntes , sizeof ( TocEntry * ) , TocEntrySizeCompare ) ;
qsort ( tes , ntes , sizeof ( TocEntry * ) , TocEntrySizeCompareQsort ) ;
for ( int i = 0 ; i < ntes ; i + + )
DispatchJobForTocEntry ( AH , pstate , tes [ i ] , ACT_DUMP ,
@ -3984,7 +3963,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
( void ) restore_toc_entry ( AH , next_work_item , false ) ;
/* Reduce dependencies, but don't move anything to ready_list */
/* Reduce dependencies, but don't move anything to ready_heap */
reduce_dependencies ( AH , next_work_item , NULL ) ;
}
else
@ -4027,24 +4006,26 @@ static void
restore_toc_entries_parallel ( ArchiveHandle * AH , ParallelState * pstate ,
TocEntry * pending_list )
{
ParallelReadyList ready_list ;
binaryheap * ready_heap ;
TocEntry * next_work_item ;
pg_log_debug ( " entering restore_toc_entries_parallel " ) ;
/* Set up ready_list with enough room for all known TocEntrys */
ready_list_init ( & ready_list , AH - > tocCount ) ;
/* Set up ready_heap with enough room for all known TocEntrys */
ready_heap = binaryheap_allocate ( AH - > tocCount ,
TocEntrySizeCompareBinaryheap ,
NULL ) ;
/*
* The pending_list contains all items that we need to restore . Move all
* items that are available to process immediately into the ready_list .
* items that are available to process immediately into the ready_heap .
* After this setup , the pending list is everything that needs to be done
* but is blocked by one or more dependencies , while the ready list
* but is blocked by one or more dependencies , while the ready heap
* contains items that have no remaining dependencies and are OK to
* process in the current restore pass .
*/
AH - > restorePass = RESTORE_PASS_MAIN ;
move_to_ready_list ( pending_list , & ready_list , AH - > restorePass ) ;
move_to_ready_heap ( pending_list , ready_heap , AH - > restorePass ) ;
/*
* main parent loop
@ -4058,7 +4039,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
for ( ; ; )
{
/* Look for an item ready to be dispatched to a worker */
next_work_item = pop_next_work_item ( & ready_list , pstate ) ;
next_work_item = pop_next_work_item ( ready_heap , pstate ) ;
if ( next_work_item ! = NULL )
{
/* If not to be restored, don't waste time launching a worker */
@ -4068,7 +4049,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
next_work_item - > dumpId ,
next_work_item - > desc , next_work_item - > tag ) ;
/* Update its dependencies as though we'd completed it */
reduce_dependencies ( AH , next_work_item , & ready_list ) ;
reduce_dependencies ( AH , next_work_item , ready_heap ) ;
/* Loop around to see if anything else can be dispatched */
continue ;
}
@ -4079,7 +4060,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
/* Dispatch to some worker */
DispatchJobForTocEntry ( AH , pstate , next_work_item , ACT_RESTORE ,
mark_restore_job_done , & ready_list ) ;
mark_restore_job_done , ready_heap ) ;
}
else if ( IsEveryWorkerIdle ( pstate ) )
{
@ -4093,7 +4074,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
/* Advance to next restore pass */
AH - > restorePass + + ;
/* That probably allows some stuff to be made ready */
move_to_ready_list ( pending_list , & ready_list , AH - > restorePass ) ;
move_to_ready_heap ( pending_list , ready_heap , AH - > restorePass ) ;
/* Loop around to see if anything's now ready */
continue ;
}
@ -4122,10 +4103,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS ) ;
}
/* There should now be nothing in ready_list . */
Assert ( ready_list . first_te > ready_list . last_te ) ;
/* There should now be nothing in ready_heap . */
Assert ( binaryheap_empty ( ready_heap ) ) ;
ready_list_free ( & ready_list ) ;
binaryheap_free ( ready_heap ) ;
pg_log_info ( " finished main parallel loop " ) ;
}
@ -4225,80 +4206,9 @@ pending_list_remove(TocEntry *te)
}
/*
* Initialize the ready_list with enough room for up to tocCount entries .
*/
static void
ready_list_init ( ParallelReadyList * ready_list , int tocCount )
{
ready_list - > tes = ( TocEntry * * )
pg_malloc ( tocCount * sizeof ( TocEntry * ) ) ;
ready_list - > first_te = 0 ;
ready_list - > last_te = - 1 ;
ready_list - > sorted = false ;
}
/*
* Free storage for a ready_list .
*/
static void
ready_list_free ( ParallelReadyList * ready_list )
{
pg_free ( ready_list - > tes ) ;
}
/* Add te to the ready_list */
static void
ready_list_insert ( ParallelReadyList * ready_list , TocEntry * te )
{
ready_list - > tes [ + + ready_list - > last_te ] = te ;
/* List is (probably) not sorted anymore. */
ready_list - > sorted = false ;
}
/* Remove the i'th entry in the ready_list */
static void
ready_list_remove ( ParallelReadyList * ready_list , int i )
{
int f = ready_list - > first_te ;
Assert ( i > = f & & i < = ready_list - > last_te ) ;
/*
* In the typical case where the item to be removed is the first ready
* entry , we need only increment first_te to remove it . Otherwise , move
* the entries before it to compact the list . ( This preserves sortedness ,
* if any . ) We could alternatively move the entries after i , but there
* are typically many more of those .
*/
if ( i > f )
{
TocEntry * * first_te_ptr = & ready_list - > tes [ f ] ;
memmove ( first_te_ptr + 1 , first_te_ptr , ( i - f ) * sizeof ( TocEntry * ) ) ;
}
ready_list - > first_te + + ;
}
/* Sort the ready_list into the desired order */
static void
ready_list_sort ( ParallelReadyList * ready_list )
{
if ( ! ready_list - > sorted )
{
int n = ready_list - > last_te - ready_list - > first_te + 1 ;
if ( n > 1 )
qsort ( ready_list - > tes + ready_list - > first_te , n ,
sizeof ( TocEntry * ) ,
TocEntrySizeCompare ) ;
ready_list - > sorted = true ;
}
}
/* qsort comparator for sorting TocEntries by dataLength */
static int
TocEntrySizeCompare ( const void * p1 , const void * p2 )
TocEntrySizeCompareQsort ( const void * p1 , const void * p2 )
{
const TocEntry * te1 = * ( const TocEntry * const * ) p1 ;
const TocEntry * te2 = * ( const TocEntry * const * ) p2 ;
@ -4318,17 +4228,25 @@ TocEntrySizeCompare(const void *p1, const void *p2)
return 0 ;
}
/* binaryheap comparator for sorting TocEntries by dataLength */
static int
TocEntrySizeCompareBinaryheap ( void * p1 , void * p2 , void * arg )
{
/* return opposite of qsort comparator for max-heap */
return - TocEntrySizeCompareQsort ( & p1 , & p2 ) ;
}
/*
* Move all immediately - ready items from pending_list to ready_list .
* Move all immediately - ready items from pending_list to ready_heap .
*
* Items are considered ready if they have no remaining dependencies and
* they belong in the current restore pass . ( See also reduce_dependencies ,
* which applies the same logic one - at - a - time . )
*/
static void
move_to_ready_list ( TocEntry * pending_list ,
ParallelReadyList * ready_list ,
move_to_ready_heap ( TocEntry * pending_list ,
binaryheap * ready_heap ,
RestorePass pass )
{
TocEntry * te ;
@ -4344,38 +4262,38 @@ move_to_ready_list(TocEntry *pending_list,
{
/* Remove it from pending_list ... */
pending_list_remove ( te ) ;
/* ... and add to ready_list */
ready_list_insert ( ready_list , te ) ;
/* ... and add to ready_heap */
binaryheap_add ( ready_heap , te ) ;
}
}
}
/*
* Find the next work item ( if any ) that is capable of being run now ,
* and remove it from the ready_list .
* and remove it from the ready_heap .
*
* Returns the item , or NULL if nothing is runnable .
*
* To qualify , the item must have no remaining dependencies
* and no requirements for locks that are incompatible with
* items currently running . Items in the ready_list are known to have
* items currently running . Items in the ready_heap are known to have
* no remaining dependencies , but we have to check for lock conflicts .
*/
static TocEntry *
pop_next_work_item ( ParallelReadyList * ready_list ,
pop_next_work_item ( binaryheap * ready_heap ,
ParallelState * pstate )
{
/*
* Sort the ready_list so that we ' ll tackle larger jobs first .
*/
ready_list_sort ( ready_list ) ;
/*
* Search the ready_list until we find a suitable item .
* Search the ready_heap until we find a suitable item . Note that we do a
* sequential scan through the heap nodes , so even though we will first
* try to choose the highest - priority item , we might end up picking
* something with a much lower priority . However , we expect that we will
* typically be able to pick one of the first few items , which should
* usually have a relatively high priority .
*/
for ( int i = ready_list - > first_te ; i < = ready_list - > last_te ; i + + )
for ( int i = 0 ; i < binaryheap_size ( ready_heap ) ; i + + )
{
TocEntry * te = ready_list - > tes [ i ] ;
TocEntry * te = ( TocEntry * ) binaryheap_get_node ( ready_heap , i ) ;
bool conflicts = false ;
/*
@ -4401,7 +4319,7 @@ pop_next_work_item(ParallelReadyList *ready_list,
continue ;
/* passed all tests, so this item can run */
ready_list_remove ( ready_list , i ) ;
binaryheap_remove_node ( ready_heap , i ) ;
return te ;
}
@ -4447,7 +4365,7 @@ mark_restore_job_done(ArchiveHandle *AH,
int status ,
void * callback_data )
{
ParallelReadyList * ready_list = ( ParallelReadyList * ) callback_data ;
binaryheap * ready_heap = ( binaryheap * ) callback_data ;
pg_log_info ( " finished item %d %s %s " ,
te - > dumpId , te - > desc , te - > tag ) ;
@ -4465,7 +4383,7 @@ mark_restore_job_done(ArchiveHandle *AH,
pg_fatal ( " worker process failed: exit code %d " ,
status ) ;
reduce_dependencies ( AH , te , ready_list ) ;
reduce_dependencies ( AH , te , ready_heap ) ;
}
@ -4708,11 +4626,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
/*
* Remove the specified TOC entry from the depCounts of items that depend on
* it , thereby possibly making them ready - to - run . Any pending item that
* becomes ready should be moved to the ready_list , if that ' s provided .
* becomes ready should be moved to the ready_heap , if that ' s provided .
*/
static void
reduce_dependencies ( ArchiveHandle * AH , TocEntry * te ,
ParallelReadyList * ready_list )
binaryheap * ready_heap )
{
int i ;
@ -4730,18 +4648,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
* the current restore pass , and it is currently a member of the
* pending list ( that check is needed to prevent double restore in
* some cases where a list - file forces out - of - order restoring ) .
* However , if ready_list = = NULL then caller doesn ' t want any list
* However , if ready_heap = = NULL then caller doesn ' t want any list
* memberships changed .
*/
if ( otherte - > depCount = = 0 & &
_tocEntryRestorePass ( otherte ) = = AH - > restorePass & &
otherte - > pending_prev ! = NULL & &
ready_list ! = NULL )
ready_heap ! = NULL )
{
/* Remove it from pending list ... */
pending_list_remove ( otherte ) ;
/* ... and add to ready_list */
ready_list_insert ( ready_list , otherte ) ;
/* ... and add to ready_heap */
binaryheap_add ( ready_heap , otherte ) ;
}
}
}