@ -42,6 +42,7 @@
# include "nodes/makefuncs.h"
# include "nodes/makefuncs.h"
# include "pgstat.h"
# include "pgstat.h"
# include "postmaster/autovacuum.h"
# include "postmaster/autovacuum.h"
# include "postmaster/bgworker_internals.h"
# include "storage/bufmgr.h"
# include "storage/bufmgr.h"
# include "storage/lmgr.h"
# include "storage/lmgr.h"
# include "storage/proc.h"
# include "storage/proc.h"
@ -68,6 +69,14 @@ static MemoryContext vac_context = NULL;
static BufferAccessStrategy vac_strategy ;
static BufferAccessStrategy vac_strategy ;
/*
* Variables for cost - based parallel vacuum . See comments atop
* compute_parallel_delay to understand how it works .
*/
pg_atomic_uint32 * VacuumSharedCostBalance = NULL ;
pg_atomic_uint32 * VacuumActiveNWorkers = NULL ;
int VacuumCostBalanceLocal = 0 ;
/* non-export function prototypes */
/* non-export function prototypes */
static List * expand_vacuum_rel ( VacuumRelation * vrel , int options ) ;
static List * expand_vacuum_rel ( VacuumRelation * vrel , int options ) ;
static List * get_all_vacuum_rels ( int options ) ;
static List * get_all_vacuum_rels ( int options ) ;
@ -76,6 +85,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
TransactionId lastSaneFrozenXid ,
TransactionId lastSaneFrozenXid ,
MultiXactId lastSaneMinMulti ) ;
MultiXactId lastSaneMinMulti ) ;
static bool vacuum_rel ( Oid relid , RangeVar * relation , VacuumParams * params ) ;
static bool vacuum_rel ( Oid relid , RangeVar * relation , VacuumParams * params ) ;
static double compute_parallel_delay ( void ) ;
static VacOptTernaryValue get_vacopt_ternary_value ( DefElem * def ) ;
static VacOptTernaryValue get_vacopt_ternary_value ( DefElem * def ) ;
/*
/*
@ -94,12 +104,16 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
bool freeze = false ;
bool freeze = false ;
bool full = false ;
bool full = false ;
bool disable_page_skipping = false ;
bool disable_page_skipping = false ;
bool parallel_option = false ;
ListCell * lc ;
ListCell * lc ;
/* Set default value */
/* Set default value */
params . index_cleanup = VACOPT_TERNARY_DEFAULT ;
params . index_cleanup = VACOPT_TERNARY_DEFAULT ;
params . truncate = VACOPT_TERNARY_DEFAULT ;
params . truncate = VACOPT_TERNARY_DEFAULT ;
/* By default parallel vacuum is enabled */
params . nworkers = 0 ;
/* Parse options list */
/* Parse options list */
foreach ( lc , vacstmt - > options )
foreach ( lc , vacstmt - > options )
{
{
@ -129,6 +143,39 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
params . index_cleanup = get_vacopt_ternary_value ( opt ) ;
params . index_cleanup = get_vacopt_ternary_value ( opt ) ;
else if ( strcmp ( opt - > defname , " truncate " ) = = 0 )
else if ( strcmp ( opt - > defname , " truncate " ) = = 0 )
params . truncate = get_vacopt_ternary_value ( opt ) ;
params . truncate = get_vacopt_ternary_value ( opt ) ;
else if ( strcmp ( opt - > defname , " parallel " ) = = 0 )
{
parallel_option = true ;
if ( opt - > arg = = NULL )
{
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " parallel option requires a value between 0 and %d " ,
MAX_PARALLEL_WORKER_LIMIT ) ,
parser_errposition ( pstate , opt - > location ) ) ) ;
}
else
{
int nworkers ;
nworkers = defGetInt32 ( opt ) ;
if ( nworkers < 0 | | nworkers > MAX_PARALLEL_WORKER_LIMIT )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " parallel vacuum degree must be between 0 and %d " ,
MAX_PARALLEL_WORKER_LIMIT ) ,
parser_errposition ( pstate , opt - > location ) ) ) ;
/*
* Disable parallel vacuum , if user has specified parallel
* degree as zero .
*/
if ( nworkers = = 0 )
params . nworkers = - 1 ;
else
params . nworkers = nworkers ;
}
}
else
else
ereport ( ERROR ,
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
@ -152,6 +199,11 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
! ( params . options & ( VACOPT_FULL | VACOPT_FREEZE ) ) ) ;
! ( params . options & ( VACOPT_FULL | VACOPT_FREEZE ) ) ) ;
Assert ( ! ( params . options & VACOPT_SKIPTOAST ) ) ;
Assert ( ! ( params . options & VACOPT_SKIPTOAST ) ) ;
if ( ( params . options & VACOPT_FULL ) & & parallel_option )
ereport ( ERROR ,
( errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " cannot specify both FULL and PARALLEL options " ) ) ) ;
/*
/*
* Make sure VACOPT_ANALYZE is specified if any column lists are present .
* Make sure VACOPT_ANALYZE is specified if any column lists are present .
*/
*/
@ -383,6 +435,9 @@ vacuum(List *relations, VacuumParams *params,
VacuumPageHit = 0 ;
VacuumPageHit = 0 ;
VacuumPageMiss = 0 ;
VacuumPageMiss = 0 ;
VacuumPageDirty = 0 ;
VacuumPageDirty = 0 ;
VacuumCostBalanceLocal = 0 ;
VacuumSharedCostBalance = NULL ;
VacuumActiveNWorkers = NULL ;
/*
/*
* Loop to process each selected relation .
* Loop to process each selected relation .
@ -1941,16 +1996,26 @@ vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
void
void
vacuum_delay_point ( void )
vacuum_delay_point ( void )
{
{
double msec = 0 ;
/* Always check for interrupts */
/* Always check for interrupts */
CHECK_FOR_INTERRUPTS ( ) ;
CHECK_FOR_INTERRUPTS ( ) ;
/* Nap if appropriate */
if ( ! VacuumCostActive | | InterruptPending )
if ( VacuumCostActive & & ! InterruptPending & &
return ;
VacuumCostBalance > = VacuumCostLimit )
{
double msec ;
/*
* For parallel vacuum , the delay is computed based on the shared cost
* balance . See compute_parallel_delay .
*/
if ( VacuumSharedCostBalance ! = NULL )
msec = compute_parallel_delay ( ) ;
else if ( VacuumCostBalance > = VacuumCostLimit )
msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit ;
msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit ;
/* Nap if appropriate */
if ( msec > 0 )
{
if ( msec > VacuumCostDelay * 4 )
if ( msec > VacuumCostDelay * 4 )
msec = VacuumCostDelay * 4 ;
msec = VacuumCostDelay * 4 ;
@ -1966,6 +2031,66 @@ vacuum_delay_point(void)
}
}
}
}
/*
* Computes the vacuum delay for parallel workers .
*
* The basic idea of a cost - based vacuum delay for parallel vacuum is to allow
* each worker to sleep proportional to the work done by it . We achieve this
* by allowing all parallel vacuum workers including the leader process to
* have a shared view of cost related parameters ( mainly VacuumCostBalance ) .
* We allow each worker to update it as and when it has incurred any cost and
* then based on that decide whether it needs to sleep . We compute the time
* to sleep for a worker based on the cost it has incurred
* ( VacuumCostBalanceLocal ) and then reduce the VacuumSharedCostBalance by
* that amount . This avoids letting the workers sleep who have done less or
* no I / O as compared to other workers and therefore can ensure that workers
* who are doing more I / O got throttled more .
*
* We allow any worker to sleep only if it has performed the I / O above a
* certain threshold , which is calculated based on the number of active
* workers ( VacuumActiveNWorkers ) , and the overall cost balance is more than
* VacuumCostLimit set by the system . The testing reveals that we achieve
* the required throttling if we allow a worker that has done more than 50 %
* of its share of work to sleep .
*/
static double
compute_parallel_delay ( void )
{
double msec = 0 ;
uint32 shared_balance ;
int nworkers ;
/* Parallel vacuum must be active */
Assert ( VacuumSharedCostBalance ) ;
nworkers = pg_atomic_read_u32 ( VacuumActiveNWorkers ) ;
/* At least count itself */
Assert ( nworkers > = 1 ) ;
/* Update the shared cost balance value atomically */
shared_balance = pg_atomic_add_fetch_u32 ( VacuumSharedCostBalance , VacuumCostBalance ) ;
/* Compute the total local balance for the current worker */
VacuumCostBalanceLocal + = VacuumCostBalance ;
if ( ( shared_balance > = VacuumCostLimit ) & &
( VacuumCostBalanceLocal > 0.5 * ( VacuumCostLimit / nworkers ) ) )
{
/* Compute sleep time based on the local cost balance */
msec = VacuumCostDelay * VacuumCostBalanceLocal / VacuumCostLimit ;
pg_atomic_sub_fetch_u32 ( VacuumSharedCostBalance , VacuumCostBalanceLocal ) ;
VacuumCostBalanceLocal = 0 ;
}
/*
* Reset the local balance as we accumulated it into the shared value .
*/
VacuumCostBalance = 0 ;
return msec ;
}
/*
/*
* A wrapper function of defGetBoolean ( ) .
* A wrapper function of defGetBoolean ( ) .
*
*