@@ -33,6 +33,7 @@
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "tcop/tcopprot.h"		/* pgrminclude ignore */
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
@@ -40,7 +41,119 @@
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tuplesort.h"
/* Magic numbers for parallel state sharing */
#define PARALLEL_KEY_BRIN_SHARED		UINT64CONST(0xB000000000000001)
#define PARALLEL_KEY_TUPLESORT			UINT64CONST(0xB000000000000002)
#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xB000000000000003)
#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xB000000000000004)
#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xB000000000000005)
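
/*
 * These keys need only be distinct from each other and from the reserved
 * keys that parallel.c itself stores in the same table-of-contents, hence
 * the arbitrary 0xB... prefix used here.
 */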
/*
 * Status record for spooling/sorting phase.
 */
typedef struct BrinSpool
{
    Tuplesortstate *sortstate;	/* state data for tuplesort.c */
    Relation    heap;
    Relation    index;
} BrinSpool;
/*
 * Status for index builds performed in parallel.  This is allocated in a
 * dynamic shared memory segment.
 */
typedef struct BrinShared
{
    /*
     * These fields are not modified during the build.  They primarily exist
     * for the benefit of worker processes that need to create state
     * corresponding to that used by the leader.
     */
    Oid         heaprelid;
    Oid         indexrelid;
    bool        isconcurrent;
    BlockNumber pagesPerRange;
    int         scantuplesortstates;

    /*
     * workersdonecv is used to monitor the progress of workers.  All
     * parallel participants must indicate that they are done before the
     * leader can use results built by the workers (and before the leader
     * can write the data into the index).
     */
    ConditionVariable workersdonecv;
    /*
     * mutex protects all the following mutable fields.
     *
     * These fields contain status information of interest to BRIN index
     * builds that must work just the same when an index is built in
     * parallel.
     */
    slock_t     mutex;
    /*
     * Mutable state that is maintained by workers, and reported back to the
     * leader at the end of the scans.
     *
     * nparticipantsdone is the number of worker processes finished.
     *
     * reltuples is the total number of input heap tuples.
     *
     * indtuples is the total number of tuples that made it into the index.
     */
    int         nparticipantsdone;
    double      reltuples;
    double      indtuples;

    /*
     * ParallelTableScanDescData data follows.  Can't directly embed here, as
     * implementations of the parallel table scan desc interface might need
     * stronger alignment.
     */
} BrinShared;
/*
 * Return pointer to a BrinShared's parallel table scan.
 *
 * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
 * MAXALIGN.
 */
#define ParallelTableScanFromBrinShared(shared) \
    (ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared)))
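
/*
 * The resulting layout of the PARALLEL_KEY_BRIN_SHARED chunk is, roughly:
 *
 *    +------------+-----------+-------------------------------+
 *    | BrinShared | (padding) | ParallelTableScanDescData ... |
 *    +------------+-----------+-------------------------------+
 *                 ^-- BUFFERALIGN(sizeof(BrinShared))
 *
 * so the embedded scan descriptor always starts at a buffer-aligned offset.
 */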
/*
 * Status for leader in parallel index build.
 */
typedef struct BrinLeader
{
    /* parallel context itself */
    ParallelContext *pcxt;

    /*
     * nparticipanttuplesorts is the exact number of worker processes
     * successfully launched, plus one leader process if it participates as
     * a worker (only DISABLE_LEADER_PARTICIPATION builds avoid the leader
     * participating as a worker).
     */
    int         nparticipanttuplesorts;

    /*
     * Leader process convenience pointers to shared state (leader avoids
     * TOC lookups).
     *
     * brinshared is the shared state for the entire build.  sharedsort is
     * the shared, tuplesort-managed state passed to each process tuplesort.
     * snapshot is the snapshot used by the scan iff an MVCC snapshot is
     * required.
     */
    BrinShared *brinshared;
    Sharedsort *sharedsort;
    Snapshot    snapshot;
    WalUsage   *walusage;
    BufferUsage *bufferusage;
} BrinLeader;
/*
 * We use a BrinBuildState during initial construction of a BRIN index.
@@ -49,7 +162,8 @@
typedef struct BrinBuildState
{
    Relation    bs_irel;
    double      bs_numtuples;
    double      bs_reltuples;
    Buffer      bs_currentInsertBuf;
    BlockNumber bs_pagesPerRange;
    BlockNumber bs_currRangeStart;
@@ -57,9 +171,19 @@ typedef struct BrinBuildState
    BrinRevmap *bs_rmAccess;
    BrinDesc   *bs_bdesc;
    BrinMemTuple *bs_dtuple;

    BrinTuple  *bs_emptyTuple;
    Size        bs_emptyTupleLen;
    MemoryContext bs_context;

    /*
     * bs_leader is only present when a parallel index build is performed,
     * and only in the leader process.  (Actually, only the leader process
     * has a BrinBuildState.)
     */
    BrinLeader *bs_leader;
    int         bs_worker_id;
    BrinSpool  *bs_spool;
} BrinBuildState;
/*
@@ -94,6 +218,7 @@ static void terminate_brin_buildstate(BrinBuildState *state);
static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
                          bool include_partial, double *numSummarized, double *numExisting);
static void form_and_insert_tuple(BrinBuildState *state);
static void form_and_spill_tuple(BrinBuildState *state);
static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
                         BrinTuple *b);
static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
@@ -103,6 +228,20 @@ static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
static void brin_fill_empty_ranges(BrinBuildState *state,
                                   BlockNumber prevRange, BlockNumber maxRange);

/* parallel index builds */
static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
                                 bool isconcurrent, int request);
static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
                                               Relation heap, Relation index);
static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
                                          BrinSpool *brinspool,
                                          BrinShared *brinshared,
                                          Sharedsort *sharedsort,
                                          Relation heap, Relation index,
                                          int sortmem, bool progress);
/*
 * BRIN handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
@@ -127,6 +266,7 @@ brinhandler(PG_FUNCTION_ARGS)
    amroutine->amclusterable = false;
    amroutine->ampredlocks = false;
    amroutine->amcanparallel = false;
    amroutine->amcanbuildparallel = true;
    amroutine->amcaninclude = false;
    amroutine->amusemaintenanceworkmem = false;
    amroutine->amsummarizing = true;
@@ -882,6 +1022,65 @@ brinbuildCallback(Relation index,
                               values, isnull);
}

/*
 * Per-heap-tuple callback for table_index_build_scan with parallelism.
 *
 * A version of the callback used by parallel index builds.  The main
 * difference is that instead of writing the BRIN tuples into the index, we
 * write them into a shared tuplesort, and leave the insertion up to the
 * leader (which may reorder them a bit etc.).  The callback also does not
 * generate empty ranges; those will be added by the leader when merging
 * results from workers.
 */
static void
brinbuildCallbackParallel(Relation index,
                          ItemPointer tid,
                          Datum *values,
                          bool *isnull,
                          bool tupleIsAlive,
                          void *brstate)
{
    BrinBuildState *state = (BrinBuildState *) brstate;
    BlockNumber thisblock;

    thisblock = ItemPointerGetBlockNumber(tid);
    /*
     * If we're in a block that belongs to a future range, summarize what
     * we've got and start afresh.  Note the scan might have skipped many
     * pages, if they were devoid of live tuples; we do not create empty BRIN
     * ranges here - the leader is responsible for filling them in.
     */
    if (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
    {
        BRIN_elog((DEBUG2,
                   "brinbuildCallbackParallel: completed a range: %u--%u",
                   state->bs_currRangeStart,
                   state->bs_currRangeStart + state->bs_pagesPerRange));

        /* create the index tuple and write it into the tuplesort */
        form_and_spill_tuple(state);

        /*
         * Set state to correspond to the next range (for this block).
         *
         * This skips ranges that are either empty (and so we don't get any
         * tuples to summarize), or processed by other workers.  We can't
         * differentiate those cases here easily, so we leave it up to the
         * leader to fill empty ranges where needed.
         */
        state->bs_currRangeStart
            = state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange);
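
        /*
         * For example, with bs_pagesPerRange = 4 and thisblock = 10, the
         * integer division gives 10 / 4 = 2, so the next range starts at
         * block 2 * 4 = 8 (the range covering blocks 8..11, which include
         * thisblock).
         */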
        /* re-initialize state for it */
        brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
    }

    /* Accumulate the current tuple into the running state */
    (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
                               values, isnull);
}
/*
 * brinbuild() -- build a new BRIN index.
 */
@@ -944,29 +1143,105 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
    state = initialize_brin_buildstate(index, revmap, pagesPerRange,
                                       RelationGetNumberOfBlocks(heap));

    state->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
    state->bs_spool->heap = heap;
    state->bs_spool->index = index;

    /*
     * Attempt to launch parallel worker scan when required.
     *
     * XXX plan_create_index_workers makes the number of workers dependent on
     * maintenance_work_mem, requiring 32MB for each worker.  That makes
     * sense for btree, but not for BRIN, which can do with much less memory.
     * So maybe make that somehow less strict, optionally?
     */
    if (indexInfo->ii_ParallelWorkers > 0)
        _brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
                             indexInfo->ii_ParallelWorkers);
    /*
     * Now scan the relation.  No syncscan allowed here because we want the
     * heap blocks in physical order.
     *
     * If a parallel build was requested and at least one worker process was
     * successfully launched, set up coordination state.
     */
    if (state->bs_leader)
    {
        SortCoordinate coordinate;

        coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
        coordinate->isWorker = false;
        coordinate->nParticipants =
            state->bs_leader->nparticipanttuplesorts;
        coordinate->sharedsort = state->bs_leader->sharedsort;
        /*
         * Begin serial/leader tuplesort.
         *
         * In cases where parallelism is involved, the leader receives the
         * same share of maintenance_work_mem as a serial sort (it is
         * generally treated in the same way as a serial sort once we
         * return).  Parallel worker Tuplesortstates will have received only
         * a fraction of maintenance_work_mem, though.
         *
         * We rely on the lifetime of the Leader Tuplesortstate almost not
         * overlapping with any worker Tuplesortstate's lifetime.  There may
         * be some small overlap, but that's okay because we rely on leader
         * Tuplesortstate only allocating a small, fixed amount of memory
         * here.  When its tuplesort_performsort() is called (by our caller),
         * and significant amounts of memory are likely to be used, all
         * workers must have already freed almost all memory held by their
         * Tuplesortstates (they are about to go away completely, too).  The
         * overall effect is that maintenance_work_mem always represents an
         * absolute high watermark on the amount of memory used by a CREATE
         * INDEX operation, regardless of the use of parallelism or any
         * other factor.
         */
        state->bs_spool->sortstate =
            tuplesort_begin_index_brin(heap, index,
                                       maintenance_work_mem, coordinate,
                                       TUPLESORT_NONE);
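
        /*
         * For example, with maintenance_work_mem = 64MB and two launched
         * workers plus a participating leader, each of the three partial
         * sorts ran within roughly 64/3 MB, while the merge performed
         * through this Tuplesortstate may use the full 64MB once those
         * participants have released their memory.
         */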
        /*
         * In parallel mode, wait for workers to complete, and then read all
         * tuples from the shared tuplesort and insert them into the index.
         */
        _brin_end_parallel(state->bs_leader, state);
    }
    else                        /* no parallel index build */
    {
        reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
                                           brinbuildCallback, (void *) state, NULL);
        /*
         * process the final batch
         *
         * XXX Note this does not update state->bs_currRangeStart, i.e. it
         * stays set to the last range added to the index.  This is OK,
         * because that's what brin_fill_empty_ranges expects.
         */
        form_and_insert_tuple(state);
        /*
         * Backfill the final ranges with empty data.
         *
         * This saves us from doing what amounts to full table scans when
         * using the index with a very selective predicate, such as WHERE
         * (nonnull_column IS NULL).
         */
        brin_fill_empty_ranges(state,
                               state->bs_currRangeStart,
                               state->bs_maxRangeStart);

        /* track the number of relation tuples */
        state->bs_reltuples = reltuples;
    }
    /* release resources */
    idxtuples = state->bs_numtuples;
    reltuples = state->bs_reltuples;
    brinRevmapTerminate(state->bs_rmAccess);
    terminate_brin_buildstate(state);
@@ -1387,12 +1662,19 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
    state->bs_irel = idxRel;
    state->bs_numtuples = 0;
    state->bs_reltuples = 0;
    state->bs_currentInsertBuf = InvalidBuffer;
    state->bs_pagesPerRange = pagesPerRange;
    state->bs_currRangeStart = 0;
    state->bs_rmAccess = revmap;
    state->bs_bdesc = brin_build_desc(idxRel);
    state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
    state->bs_leader = NULL;
    state->bs_worker_id = 0;
    state->bs_spool = NULL;
    state->bs_emptyTuple = NULL;
    state->bs_emptyTupleLen = 0;

    /* Remember the memory context to use for an empty tuple, if needed. */
    state->bs_context = CurrentMemoryContext;
@@ -1701,6 +1983,32 @@ form_and_insert_tuple(BrinBuildState *state)
    pfree(tup);
}

/*
 * Given a deformed tuple in the build state, convert it into the on-disk
 * format and write it to a (shared) tuplesort (the leader will insert it
 * into the index later).
 */
static void
form_and_spill_tuple(BrinBuildState *state)
{
    BrinTuple  *tup;
    Size        size;

    /* don't insert empty tuples in parallel build */
    if (state->bs_dtuple->bt_empty_range)
        return;
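
    /*
     * (Spilling empty summaries would be wasted effort: any ranges missing
     * from the tuplesort are backfilled by the leader in _brin_end_parallel,
     * via brin_fill_empty_ranges.)
     */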
    tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
                          state->bs_dtuple, &size);

    /* write the BRIN tuple to the tuplesort */
    tuplesort_putbrintuple(state->bs_spool->sortstate, tup, size);

    state->bs_numtuples++;

    pfree(tup);
}
/*
 * Given two deformed tuples, adjust the first one so that it's consistent
 * with the summary values in both.
@@ -2021,6 +2329,554 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
    return true;
}
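
/*
 * _brin_begin_parallel
 *		Create the parallel context, set up the shared build state and
 *		shared tuplesort, and launch the worker processes.  On success,
 *		buildstate->bs_leader is set; if no DSM segment or no workers could
 *		be obtained, everything is backed out and the caller proceeds with
 *		a serial build.
 */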
static void
_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
                     bool isconcurrent, int request)
{
    ParallelContext *pcxt;
    int         scantuplesortstates;
    Snapshot    snapshot;
    Size        estbrinshared;
    Size        estsort;
    BrinShared *brinshared;
    Sharedsort *sharedsort;
    BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
    WalUsage   *walusage;
    BufferUsage *bufferusage;
    bool        leaderparticipates = true;
    int         querylen;

#ifdef DISABLE_LEADER_PARTICIPATION
    leaderparticipates = false;
#endif

    /*
     * Enter parallel mode, and create context for parallel build of brin
     * index
     */
    EnterParallelMode();
    Assert(request > 0);
    pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
                                 request);

    scantuplesortstates = leaderparticipates ? request + 1 : request;

    /*
     * Prepare for scan of the base relation.  In a normal index build, we
     * use SnapshotAny because we must retrieve all tuples and do our own
     * time qual checks (because we have to index RECENTLY_DEAD tuples).  In
     * a concurrent build, we take a regular MVCC snapshot and index
     * whatever's live according to that.
     */
    if (!isconcurrent)
        snapshot = SnapshotAny;
    else
        snapshot = RegisterSnapshot(GetTransactionSnapshot());
    /*
     * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
     */
    estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
    shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
    estsort = tuplesort_estimate_shared(scantuplesortstates);
    shm_toc_estimate_chunk(&pcxt->estimator, estsort);

    shm_toc_estimate_keys(&pcxt->estimator, 2);

    /*
     * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
     * and PARALLEL_KEY_BUFFER_USAGE.
     *
     * If there are no extensions loaded that care, we could skip this.  We
     * have no way of knowing whether anyone's looking at pgWalUsage or
     * pgBufferUsage, so do it unconditionally.
     */
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(WalUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
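
    /*
     * Note the bookkeeping pattern above: each shm_toc_estimate_chunk() call
     * reserves space for one chunk, and shm_toc_estimate_keys() reserves the
     * matching number of table-of-contents entries under which those chunks
     * are later registered with shm_toc_insert().
     */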
    /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
    if (debug_query_string)
    {
        querylen = strlen(debug_query_string);
        shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }
    else
        querylen = 0;           /* keep compiler quiet */

    /* Everyone's had a chance to ask for space, so now create the DSM */
    InitializeParallelDSM(pcxt);

    /* If no DSM segment was available, back out (do serial build) */
    if (pcxt->seg == NULL)
    {
        if (IsMVCCSnapshot(snapshot))
            UnregisterSnapshot(snapshot);
        DestroyParallelContext(pcxt);
        ExitParallelMode();
        return;
    }
    /* Store shared build state, for which we reserved space */
    brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
    /* Initialize immutable state */
    brinshared->heaprelid = RelationGetRelid(heap);
    brinshared->indexrelid = RelationGetRelid(index);
    brinshared->isconcurrent = isconcurrent;
    brinshared->scantuplesortstates = scantuplesortstates;
    brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
    ConditionVariableInit(&brinshared->workersdonecv);
    SpinLockInit(&brinshared->mutex);

    /* Initialize mutable state */
    brinshared->nparticipantsdone = 0;
    brinshared->reltuples = 0.0;
    brinshared->indtuples = 0.0;

    table_parallelscan_initialize(heap,
                                  ParallelTableScanFromBrinShared(brinshared),
                                  snapshot);
    /*
     * Store shared tuplesort-private state, for which we reserved space.
     * Then, initialize opaque state using tuplesort routine.
     */
    sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
    tuplesort_initialize_shared(sharedsort, scantuplesortstates,
                                pcxt->seg);

    /* Register both shared state chunks in the TOC, under their keys */
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
    /* Store query string for workers */
    if (debug_query_string)
    {
        char       *sharedquery;

        sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
        memcpy(sharedquery, debug_query_string, querylen + 1);
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
    }

    /*
     * Allocate space for each worker's WalUsage and BufferUsage; no need to
     * initialize.
     */
    walusage = shm_toc_allocate(pcxt->toc,
                                mul_size(sizeof(WalUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
    bufferusage = shm_toc_allocate(pcxt->toc,
                                   mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
    /* Launch workers, saving status for leader/caller */
    LaunchParallelWorkers(pcxt);
    brinleader->pcxt = pcxt;
    brinleader->nparticipanttuplesorts = pcxt->nworkers_launched;
    if (leaderparticipates)
        brinleader->nparticipanttuplesorts++;
    brinleader->brinshared = brinshared;
    brinleader->sharedsort = sharedsort;
    brinleader->snapshot = snapshot;
    brinleader->walusage = walusage;
    brinleader->bufferusage = bufferusage;

    /* If no workers were successfully launched, back out (do serial build) */
    if (pcxt->nworkers_launched == 0)
    {
        _brin_end_parallel(brinleader, NULL);
        return;
    }

    /* Save leader state now that it's clear build will be parallel */
    buildstate->bs_leader = brinleader;

    /* Join heap scan ourselves */
    if (leaderparticipates)
        _brin_leader_participate_as_worker(buildstate, heap, index);

    /*
     * Caller needs to wait for all launched workers when we return.  Make
     * sure that the failure-to-start case will not hang forever.
     */
    WaitForParallelWorkersToAttach(pcxt);
}
/*
 * Shut down workers, destroy parallel context, and end parallel mode.
 */
static void
_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
{
    int         i;
    BrinTuple  *btup;
    BrinMemTuple *memtuple = NULL;
    Size        tuplen;
    BrinShared *brinshared = brinleader->brinshared;
    BlockNumber prevblkno = InvalidBlockNumber;
    BrinSpool  *spool;
    MemoryContext rangeCxt,
                oldCxt;

    /* Shutdown worker processes */
    WaitForParallelWorkersToFinish(brinleader->pcxt);

    /*
     * If we didn't actually launch workers, we still have to make sure to
     * exit parallel mode.
     */
    if (!state)
        goto cleanup;
    /* copy the data into leader state (we have to wait for the workers) */
    state->bs_reltuples = brinshared->reltuples;
    state->bs_numtuples = brinshared->indtuples;

    /* do the actual sort in the leader */
    spool = state->bs_spool;
    tuplesort_performsort(spool->sortstate);

    /*
     * Initialize BrinMemTuple we'll use to union summaries from workers (in
     * case they happened to produce parts of the same page range).
     */
    memtuple = brin_new_memtuple(state->bs_bdesc);

    /*
     * Create a memory context we'll reset to combine results for a single
     * page range (received from the workers).  We don't expect a huge
     * number of overlaps under regular circumstances (because for large
     * tables the chunk size is likely larger than the BRIN page range), but
     * it can happen, and the union functions may do all kinds of stuff.  So
     * we better reset the context once in a while.
     */
    rangeCxt = AllocSetContextCreate(CurrentMemoryContext,
                                     "brin union",
                                     ALLOCSET_DEFAULT_SIZES);
    oldCxt = MemoryContextSwitchTo(rangeCxt);
    /*
     * Read the BRIN tuples from the shared tuplesort, sorted by block
     * number.  That probably gives us an index that is cheaper to scan,
     * thanks to mostly getting data from the same index page as before.
     */
    while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
    {
        /* Ranges should be multiples of pages_per_range for the index. */
        Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);

        /*
         * Do we need to union summaries for the same page range?
         *
         * If this is the first brin tuple we read, then just deform it into
         * the memtuple, and continue with the next one from tuplesort.  We
         * however may need to insert empty summaries into the index.
         *
         * If it's the same block as the last we saw, we simply union the
         * brin tuple into it, and we're done - we don't even need to insert
         * empty ranges, because that was done earlier when we saw the first
         * brin tuple (for this range).
         *
         * Finally, if it's not the first brin tuple, and it's not the same
         * page range, we need to do the insert and then deform the tuple
         * into the memtuple.  Then we'll insert empty ranges before the new
         * brin tuple, if needed.
         */
        if (prevblkno == InvalidBlockNumber)
        {
            /* First brin tuple, just deform into memtuple. */
            memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);

            /* continue to insert empty pages before thisblock */
        }
        else if (memtuple->bt_blkno == btup->bt_blkno)
        {
            /*
             * Not the first brin tuple, but same page range as the previous
             * one, so we can merge it into the memtuple.
             */
            union_tuples(state->bs_bdesc, memtuple, btup);
            continue;
        }
        else
        {
            BrinTuple  *tmp;
            Size        len;

            /*
             * We got brin tuple for a different page range, so form a brin
             * tuple from the memtuple, insert it, and re-init the memtuple
             * from the new brin tuple.
             */
            tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
                                  memtuple, &len);

            brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
                          &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);

            /*
             * Reset the per-output-range context.  This frees all the
             * memory possibly allocated by the union functions, and also
             * the BRIN tuple we just formed and inserted.
             */
            MemoryContextReset(rangeCxt);

            memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);

            /* continue to insert empty pages before thisblock */
        }

        /* Fill empty ranges for all ranges missing in the tuplesort. */
        brin_fill_empty_ranges(state, prevblkno, btup->bt_blkno);

        prevblkno = btup->bt_blkno;
    }
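
    /*
     * As a concrete example, with pagesPerRange = 4 the sorted stream might
     * contain summaries for block ranges 0, 4, 4, 12: the loop above deforms
     * the tuple for range 0, inserts it when range 4 shows up, unions the
     * two summaries for range 4, and - on reaching range 12 - inserts the
     * merged range 4 and backfills range 8 as empty.  The summary for range
     * 12 itself is still in memtuple and gets inserted below.
     */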
    tuplesort_end(spool->sortstate);

    /* Fill the BRIN tuple for the last page range with data. */
    if (prevblkno != InvalidBlockNumber)
    {
        BrinTuple  *tmp;
        Size        len;

        tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
                              memtuple, &len);

        brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
                      &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);

        pfree(tmp);
    }

    /* Fill empty ranges at the end, for all ranges missing in the tuplesort. */
    brin_fill_empty_ranges(state, prevblkno, state->bs_maxRangeStart);
    /*
     * Switch back to the original memory context, and destroy the one we
     * created to isolate the union_tuple calls.
     */
    MemoryContextSwitchTo(oldCxt);
    MemoryContextDelete(rangeCxt);

    /*
     * Next, accumulate WAL usage.  (This must wait for the workers to
     * finish, or we might get incomplete data.)
     */
    for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
        InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);

cleanup:

    /* Free last reference to MVCC snapshot, if one was used */
    if (IsMVCCSnapshot(brinleader->snapshot))
        UnregisterSnapshot(brinleader->snapshot);
    DestroyParallelContext(brinleader->pcxt);
    ExitParallelMode();
}
/*
 * Returns size of shared memory required to store state for a parallel
 * brin index build based on the snapshot its parallel scan will use.
 */
static Size
_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
{
    /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
    return add_size(BUFFERALIGN(sizeof(BrinShared)),
                    table_parallelscan_estimate(heap, snapshot));
}
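
/*
 * (BUFFERALIGN rounds up to ALIGNOF_BUFFER bytes, which is stricter than
 * MAXALIGN, so the parallel scan descriptor placed right after BrinShared
 * is adequately aligned regardless of what the table AM's scan state
 * requires.)
 */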
/*
 * Within leader, participate as a parallel worker.
 */
static void
_brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
{
    BrinLeader *brinleader = buildstate->bs_leader;
    int         sortmem;

    /* Allocate memory and initialize private spool */
    buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
    buildstate->bs_spool->heap = heap;
    buildstate->bs_spool->index = index;

    /*
     * Might as well use reliable figure when doling out
     * maintenance_work_mem (when requested number of workers were not
     * launched, this will be somewhat higher than it is for other workers).
     */
    sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
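
    /*
     * For example, with maintenance_work_mem set to 65536kB and one launched
     * worker plus the participating leader, nparticipanttuplesorts is 2 and
     * each participant's tuplesort is given 32768kB.
     */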
    /* Perform work common to all participants */
    _brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, brinleader->brinshared,
                                  brinleader->sharedsort, heap, index, sortmem, true);
}
/*
 * Perform a worker's portion of a parallel sort.
 *
 * This generates a tuplesort for the passed-in BrinSpool.  All other spool
 * fields should already be set when this is called.
 *
 * sortmem is the amount of working memory to use within each worker,
 * expressed in KBs.
 *
 * When this returns, workers are done, and need only release resources.
 */
static void
_brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
                              BrinShared *brinshared, Sharedsort *sharedsort,
                              Relation heap, Relation index, int sortmem,
                              bool progress)
{
{
SortCoordinate coordinate ;
TableScanDesc scan ;
double reltuples ;
IndexInfo * indexInfo ;
/* Initialize local tuplesort coordination state */
coordinate = palloc0 ( sizeof ( SortCoordinateData ) ) ;
coordinate - > isWorker = true ;
coordinate - > nParticipants = - 1 ;
coordinate - > sharedsort = sharedsort ;
/* Begin "partial" tuplesort */
brinspool - > sortstate = tuplesort_begin_index_brin ( brinspool - > heap ,
brinspool - > index ,
sortmem , coordinate ,
TUPLESORT_NONE ) ;
/* Join parallel scan */
indexInfo = BuildIndexInfo ( index ) ;
indexInfo - > ii_Concurrent = brinshared - > isconcurrent ;
scan = table_beginscan_parallel ( heap ,
ParallelTableScanFromBrinShared ( brinshared ) ) ;
reltuples = table_index_build_scan ( heap , index , indexInfo , true , true ,
brinbuildCallbackParallel , state , scan ) ;
/* insert the last item */
form_and_spill_tuple ( state ) ;
/* sort the BRIN ranges built by this worker */
tuplesort_performsort ( brinspool - > sortstate ) ;
state - > bs_reltuples + = reltuples ;
/*
* Done . Record ambuild statistics .
*/
SpinLockAcquire ( & brinshared - > mutex ) ;
brinshared - > nparticipantsdone + + ;
brinshared - > reltuples + = state - > bs_reltuples ;
brinshared - > indtuples + = state - > bs_numtuples ;
SpinLockRelease ( & brinshared - > mutex ) ;
/* Notify leader */
ConditionVariableSignal ( & brinshared - > workersdonecv ) ;
tuplesort_end ( brinspool - > sortstate ) ;
}
/*
 * Perform work within a launched parallel process.
 */
void
_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
{
    char       *sharedquery;
    BrinShared *brinshared;
    Sharedsort *sharedsort;
    BrinBuildState *buildstate;
    Relation    heapRel;
    Relation    indexRel;
    LOCKMODE    heapLockmode;
    LOCKMODE    indexLockmode;
    WalUsage   *walusage;
    BufferUsage *bufferusage;
    int         sortmem;

    /*
     * The only possible status flag that can be set to the parallel worker
     * is PROC_IN_SAFE_IC.
     */
    Assert((MyProc->statusFlags == 0) ||
           (MyProc->statusFlags == PROC_IN_SAFE_IC));

    /* Set debug_query_string for individual workers first */
    sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
    debug_query_string = sharedquery;

    /* Report the query string from leader */
    pgstat_report_activity(STATE_RUNNING, debug_query_string);
    /* Look up brin shared state */
    brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);

    /* Open relations using lock modes known to be obtained by index.c */
    if (!brinshared->isconcurrent)
    {
        heapLockmode = ShareLock;
        indexLockmode = AccessExclusiveLock;
    }
    else
    {
        heapLockmode = ShareUpdateExclusiveLock;
        indexLockmode = RowExclusiveLock;
    }

    /* Open relations within worker */
    heapRel = table_open(brinshared->heaprelid, heapLockmode);
    indexRel = index_open(brinshared->indexrelid, indexLockmode);

    buildstate = initialize_brin_buildstate(indexRel, NULL,
                                            brinshared->pagesPerRange,
                                            InvalidBlockNumber);

    /* Initialize worker's own spool */
    buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
    buildstate->bs_spool->heap = heapRel;
    buildstate->bs_spool->index = indexRel;
    /* Look up shared state private to tuplesort.c */
    sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
    tuplesort_attach_shared(sharedsort, seg);

    /* Prepare to track buffer usage during parallel execution */
    InstrStartParallelQuery();

    /*
     * Might as well use reliable figure when doling out
     * maintenance_work_mem (when requested number of workers were not
     * launched, this will be somewhat higher than it is for other workers).
     */
    sortmem = maintenance_work_mem / brinshared->scantuplesortstates;

    _brin_parallel_scan_and_build(buildstate, buildstate->bs_spool,
                                  brinshared, sharedsort,
                                  heapRel, indexRel, sortmem, false);

    /* Report WAL/buffer usage during parallel execution */
    bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
    walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
    InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
                          &walusage[ParallelWorkerNumber]);

    index_close(indexRel, indexLockmode);
    table_close(heapRel, heapLockmode);
}
/*
 * brin_build_empty_tuple
 *		Maybe initialize a BRIN tuple representing empty range.