@ -1,28 +1,38 @@
/*-------------------------------------------------------------------------
*
* slru . c
* Simple LRU buffering for transaction status logfiles
* Simple LRU buffering for wrap - around - able permanent metadata
*
* We use a simple least - recently - used scheme to manage a pool of page
* buffers . Under ordinary circumstances we expect that write
* traffic will occur mostly to the latest page ( and to the just - prior
* page , soon after a page transition ) . Read traffic will probably touch
* a larger span of pages , but in any case a fairly small number of page
* buffers should be sufficient . So , we just search the buffers using plain
* linear search ; there ' s no need for a hashtable or anything fancy .
* The management algorithm is straight LRU except that we will never swap
* out the latest page ( since we know it ' s going to be hit again eventually ) .
* This module is used to maintain various pieces of transaction status
* indexed by TransactionId ( such as commit status , parent transaction ID ,
* commit timestamp ) , as well as storage for multixacts , serializable
* isolation locks and NOTIFY traffic . Extensions can define their own
* SLRUs , too .
*
* We use a control LWLock to protect the shared data structures , plus
* per - buffer LWLocks that synchronize I / O for each buffer . The control lock
* must be held to examine or modify any shared state . A process that is
* reading in or writing out a page buffer does not hold the control lock ,
* only the per - buffer lock for the buffer it is working on . One exception
* is latest_page_number , which is read and written using atomic ops .
* Under ordinary circumstances we expect that write traffic will occur
* mostly to the latest page ( and to the just - prior page , soon after a
* page transition ) . Read traffic will probably touch a larger span of
* pages , but a relatively small number of buffers should be sufficient .
*
* " Holding the control lock " means exclusive lock in all cases except for
* SimpleLruReadPage_ReadOnly ( ) ; see comments for SlruRecentlyUsed ( ) for
* the implications of that .
* We use a simple least - recently - used scheme to manage a pool of shared
* page buffers , split in banks by the lowest bits of the page number , and
* the management algorithm only processes the bank to which the desired
* page belongs , so a linear search is sufficient ; there ' s no need for a
* hashtable or anything fancy . The algorithm is straight LRU except that
* we will never swap out the latest page ( since we know it ' s going to be
* hit again eventually ) .
*
* We use per - bank control LWLocks to protect the shared data structures ,
* plus per - buffer LWLocks that synchronize I / O for each buffer . The
* bank ' s control lock must be held to examine or modify any of the bank ' s
* shared state . A process that is reading in or writing out a page
* buffer does not hold the control lock , only the per - buffer lock for the
* buffer it is working on . One exception is latest_page_number , which is
* read and written using atomic ops .
*
* " Holding the bank control lock " means exclusive lock in all cases
* except for SimpleLruReadPage_ReadOnly ( ) ; see comments for
* SlruRecentlyUsed ( ) for the implications of that .
*
* When initiating I / O on a buffer , we acquire the per - buffer lock exclusively
* before releasing the control lock . The per - buffer lock is released after
@ -60,6 +70,7 @@
# include "pgstat.h"
# include "storage/fd.h"
# include "storage/shmem.h"
# include "utils/guc_hooks.h"
static inline int
SlruFileName ( SlruCtl ctl , char * path , int64 segno )
@ -106,6 +117,23 @@ typedef struct SlruWriteAllData
typedef struct SlruWriteAllData * SlruWriteAll ;
/*
* Bank size for the slot array . Pages are assigned a bank according to their
* page number , with each bank being this size . We want a power of 2 so that
* we can determine the bank number for a page with just bit shifting ; we also
* want to keep the bank size small so that LRU victim search is fast . 16
* buffers per bank seems a good number .
*/
# define SLRU_BANK_BITSHIFT 4
# define SLRU_BANK_SIZE (1 << SLRU_BANK_BITSHIFT)
/*
* Macro to get the bank number to which the slot belongs .
*/
# define SlotGetBankNumber(slotno) ((slotno) >> SLRU_BANK_BITSHIFT)
/*
* Populate a file tag describing a segment file . We only use the segment
* number , since we can derive everything else we need by having separate
@ -118,34 +146,6 @@ typedef struct SlruWriteAllData *SlruWriteAll;
( a ) . segno = ( xx_segno ) \
)
/*
* Macro to mark a buffer slot " most recently used " . Note multiple evaluation
* of arguments !
*
* The reason for the if - test is that there are often many consecutive
* accesses to the same page ( particularly the latest page ) . By suppressing
* useless increments of cur_lru_count , we reduce the probability that old
* pages ' counts will " wrap around " and make them appear recently used .
*
* We allow this code to be executed concurrently by multiple processes within
* SimpleLruReadPage_ReadOnly ( ) . As long as int reads and writes are atomic ,
* this should not cause any completely - bogus values to enter the computation .
* However , it is possible for either cur_lru_count or individual
* page_lru_count entries to be " reset " to lower values than they should have ,
* in case a process is delayed while it executes this macro . With care in
* SlruSelectLRUPage ( ) , this does little harm , and in any case the absolute
* worst possible consequence is a nonoptimal choice of page to evict . The
* gain from allowing concurrent reads of SLRU pages seems worth it .
*/
# define SlruRecentlyUsed(shared, slotno) \
do { \
int new_lru_count = ( shared ) - > cur_lru_count ; \
if ( new_lru_count ! = ( shared ) - > page_lru_count [ slotno ] ) { \
( shared ) - > cur_lru_count = + + new_lru_count ; \
( shared ) - > page_lru_count [ slotno ] = new_lru_count ; \
} \
} while ( 0 )
/* Saved info for SlruReportIOError */
typedef enum
{
@ -173,6 +173,7 @@ static int SlruSelectLRUPage(SlruCtl ctl, int64 pageno);
static bool SlruScanDirCbDeleteCutoff ( SlruCtl ctl , char * filename ,
int64 segpage , void * data ) ;
static void SlruInternalDeleteSegment ( SlruCtl ctl , int64 segno ) ;
static inline void SlruRecentlyUsed ( SlruShared shared , int slotno ) ;
/*
@ -182,8 +183,12 @@ static void SlruInternalDeleteSegment(SlruCtl ctl, int64 segno);
Size
SimpleLruShmemSize ( int nslots , int nlsns )
{
int nbanks = nslots / SLRU_BANK_SIZE ;
Size sz ;
Assert ( nslots < = SLRU_MAX_ALLOWED_BUFFERS ) ;
Assert ( nslots % SLRU_BANK_SIZE = = 0 ) ;
/* we assume nslots isn't so large as to risk overflow */
sz = MAXALIGN ( sizeof ( SlruSharedData ) ) ;
sz + = MAXALIGN ( nslots * sizeof ( char * ) ) ; /* page_buffer[] */
@ -192,6 +197,8 @@ SimpleLruShmemSize(int nslots, int nlsns)
sz + = MAXALIGN ( nslots * sizeof ( int64 ) ) ; /* page_number[] */
sz + = MAXALIGN ( nslots * sizeof ( int ) ) ; /* page_lru_count[] */
sz + = MAXALIGN ( nslots * sizeof ( LWLockPadded ) ) ; /* buffer_locks[] */
sz + = MAXALIGN ( nbanks * sizeof ( LWLockPadded ) ) ; /* bank_locks[] */
sz + = MAXALIGN ( nbanks * sizeof ( int ) ) ; /* bank_cur_lru_count[] */
if ( nlsns > 0 )
sz + = MAXALIGN ( nslots * nlsns * sizeof ( XLogRecPtr ) ) ; /* group_lsn[] */
@ -199,6 +206,21 @@ SimpleLruShmemSize(int nslots, int nlsns)
return BUFFERALIGN ( sz ) + BLCKSZ * nslots ;
}
/*
* Determine a number of SLRU buffers to use .
*
* We simply divide shared_buffers by the divisor given and cap
* that at the maximum given ; but always at least SLRU_BANK_SIZE .
* Round down to the nearest multiple of SLRU_BANK_SIZE .
*/
int
SimpleLruAutotuneBuffers ( int divisor , int max )
{
return Min ( max - ( max % SLRU_BANK_SIZE ) ,
Max ( SLRU_BANK_SIZE ,
NBuffers / divisor - ( NBuffers / divisor ) % SLRU_BANK_SIZE ) ) ;
}
/*
* Initialize , or attach to , a simple LRU cache in shared memory .
*
@ -208,16 +230,20 @@ SimpleLruShmemSize(int nslots, int nlsns)
* nlsns : number of LSN groups per page ( set to zero if not relevant ) .
* ctllock : LWLock to use to control access to the shared control structure .
* subdir : PGDATA - relative subdirectory that will contain the files .
* tranche_id : LWLock tranche ID to use for the SLRU ' s per - buffer LWLocks .
* buffer_tranche_id : tranche ID to use for the SLRU ' s per - buffer LWLocks .
* bank_tranche_id : tranche ID to use for the bank LWLocks .
* sync_handler : which set of functions to use to handle sync requests
*/
void
SimpleLruInit ( SlruCtl ctl , const char * name , int nslots , int nlsns ,
LWLock * ctllock , const char * subdir , int tranche_id ,
const char * subdir , int buffer_tranche_id , int bank_ tranche_id,
SyncRequestHandler sync_handler , bool long_segment_names )
{
SlruShared shared ;
bool found ;
int nbanks = nslots / SLRU_BANK_SIZE ;
Assert ( nslots < = SLRU_MAX_ALLOWED_BUFFERS ) ;
shared = ( SlruShared ) ShmemInitStruct ( name ,
SimpleLruShmemSize ( nslots , nlsns ) ,
@ -233,12 +259,9 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
memset ( shared , 0 , sizeof ( SlruSharedData ) ) ;
shared - > ControlLock = ctllock ;
shared - > num_slots = nslots ;
shared - > lsn_groups_per_page = nlsns ;
shared - > cur_lru_count = 0 ;
pg_atomic_init_u64 ( & shared - > latest_page_number , 0 ) ;
shared - > slru_stats_idx = pgstat_get_slru_index ( name ) ;
@ -259,6 +282,10 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
/* Initialize LWLocks */
shared - > buffer_locks = ( LWLockPadded * ) ( ptr + offset ) ;
offset + = MAXALIGN ( nslots * sizeof ( LWLockPadded ) ) ;
shared - > bank_locks = ( LWLockPadded * ) ( ptr + offset ) ;
offset + = MAXALIGN ( nbanks * sizeof ( LWLockPadded ) ) ;
shared - > bank_cur_lru_count = ( int * ) ( ptr + offset ) ;
offset + = MAXALIGN ( nbanks * sizeof ( int ) ) ;
if ( nlsns > 0 )
{
@ -270,7 +297,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
for ( int slotno = 0 ; slotno < nslots ; slotno + + )
{
LWLockInitialize ( & shared - > buffer_locks [ slotno ] . lock ,
tranche_id ) ;
buffer_ tranche_id) ;
shared - > page_buffer [ slotno ] = ptr ;
shared - > page_status [ slotno ] = SLRU_PAGE_EMPTY ;
@ -279,11 +306,21 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
ptr + = BLCKSZ ;
}
/* Initialize the slot banks. */
for ( int bankno = 0 ; bankno < nbanks ; bankno + + )
{
LWLockInitialize ( & shared - > bank_locks [ bankno ] . lock , bank_tranche_id ) ;
shared - > bank_cur_lru_count [ bankno ] = 0 ;
}
/* Should fit to estimated shmem size */
Assert ( ptr - ( char * ) shared < = SimpleLruShmemSize ( nslots , nlsns ) ) ;
}
else
{
Assert ( found ) ;
Assert ( shared - > num_slots = = nslots ) ;
}
/*
* Initialize the unshared control struct , including directory path . We
@ -292,16 +329,33 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
ctl - > shared = shared ;
ctl - > sync_handler = sync_handler ;
ctl - > long_segment_names = long_segment_names ;
ctl - > bank_mask = ( nslots / SLRU_BANK_SIZE ) - 1 ;
strlcpy ( ctl - > Dir , subdir , sizeof ( ctl - > Dir ) ) ;
}
/*
* Helper function for GUC check_hook to check whether slru buffers are in
* multiples of SLRU_BANK_SIZE .
*/
bool
check_slru_buffers ( const char * name , int * newval )
{
/* Valid values are multiples of SLRU_BANK_SIZE */
if ( * newval % SLRU_BANK_SIZE = = 0 )
return true ;
GUC_check_errdetail ( " \" %s \" must be a multiple of %d " , name ,
SLRU_BANK_SIZE ) ;
return false ;
}
/*
* Initialize ( or reinitialize ) a page to zeroes .
*
* The page is not actually written , just set up in shared memory .
* The slot number of the new page is returned .
*
* Control lock must be held at entry , and will be held at exit .
* Bank lock must be held at entry , and will be held at exit .
*/
int
SimpleLruZeroPage ( SlruCtl ctl , int64 pageno )
@ -309,6 +363,8 @@ SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
SlruShared shared = ctl - > shared ;
int slotno ;
Assert ( LWLockHeldByMeInMode ( SimpleLruGetBankLock ( ctl , pageno ) , LW_EXCLUSIVE ) ) ;
/* Find a suitable buffer slot for the page */
slotno = SlruSelectLRUPage ( ctl , pageno ) ;
Assert ( shared - > page_status [ slotno ] = = SLRU_PAGE_EMPTY | |
@ -369,18 +425,21 @@ SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
* guarantee that new I / O hasn ' t been started before we return , though .
* In fact the slot might not even contain the same page anymore . )
*
* Control lock must be held at entry , and will be held at exit .
* Bank lock must be held at entry , and will be held at exit .
*/
static void
SimpleLruWaitIO ( SlruCtl ctl , int slotno )
{
SlruShared shared = ctl - > shared ;
int bankno = SlotGetBankNumber ( slotno ) ;
Assert ( & shared - > page_status [ slotno ] ! = SLRU_PAGE_EMPTY ) ;
/* See notes at top of file */
LWLockRelease ( shared - > ControlLock ) ;
LWLockRelease ( & shared - > bank_locks [ bankno ] . l ock) ;
LWLockAcquire ( & shared - > buffer_locks [ slotno ] . lock , LW_SHARED ) ;
LWLockRelease ( & shared - > buffer_locks [ slotno ] . lock ) ;
LWLockAcquire ( shared - > ControlL ock, LW_EXCLUSIVE ) ;
LWLockAcquire ( & shared - > bank_locks [ bankno ] . l ock, LW_EXCLUSIVE ) ;
/*
* If the slot is still in an io - in - progress state , then either someone
@ -423,7 +482,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
* Return value is the shared - buffer slot number now holding the page .
* The buffer ' s LRU access info is updated .
*
* Control lock must be held at entry , and will be held at exit .
* The correct bank lock must be held at entry , and will be held at exit .
*/
int
SimpleLruReadPage ( SlruCtl ctl , int64 pageno , bool write_ok ,
@ -431,18 +490,21 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
{
SlruShared shared = ctl - > shared ;
Assert ( LWLockHeldByMeInMode ( SimpleLruGetBankLock ( ctl , pageno ) , LW_EXCLUSIVE ) ) ;
/* Outer loop handles restart if we must wait for someone else's I/O */
for ( ; ; )
{
int slotno ;
int bankno ;
bool ok ;
/* See if page already is in memory; if not, pick victim slot */
slotno = SlruSelectLRUPage ( ctl , pageno ) ;
/* Did we find the page in memory? */
if ( shared - > page_number [ slotno ] = = pageno & &
shared - > page_status [ slotno ] ! = SLRU_PAGE_EMPTY )
if ( shared - > page_status [ slotno ] ! = SLRU_PAGE_EMPTY & &
shared - > page_number [ slotno ] = = pageno )
{
/*
* If page is still being read in , we must wait for I / O . Likewise
@ -477,9 +539,10 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
LWLockAcquire ( & shared - > buffer_locks [ slotno ] . lock , LW_EXCLUSIVE ) ;
bankno = SlotGetBankNumber ( slotno ) ;
/* Release control lock while doing I/O */
LWLockRelease ( shared - > ControlL ock) ;
/* Release bank lock while doing I/O */
LWLockRelease ( & shared - > bank_locks [ bankno ] . l ock) ;
/* Do the read */
ok = SlruPhysicalReadPage ( ctl , pageno , slotno ) ;
@ -487,8 +550,8 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
/* Set the LSNs for this newly read-in page to zero */
SimpleLruZeroLSNs ( ctl , slotno ) ;
/* Re-acquire control lock and update page state */
LWLockAcquire ( shared - > ControlL ock, LW_EXCLUSIVE ) ;
/* Re-acquire bank control lock and update page state */
LWLockAcquire ( & shared - > bank_locks [ bankno ] . l ock, LW_EXCLUSIVE ) ;
Assert ( shared - > page_number [ slotno ] = = pageno & &
shared - > page_status [ slotno ] = = SLRU_PAGE_READ_IN_PROGRESS & &
@ -522,22 +585,25 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
* Return value is the shared - buffer slot number now holding the page .
* The buffer ' s LRU access info is updated .
*
* C ontrol lock must NOT be held at entry , but will be held at exit .
* Bank c ontrol lock must NOT be held at entry , but will be held at exit .
* It is unspecified whether the lock will be shared or exclusive .
*/
int
SimpleLruReadPage_ReadOnly ( SlruCtl ctl , int64 pageno , TransactionId xid )
{
SlruShared shared = ctl - > shared ;
int bankno = pageno & ctl - > bank_mask ;
int bankstart = bankno * SLRU_BANK_SIZE ;
int bankend = bankstart + SLRU_BANK_SIZE ;
/* Try to find the page while holding only shared lock */
LWLockAcquire ( shared - > ControlLock , LW_SHARED ) ;
LWLockAcquire ( & shared - > bank_locks [ bankno ] . l ock, LW_SHARED ) ;
/* See if page is already in a buffer */
for ( int slotno = 0 ; slotno < shared - > num_slots ; slotno + + )
for ( int slotno = bankstart ; slotno < bankend ; slotno + + )
{
if ( shared - > page_number [ slotno ] = = pageno & &
shared - > page_status [ slotno ] ! = SLRU_PAGE_EMPTY & &
if ( shared - > page_status [ slotno ] ! = SLRU_PAGE_EMPTY & &
shared - > page_number [ slotno ] = = pageno & &
shared - > page_status [ slotno ] ! = SLRU_PAGE_READ_IN_PROGRESS )
{
/* See comments for SlruRecentlyUsed macro */
@ -551,8 +617,8 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
}
/* No luck, so switch to normal exclusive lock and do regular read */
LWLockRelease ( shared - > ControlL ock) ;
LWLockAcquire ( shared - > ControlL ock, LW_EXCLUSIVE ) ;
LWLockRelease ( & shared - > bank_locks [ bankno ] . l ock) ;
LWLockAcquire ( & shared - > bank_locks [ bankno ] . l ock, LW_EXCLUSIVE ) ;
return SimpleLruReadPage ( ctl , pageno , true , xid ) ;
}
@ -566,15 +632,19 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
* the write ) . However , we * do * attempt a fresh write even if the page
* is already being written ; this is for checkpoints .
*
* Control lock must be held at entry , and will be held at exit .
* Bank lock must be held at entry , and will be held at exit .
*/
static void
SlruInternalWritePage ( SlruCtl ctl , int slotno , SlruWriteAll fdata )
{
SlruShared shared = ctl - > shared ;
int64 pageno = shared - > page_number [ slotno ] ;
int bankno = SlotGetBankNumber ( slotno ) ;
bool ok ;
Assert ( shared - > page_status [ slotno ] ! = SLRU_PAGE_EMPTY ) ;
Assert ( LWLockHeldByMeInMode ( SimpleLruGetBankLock ( ctl , pageno ) , LW_EXCLUSIVE ) ) ;
/* If a write is in progress, wait for it to finish */
while ( shared - > page_status [ slotno ] = = SLRU_PAGE_WRITE_IN_PROGRESS & &
shared - > page_number [ slotno ] = = pageno )
@ -601,8 +671,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
LWLockAcquire ( & shared - > buffer_locks [ slotno ] . lock , LW_EXCLUSIVE ) ;
/* Release control lock while doing I/O */
LWLockRelease ( shared - > ControlL ock) ;
/* Release bank lock while doing I/O */
LWLockRelease ( & shared - > bank_locks [ bankno ] . l ock) ;
/* Do the write */
ok = SlruPhysicalWritePage ( ctl , pageno , slotno , fdata ) ;
@ -614,8 +684,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
CloseTransientFile ( fdata - > fd [ i ] ) ;
}
/* Re-acquire control lock and update page state */
LWLockAcquire ( shared - > ControlL ock, LW_EXCLUSIVE ) ;
/* Re-acquire bank lock and update page state */
LWLockAcquire ( & shared - > bank_locks [ bankno ] . l ock, LW_EXCLUSIVE ) ;
Assert ( shared - > page_number [ slotno ] = = pageno & &
shared - > page_status [ slotno ] = = SLRU_PAGE_WRITE_IN_PROGRESS ) ;
@ -644,6 +714,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
void
SimpleLruWritePage ( SlruCtl ctl , int slotno )
{
Assert ( & ctl - > shared - > page_status [ slotno ] ! = SLRU_PAGE_EMPTY ) ;
SlruInternalWritePage ( ctl , slotno , NULL ) ;
}
@ -1028,17 +1100,53 @@ SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid)
}
/*
* Select the slot to re - use when we need a free slot .
* Mark a buffer slot " most recently used " .
*/
static inline void
SlruRecentlyUsed ( SlruShared shared , int slotno )
{
int bankno = SlotGetBankNumber ( slotno ) ;
int new_lru_count = shared - > bank_cur_lru_count [ bankno ] ;
Assert ( shared - > page_status [ slotno ] ! = SLRU_PAGE_EMPTY ) ;
/*
* The reason for the if - test is that there are often many consecutive
* accesses to the same page ( particularly the latest page ) . By
* suppressing useless increments of bank_cur_lru_count , we reduce the
* probability that old pages ' counts will " wrap around " and make them
* appear recently used .
*
* We allow this code to be executed concurrently by multiple processes
* within SimpleLruReadPage_ReadOnly ( ) . As long as int reads and writes
* are atomic , this should not cause any completely - bogus values to enter
* the computation . However , it is possible for either bank_cur_lru_count
* or individual page_lru_count entries to be " reset " to lower values than
* they should have , in case a process is delayed while it executes this
* function . With care in SlruSelectLRUPage ( ) , this does little harm , and
* in any case the absolute worst possible consequence is a nonoptimal
* choice of page to evict . The gain from allowing concurrent reads of
* SLRU pages seems worth it .
*/
if ( new_lru_count ! = shared - > page_lru_count [ slotno ] )
{
shared - > bank_cur_lru_count [ bankno ] = + + new_lru_count ;
shared - > page_lru_count [ slotno ] = new_lru_count ;
}
}
/*
* Select the slot to re - use when we need a free slot for the given page .
*
* The target page number is passed because we need to consider the
* possibility that some other process reads in the target page while
* we are doing I / O to free a slot . Hence , check or recheck to see if
* any slot already holds the target page , and return that slot if so .
* Thus , the returned slot is * either * a slot already holding the pageno
* ( could be any state except EMPTY ) , * or * a freeable slot ( state EMPTY
* or CLEAN ) .
* The target page number is passed not only because we need to know the
* correct bank to use , but also because we need to consider the possibility
* that some other process reads in the target page while we are doing I / O to
* free a slot . Hence , check or recheck to see if any slot already holds the
* target page , and return that slot if so . Thus , the returned slot is
* * either * a slot already holding the pageno ( could be any state except
* EMPTY ) , * or * a freeable slot ( state EMPTY or CLEAN ) .
*
* Control lock must be held at entry , and will be held at exit .
* The correct bank lock must be held at entry , and will be held at exit .
*/
static int
SlruSelectLRUPage ( SlruCtl ctl , int64 pageno )
@ -1055,12 +1163,17 @@ SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
int bestinvalidslot = 0 ; /* keep compiler quiet */
int best_invalid_delta = - 1 ;
int64 best_invalid_page_number = 0 ; /* keep compiler quiet */
int bankno = pageno & ctl - > bank_mask ;
int bankstart = bankno * SLRU_BANK_SIZE ;
int bankend = bankstart + SLRU_BANK_SIZE ;
Assert ( LWLockHeldByMe ( & shared - > bank_locks [ bankno ] . lock ) ) ;
/* See if page already has a buffer assigned */
for ( int slotno = 0 ; slotno < shared - > num_slots ; slotno + + )
{
if ( shared - > page_number [ slotno ] = = pageno & &
shared - > page_status [ slotno ] ! = SLRU_PAGE_EMPTY )
if ( shared - > page_status [ slotno ] ! = SLRU_PAGE_EMPTY & &
shared - > page_number [ slotno ] = = pageno )
return slotno ;
}
@ -1091,14 +1204,15 @@ SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
* That gets us back on the path to having good data when there are
* multiple pages with the same lru_count .
*/
cur_count = ( shared - > cur_lru_count ) + + ;
for ( int slotno = 0 ; slotno < shared - > num_slots ; slotno + + )
cur_count = ( shared - > bank_ cur_lru_count[ bankno ] ) + + ;
for ( int slotno = bankstart ; slotno < bankend ; slotno + + )
{
int this_delta ;
int64 this_page_number ;
if ( shared - > page_status [ slotno ] = = SLRU_PAGE_EMPTY )
return slotno ;
this_delta = cur_count - shared - > page_lru_count [ slotno ] ;
if ( this_delta < 0 )
{
@ -1193,6 +1307,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
SlruShared shared = ctl - > shared ;
SlruWriteAllData fdata ;
int64 pageno = 0 ;
int prevbank = SlotGetBankNumber ( 0 ) ;
bool ok ;
/* update the stats counter of flushes */
@ -1203,10 +1318,27 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
*/
fdata . num_files = 0 ;
LWLockAcquire ( shared - > ControlL ock, LW_EXCLUSIVE ) ;
LWLockAcquire ( & shared - > bank_locks [ prevbank ] . l ock, LW_EXCLUSIVE ) ;
for ( int slotno = 0 ; slotno < shared - > num_slots ; slotno + + )
{
int curbank = SlotGetBankNumber ( slotno ) ;
/*
* If the current bank lock is not same as the previous bank lock then
* release the previous lock and acquire the new lock .
*/
if ( curbank ! = prevbank )
{
LWLockRelease ( & shared - > bank_locks [ prevbank ] . lock ) ;
LWLockAcquire ( & shared - > bank_locks [ curbank ] . lock , LW_EXCLUSIVE ) ;
prevbank = curbank ;
}
/* Do nothing if slot is unused */
if ( shared - > page_status [ slotno ] = = SLRU_PAGE_EMPTY )
continue ;
SlruInternalWritePage ( ctl , slotno , & fdata ) ;
/*
@ -1220,7 +1352,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
! shared - > page_dirty [ slotno ] ) ) ;
}
LWLockRelease ( shared - > ControlL ock) ;
LWLockRelease ( & shared - > bank_locks [ prevbank ] . l ock) ;
/*
* Now close any files that were open
@ -1259,6 +1391,7 @@ void
SimpleLruTruncate ( SlruCtl ctl , int64 cutoffPage )
{
SlruShared shared = ctl - > shared ;
int prevbank ;
/* update the stats counter of truncates */
pgstat_count_slru_truncate ( shared - > slru_stats_idx ) ;
@ -1269,8 +1402,6 @@ SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
* or just after a checkpoint , any dirty pages should have been flushed
* already . . . we ' re just being extra careful here . )
*/
LWLockAcquire ( shared - > ControlLock , LW_EXCLUSIVE ) ;
restart :
/*
@ -1282,15 +1413,29 @@ restart:
if ( ctl - > PagePrecedes ( pg_atomic_read_u64 ( & shared - > latest_page_number ) ,
cutoffPage ) )
{
LWLockRelease ( shared - > ControlLock ) ;
ereport ( LOG ,
( errmsg ( " could not truncate directory \" %s \" : apparent wraparound " ,
ctl - > Dir ) ) ) ;
return ;
}
prevbank = SlotGetBankNumber ( 0 ) ;
LWLockAcquire ( & shared - > bank_locks [ prevbank ] . lock , LW_EXCLUSIVE ) ;
for ( int slotno = 0 ; slotno < shared - > num_slots ; slotno + + )
{
int curbank = SlotGetBankNumber ( slotno ) ;
/*
* If the current bank lock is not same as the previous bank lock then
* release the previous lock and acquire the new lock .
*/
if ( curbank ! = prevbank )
{
LWLockRelease ( & shared - > bank_locks [ prevbank ] . lock ) ;
LWLockAcquire ( & shared - > bank_locks [ curbank ] . lock , LW_EXCLUSIVE ) ;
prevbank = curbank ;
}
if ( shared - > page_status [ slotno ] = = SLRU_PAGE_EMPTY )
continue ;
if ( ! ctl - > PagePrecedes ( shared - > page_number [ slotno ] , cutoffPage ) )
@ -1320,10 +1465,12 @@ restart:
SlruInternalWritePage ( ctl , slotno , NULL ) ;
else
SimpleLruWaitIO ( ctl , slotno ) ;
LWLockRelease ( & shared - > bank_locks [ prevbank ] . lock ) ;
goto restart ;
}
LWLockRelease ( shared - > ControlL ock) ;
LWLockRelease ( & shared - > bank_locks [ prevbank ] . l ock) ;
/* Now we can remove the old segment(s) */
( void ) SlruScanDirectory ( ctl , SlruScanDirCbDeleteCutoff , & cutoffPage ) ;
@ -1362,19 +1509,33 @@ void
SlruDeleteSegment ( SlruCtl ctl , int64 segno )
{
SlruShared shared = ctl - > shared ;
int prevbank = SlotGetBankNumber ( 0 ) ;
bool did_write ;
/* Clean out any possibly existing references to the segment. */
LWLockAcquire ( shared - > ControlL ock, LW_EXCLUSIVE ) ;
LWLockAcquire ( & shared - > bank_locks [ prevbank ] . l ock, LW_EXCLUSIVE ) ;
restart :
did_write = false ;
for ( int slotno = 0 ; slotno < shared - > num_slots ; slotno + + )
{
int pagesegno = shared - > page_number [ slotno ] / SLRU_PAGES_PER_SEGMENT ;
int pagesegno ;
int curbank = SlotGetBankNumber ( slotno ) ;
/*
* If the current bank lock is not same as the previous bank lock then
* release the previous lock and acquire the new lock .
*/
if ( curbank ! = prevbank )
{
LWLockRelease ( & shared - > bank_locks [ prevbank ] . lock ) ;
LWLockAcquire ( & shared - > bank_locks [ curbank ] . lock , LW_EXCLUSIVE ) ;
prevbank = curbank ;
}
if ( shared - > page_status [ slotno ] = = SLRU_PAGE_EMPTY )
continue ;
pagesegno = shared - > page_number [ slotno ] / SLRU_PAGES_PER_SEGMENT ;
/* not the segment we're looking for */
if ( pagesegno ! = segno )
continue ;
@ -1405,7 +1566,7 @@ restart:
SlruInternalDeleteSegment ( ctl , segno ) ;
LWLockRelease ( shared - > ControlL ock) ;
LWLockRelease ( & shared - > bank_locks [ prevbank ] . l ock) ;
}
/*