@@ -130,6 +130,13 @@ typedef struct
 
     int         num_requests;   /* current # of requests */
     int         max_requests;   /* allocated array size */
+
+    int         head;           /* Index of the first request in the ring
+                                 * buffer */
+    int         tail;           /* Index of the last request in the ring
+                                 * buffer */
+
+    /* The ring buffer of pending checkpointer requests */
     CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;
 
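The three new fields implement a standard fixed-capacity ring buffer: head indexes the oldest pending request, tail the slot where the next request will be written, and both wrap around modulo max_requests while num_requests tracks occupancy. A minimal stand-alone sketch of that convention, for illustration only (Ring, ring_push and ring_pop are hypothetical names, not PostgreSQL code, and the capacity is a toy constant):

/*
 * Toy ring buffer with the same head/tail/count convention as
 * CheckpointerShmemStruct above.  All names here are hypothetical.
 */
#include <stdbool.h>
#include <stdio.h>

#define CAPACITY 8              /* toy stand-in for max_requests */

typedef struct
{
    int         head;           /* index of the oldest element */
    int         tail;           /* index where the next element goes */
    int         count;          /* current number of elements */
    int         items[CAPACITY];
} Ring;

static bool
ring_push(Ring *r, int value)
{
    if (r->count == CAPACITY)
        return false;           /* full: caller must compact or retry */
    r->items[r->tail] = value;
    r->tail = (r->tail + 1) % CAPACITY;
    r->count++;
    return true;
}

static bool
ring_pop(Ring *r, int *value)
{
    if (r->count == 0)
        return false;           /* empty */
    *value = r->items[r->head];
    r->head = (r->head + 1) % CAPACITY;
    r->count--;
    return true;
}

int
main(void)
{
    Ring        r = {0};
    int         v;

    for (int i = 0; i < 10; i++)
        ring_push(&r, i);       /* the last two pushes report "full" */
    while (ring_pop(&r, &v))
        printf("%d\n", v);      /* prints 0..7 in insertion order */
    return 0;
}

Keeping an explicit count alongside head and tail avoids the classic ambiguity between a full and an empty ring when head == tail.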
@@ -138,6 +145,12 @@ static CheckpointerShmemStruct *CheckpointerShmem;
 /* interval for calling AbsorbSyncRequests in CheckpointWriteDelay */
 #define WRITES_PER_ABSORB 1000
 
+/* Maximum number of checkpointer requests to process in one batch */
+#define CKPT_REQ_BATCH_SIZE 10000
+
+/* Max number of requests the checkpointer request queue can hold */
+#define MAX_CHECKPOINT_REQUESTS 10000000
+
 /*
  * GUC parameters
  */
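For a sense of scale, the new constants keep the absorb path's local copy small. A rough calculation, assuming about 20 bytes per request (the real sizeof(CheckpointerRequest) depends on the build; the number here is only illustrative):

/*
 * Rough sizing for the constants above.  The 20-byte request size is an
 * assumption for illustration; the real sizeof(CheckpointerRequest) is
 * whatever FileTag plus SyncRequestType come to on a given build.
 */
#include <stdio.h>

#define CKPT_REQ_BATCH_SIZE 10000
#define MAX_CHECKPOINT_REQUESTS 10000000

int
main(void)
{
    size_t      assumed_request_size = 20;  /* hypothetical */

    printf("full ring:  %zu MB\n",
           MAX_CHECKPOINT_REQUESTS * assumed_request_size / (1024 * 1024));
    printf("one batch:  %zu kB\n",
           CKPT_REQ_BATCH_SIZE * assumed_request_size / 1024);
    return 0;
}

With those assumptions a completely full queue is on the order of 200 MB of shared memory for the ring itself, while the per-batch palloc in AbsorbSyncRequests stays in the hundreds of kilobytes.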
@@ -973,7 +986,8 @@ CheckpointerShmemInit(void)
          */
         MemSet(CheckpointerShmem, 0, size);
         SpinLockInit(&CheckpointerShmem->ckpt_lck);
-        CheckpointerShmem->max_requests = NBuffers;
+        CheckpointerShmem->max_requests = Min(NBuffers, MAX_CHECKPOINT_REQUESTS);
+        CheckpointerShmem->head = CheckpointerShmem->tail = 0;
         ConditionVariableInit(&CheckpointerShmem->start_cv);
         ConditionVariableInit(&CheckpointerShmem->done_cv);
     }
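The functional change here is the clamp: max_requests still scales with shared_buffers, but no longer without bound. An illustrative sketch of its effect (the NBuffers values are made-up examples for 8 kB buffers; Min mirrors PostgreSQL's macro of the same name):

/*
 * Effect of the clamp on max_requests.  The NBuffers values are made-up
 * examples (8 kB buffers); Min mirrors PostgreSQL's macro.
 */
#include <stdio.h>

#define Min(x, y) ((x) < (y) ? (x) : (y))
#define MAX_CHECKPOINT_REQUESTS 10000000

int
main(void)
{
    int         examples[] = {16384, 1048576, 16777216};   /* 128 MB, 8 GB, 128 GB */

    for (int i = 0; i < 3; i++)
        printf("NBuffers=%8d -> max_requests=%8d\n",
               examples[i], Min(examples[i], MAX_CHECKPOINT_REQUESTS));
    return 0;
}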
@@ -1201,6 +1215,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 {
     CheckpointerRequest *request;
     bool        too_full;
+    int         insert_pos;
 
     if (!IsUnderPostmaster)
         return false;           /* probably shouldn't even get here */
@@ -1224,10 +1239,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
     }
 
     /* OK, insert request */
-    request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
+    insert_pos = CheckpointerShmem->tail;
+    request = &CheckpointerShmem->requests[insert_pos];
     request->ftag = *ftag;
     request->type = type;
 
+    CheckpointerShmem->tail = (CheckpointerShmem->tail + 1) % CheckpointerShmem->max_requests;
+    CheckpointerShmem->num_requests++;
+
     /* If queue is more than half full, nudge the checkpointer to empty it */
     too_full = (CheckpointerShmem->num_requests >=
                 CheckpointerShmem->max_requests / 2);
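Insertion now writes into the slot at tail and advances tail with wraparound, rather than appending at num_requests; the half-full wakeup heuristic is unchanged. A stand-alone sketch of this path (Queue, queue_insert and the int payload are illustrative stand-ins, not the real CheckpointerRequest machinery):

/*
 * Toy version of the insert-at-tail path.  Queue and queue_insert are
 * hypothetical; the payload is an int instead of a CheckpointerRequest.
 */
#include <stdbool.h>
#include <stdio.h>

typedef struct
{
    int         head;
    int         tail;
    int         num_requests;
    int         max_requests;
    int         requests[16];
} Queue;

static bool
queue_insert(Queue *q, int payload, bool *too_full)
{
    if (q->num_requests >= q->max_requests)
        return false;           /* full: the real code compacts or falls back */

    q->requests[q->tail] = payload;
    q->tail = (q->tail + 1) % q->max_requests;
    q->num_requests++;

    /* Mirrors the ">= max_requests / 2" nudge test above */
    *too_full = (q->num_requests >= q->max_requests / 2);
    return true;
}

int
main(void)
{
    Queue       q = {.max_requests = 16};
    bool        too_full = false;

    for (int i = 0; i < 9; i++)
        queue_insert(&q, i, &too_full);
    printf("num_requests=%d too_full=%d\n", q.num_requests, too_full);  /* 9 1 */
    return 0;
}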
@@ -1269,12 +1288,16 @@ CompactCheckpointerRequestQueue(void)
     struct CheckpointerSlotMapping
     {
         CheckpointerRequest request;
-        int         slot;
+        int         ring_idx;
     };
 
-    int         n,
-                preserve_count;
+    int         n;
     int         num_skipped = 0;
+    int         head;
+    int         max_requests;
+    int         num_requests;
+    int         read_idx,
+                write_idx;
     HASHCTL     ctl;
     HTAB       *htab;
     bool       *skip_slot;
@@ -1286,8 +1309,13 @@ CompactCheckpointerRequestQueue(void)
     if (CritSectionCount > 0)
         return false;
 
+    max_requests = CheckpointerShmem->max_requests;
+    num_requests = CheckpointerShmem->num_requests;
+
     /* Initialize skip_slot array */
-    skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
+    skip_slot = palloc0(sizeof(bool) * max_requests);
+
+    head = CheckpointerShmem->head;
 
     /* Initialize temporary hash table */
     ctl.keysize = sizeof(CheckpointerRequest);
@@ -1311,7 +1339,8 @@ CompactCheckpointerRequestQueue(void)
      * away preceding entries that would end up being canceled anyhow), but
      * it's not clear that the extra complexity would buy us anything.
      */
-    for (n = 0; n < CheckpointerShmem->num_requests; n++)
+    read_idx = head;
+    for (n = 0; n < num_requests; n++)
     {
         CheckpointerRequest *request;
         struct CheckpointerSlotMapping *slotmap;
@@ -1324,16 +1353,19 @@ CompactCheckpointerRequestQueue(void)
          * CheckpointerShmemInit.  Note also that RelFileLocator had better
          * contain no pad bytes.
          */
-        request = &CheckpointerShmem->requests[n];
+        request = &CheckpointerShmem->requests[read_idx];
         slotmap = hash_search(htab, request, HASH_ENTER, &found);
         if (found)
         {
             /* Duplicate, so mark the previous occurrence as skippable */
-            skip_slot[slotmap->slot] = true;
+            skip_slot[slotmap->ring_idx] = true;
             num_skipped++;
         }
         /* Remember slot containing latest occurrence of this request value */
-        slotmap->slot = n;
+        slotmap->ring_idx = read_idx;
+
+        /* Move to the next request in the ring buffer */
+        read_idx = (read_idx + 1) % max_requests;
     }
 
     /* Done with the hash table. */
@@ -1347,17 +1379,34 @@ CompactCheckpointerRequestQueue(void)
     }
 
     /* We found some duplicates; remove them. */
-    preserve_count = 0;
-    for (n = 0; n < CheckpointerShmem->num_requests; n++)
+    read_idx = write_idx = head;
+    for (n = 0; n < num_requests; n++)
     {
-        if (skip_slot[n])
-            continue;
-        CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+        /* If this slot is NOT skipped, keep it */
+        if (!skip_slot[read_idx])
+        {
+            /* If the read and write positions are different, copy the request */
+            if (write_idx != read_idx)
+                CheckpointerShmem->requests[write_idx] =
+                    CheckpointerShmem->requests[read_idx];
+
+            /* Advance the write position */
+            write_idx = (write_idx + 1) % max_requests;
+        }
+
+        read_idx = (read_idx + 1) % max_requests;
     }
 
+    /*
+     * Update ring buffer state: head remains the same, tail moves, count
+     * decreases.
+     */
+    CheckpointerShmem->tail = write_idx;
+    CheckpointerShmem->num_requests -= num_skipped;
+
     ereport(DEBUG1,
             (errmsg_internal("compacted fsync request queue from %d entries to %d entries",
-                             CheckpointerShmem->num_requests, preserve_count)));
-    CheckpointerShmem->num_requests = preserve_count;
+                             num_requests, CheckpointerShmem->num_requests)));
 
     /* Cleanup. */
     pfree(skip_slot);
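Compaction keeps its two-pass shape: the first pass walks the ring from head and marks every earlier occurrence of a duplicated request as skippable, and the second pass slides the survivors toward head, leaving head in place and pulling tail back. A stand-alone sketch of the same idea on a toy ring of ints (last_pos[] stands in for the real code's dynahash table, and values are assumed to be small non-negative integers purely so the lookup stays trivial):

/*
 * Toy version of the two-pass ring compaction.  last_pos[] replaces the
 * dynahash table; ring values must be small non-negative ints here.
 */
#include <stdio.h>
#include <string.h>

#define CAP 8

typedef struct
{
    int         head;
    int         tail;
    int         count;
    int         items[CAP];
} Ring;

static int
compact(Ring *r)
{
    int         skip[CAP] = {0};
    int         last_pos[100];  /* last ring index seen for each value */
    int         skipped = 0;
    int         read_idx;
    int         write_idx;

    memset(last_pos, -1, sizeof(last_pos));

    /* Pass 1: remember the latest occurrence, mark older ones as skippable */
    read_idx = r->head;
    for (int n = 0; n < r->count; n++)
    {
        int         v = r->items[read_idx];

        if (last_pos[v] >= 0)
        {
            skip[last_pos[v]] = 1;
            skipped++;
        }
        last_pos[v] = read_idx;
        read_idx = (read_idx + 1) % CAP;
    }

    /* Pass 2: slide survivors toward head; head stays put, tail moves back */
    read_idx = write_idx = r->head;
    for (int n = 0; n < r->count; n++)
    {
        if (!skip[read_idx])
        {
            if (write_idx != read_idx)
                r->items[write_idx] = r->items[read_idx];
            write_idx = (write_idx + 1) % CAP;
        }
        read_idx = (read_idx + 1) % CAP;
    }
    r->tail = write_idx;
    r->count -= skipped;
    return skipped;
}

int
main(void)
{
    /* Logical content, oldest first: 5 9 7 5 7 (wraps around the array end) */
    Ring        r = {.head = 6, .tail = 3, .count = 5,
                     .items = {7, 5, 7, 0, 0, 0, 5, 9}};
    int         removed = compact(&r);

    printf("removed %d duplicates, %d remain\n", removed, r.count);     /* 2, 3 */
    return 0;
}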
@@ -1378,40 +1427,64 @@ AbsorbSyncRequests(void)
 {
     CheckpointerRequest *requests = NULL;
     CheckpointerRequest *request;
-    int         n;
+    int         n,
+                i;
+    bool        loop;
 
     if (!AmCheckpointerProcess())
         return;
 
-    LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
-    /*
-     * We try to avoid holding the lock for a long time by copying the request
-     * array, and processing the requests after releasing the lock.
-     *
-     * Once we have cleared the requests from shared memory, we have to PANIC
-     * if we then fail to absorb them (eg, because our hashtable runs out of
-     * memory).  This is because the system cannot run safely if we are unable
-     * to fsync what we have been told to fsync.  Fortunately, the hashtable
-     * is so small that the problem is quite unlikely to arise in practice.
-     */
-    n = CheckpointerShmem->num_requests;
-    if (n > 0)
+    do
     {
-        requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-        memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
-    }
+        LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
+
+        /*---
+         * We try to avoid holding the lock for a long time by:
+         * 1. Copying the request array and processing the requests after
+         *    releasing the lock;
+         * 2. Processing not the whole queue, but only batches of
+         *    CKPT_REQ_BATCH_SIZE at once.
+         *
+         * Once we have cleared the requests from shared memory, we must
+         * PANIC if we then fail to absorb them (e.g., because our hashtable
+         * runs out of memory).  This is because the system cannot run safely
+         * if we are unable to fsync what we have been told to fsync.
+         * Fortunately, the hashtable is so small that the problem is quite
+         * unlikely to arise in practice.
+         *
+         * Note: the maximum possible size of the ring buffer is
+         * MAX_CHECKPOINT_REQUESTS entries, which fits within the maximum
+         * palloc allocation size of 1 GB.  Our maximum batch size,
+         * CKPT_REQ_BATCH_SIZE, is even smaller.
+         */
+        n = Min(CheckpointerShmem->num_requests, CKPT_REQ_BATCH_SIZE);
+        if (n > 0)
+        {
+            if (!requests)
+                requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
 
-    START_CRIT_SECTION();
+            for (i = 0; i < n; i++)
+            {
+                requests[i] = CheckpointerShmem->requests[CheckpointerShmem->head];
+                CheckpointerShmem->head = (CheckpointerShmem->head + 1) % CheckpointerShmem->max_requests;
+            }
 
-    CheckpointerShmem->num_requests = 0;
+            CheckpointerShmem->num_requests -= n;
+        }
+
+        START_CRIT_SECTION();
+
+        /* Are there any requests in the queue? If so, keep going. */
+        loop = CheckpointerShmem->num_requests != 0;
 
-    LWLockRelease(CheckpointerCommLock);
+        LWLockRelease(CheckpointerCommLock);
 
-    for (request = requests; n > 0; request++, n--)
-        RememberSyncRequest(&request->ftag, request->type);
+        for (request = requests; n > 0; request++, n--)
+            RememberSyncRequest(&request->ftag, request->type);
 
-    END_CRIT_SECTION();
+        END_CRIT_SECTION();
+    } while (loop);
 
     if (requests)
         pfree(requests);
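The overall effect in AbsorbSyncRequests is a bounded-batch drain: take at most CKPT_REQ_BATCH_SIZE entries off the ring while holding the lock, release it, do the expensive RememberSyncRequest work, and go around again while anything is left. A single-threaded, stand-alone sketch of that loop shape (ring, BATCH and process() are illustrative stand-ins; the comments mark where CheckpointerCommLock is held in the real code):

/*
 * Toy version of the batched drain loop.  ring/BATCH/process() are
 * stand-ins; comments mark where CheckpointerCommLock is held for real.
 */
#include <stdio.h>

#define CAP 32
#define BATCH 4

static int  ring[CAP];
static int  head;
static int  tail;
static int  count;

static void
process(int v)
{
    printf("processed %d\n", v);    /* stands in for RememberSyncRequest() */
}

static void
drain(void)
{
    int         batch[BATCH];
    int         n;
    int         loop;

    do
    {
        /* --- lock is held from here ... */
        n = (count < BATCH) ? count : BATCH;
        for (int i = 0; i < n; i++)
        {
            batch[i] = ring[head];
            head = (head + 1) % CAP;
        }
        count -= n;
        loop = (count != 0);    /* anything left? go around again */
        /* ... to here --- */

        /* the expensive work happens with the lock released */
        for (int i = 0; i < n; i++)
            process(batch[i]);
    } while (loop);
}

int
main(void)
{
    for (int i = 0; i < 10; i++)    /* enqueue ten requests */
    {
        ring[tail] = i;
        tail = (tail + 1) % CAP;
        count++;
    }
    drain();                        /* drains in three batches: 4 + 4 + 2 */
    return 0;
}

Note that in the patch above the loop flag is computed while the lock is still held, so the decision to iterate again is taken against a consistent view of the queue.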