@@ -159,13 +159,16 @@ ClockSweepTick(void)
  * StrategyGetBuffer
  *
  *	Called by the bufmgr to get the next candidate buffer to use in
- *	BufferAlloc(). The only hard requirement BufferAlloc() has is that
+ *	GetVictimBuffer(). The only hard requirement GetVictimBuffer() has is that
  *	the selected buffer must not currently be pinned by anyone.
  *
  *	strategy is a BufferAccessStrategy object, or NULL for default strategy.
  *
- *	To ensure that no one else can pin the buffer before we do, we must
- *	return the buffer with the buffer header spinlock still held.
+ *	It is the caller's responsibility to ensure the buffer ownership can be
+ *	tracked via TrackNewBufferPin().
+ *
+ *	The buffer is pinned and marked as owned, using TrackNewBufferPin(),
+ *	before returning.
  */
 BufferDesc *
 StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring)
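
Note (reviewer illustration, not part of the patch): the contract change above
removes the spinlock handoff between StrategyGetBuffer() and its caller.
Assuming the caller shape of GetVictimBuffer() in current bufmgr.c, roughly:

	/* before: returned with the buffer header spinlock held */
	buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring);
	PinBuffer_Locked(buf_hdr);	/* takes the pin, releases the spinlock */

	/* after: returned already pinned, ownership recorded */
	buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring);
	/* no spinlock is held here; the pin is already tracked */

The caller's only remaining obligation is the one the new comment states:
being in a state where TrackNewBufferPin() can record ownership of the pin.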
@@ -173,7 +176,6 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 	BufferDesc *buf;
 	int			bgwprocno;
 	int			trycounter;
-	uint32		local_buf_state;	/* to avoid repeated (de-)referencing */
 
 	*from_ring = false;
 
@@ -228,44 +230,79 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 	trycounter = NBuffers;
 	for (;;)
 	{
+		uint32		old_buf_state;
+		uint32		local_buf_state;
+
 		buf = GetBufferDescriptor(ClockSweepTick());
 
 		/*
-		 * If the buffer is pinned or has a nonzero usage_count, we cannot use
-		 * it; decrement the usage_count (unless pinned) and keep scanning.
+		 * Check whether the buffer can be used and pin it if so. Do this
+		 * using a CAS loop, to avoid having to lock the buffer header.
 		 */
-		local_buf_state = LockBufHdr(buf);
-
-		if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0)
+		old_buf_state = pg_atomic_read_u32(&buf->state);
+		for (;;)
 		{
-			if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0)
+			local_buf_state = old_buf_state;
+
+			/*
+			 * If the buffer is pinned or has a nonzero usage_count, we cannot
+			 * use it; decrement the usage_count (unless pinned) and keep
+			 * scanning.
+			 */
+			if (BUF_STATE_GET_REFCOUNT(local_buf_state) != 0)
 			{
-				local_buf_state -= BUF_USAGECOUNT_ONE;
-				trycounter = NBuffers;
+				if (--trycounter == 0)
+				{
+					/*
+					 * We've scanned all the buffers without making any state
+					 * changes, so all the buffers are pinned (or were when we
+					 * looked at them). We could hope that someone will free
+					 * one eventually, but it's probably better to fail than
+					 * to risk getting stuck in an infinite loop.
+					 */
+					elog(ERROR, "no unpinned buffers available");
+				}
+				break;
+			}
+
+			if (unlikely(local_buf_state & BM_LOCKED))
+			{
+				old_buf_state = WaitBufHdrUnlocked(buf);
+				continue;
+			}
+
+			if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0)
+			{
+				local_buf_state -= BUF_USAGECOUNT_ONE;
+
+				if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+												   local_buf_state))
+				{
+					trycounter = NBuffers;
+					break;
+				}
 			}
 			else
 			{
-				/* Found a usable buffer */
-				if (strategy != NULL)
-					AddBufferToRing(strategy, buf);
-				*buf_state = local_buf_state;
-				return buf;
+				/* pin the buffer if the CAS succeeds */
+				local_buf_state += BUF_REFCOUNT_ONE;
+
+				if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+												   local_buf_state))
+				{
+					/* Found a usable buffer */
+					if (strategy != NULL)
+						AddBufferToRing(strategy, buf);
+					*buf_state = local_buf_state;
+
+					TrackNewBufferPin(BufferDescriptorGetBuffer(buf));
+
+					return buf;
+				}
 			}
 		}
-		else if (--trycounter == 0)
-		{
-			/*
-			 * We've scanned all the buffers without making any state changes,
-			 * so all the buffers are pinned (or were when we looked at them).
-			 * We could hope that someone will free one eventually, but it's
-			 * probably better to fail than to risk getting stuck in an
-			 * infinite loop.
-			 */
-			UnlockBufHdr(buf, local_buf_state);
-			elog(ERROR, "no unpinned buffers available");
-		}
-		UnlockBufHdr(buf, local_buf_state);
 	}
 }
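
Note (reviewer illustration, not part of the patch): the new loop is the
standard optimistic read-modify-CAS pattern. A minimal standalone sketch in
C11 atomics; the state-word layout below is made up to stand in for
BufferDesc.state, not the masks from buf_internals.h:

	#include <stdatomic.h>
	#include <stdbool.h>
	#include <stdint.h>
	#include <stdio.h>

	/* hypothetical state word: low bits refcount, then usage count, a lock flag */
	#define REFCOUNT_ONE  (UINT32_C(1) << 0)
	#define REFCOUNT_MASK ((UINT32_C(1) << 18) - 1)
	#define USAGE_ONE     (UINT32_C(1) << 18)
	#define USAGE_MASK    (UINT32_C(0xF) << 18)
	#define LOCKED_FLAG   (UINT32_C(1) << 22)	/* stand-in for BM_LOCKED */

	/*
	 * One clock-sweep visit: pin the buffer if it is unpinned with a zero
	 * usage count, otherwise age it by one. Returns true iff we took the pin.
	 */
	static bool
	sweep_visit(_Atomic uint32_t *state)
	{
		uint32_t	old = atomic_load(state);

		for (;;)
		{
			uint32_t	next = old;

			if (next & REFCOUNT_MASK)
				return false;			/* pinned: caller advances the clock hand */

			if (next & LOCKED_FLAG)
			{
				old = atomic_load(state);	/* PG spins in WaitBufHdrUnlocked() */
				continue;
			}

			if (next & USAGE_MASK)
				next -= USAGE_ONE;		/* decrement usage count, keep scanning */
			else
				next += REFCOUNT_ONE;	/* usage count is zero: take the pin */

			/* on failure the CAS reloads 'old' with the current state word */
			if (atomic_compare_exchange_weak(state, &old, next))
				return (next & REFCOUNT_MASK) != 0;
		}
	}

	int
	main(void)
	{
		_Atomic uint32_t state = 2 * USAGE_ONE;		/* unpinned, usage count 2 */

		while (!sweep_visit(&state))
			;							/* two decrements, then a successful pin */
		printf("pinned: state=0x%x\n", (unsigned) atomic_load(&state));
		return 0;
	}

The explicit BM_LOCKED check in the patch serves the same purpose as
LOCKED_FLAG here: rather than repeatedly failing the CAS against a
concurrently held header lock, the loop waits for the lock word to clear and
only then retries.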
@@ -614,13 +651,15 @@ FreeAccessStrategy(BufferAccessStrategy strategy)
  * GetBufferFromRing -- returns a buffer from the ring, or NULL if the
  *		ring is empty / not usable.
  *
- * The bufhdr spin lock is held on the returned buffer.
+ * The buffer is pinned and marked as owned, using TrackNewBufferPin(), before
+ * returning.
  */
 static BufferDesc *
 GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
 {
 	BufferDesc *buf;
 	Buffer		bufnum;
+	uint32		old_buf_state;
 	uint32		local_buf_state;	/* to avoid repeated (de-)referencing */
@@ -637,24 +676,48 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
 	if (bufnum == InvalidBuffer)
 		return NULL;
 
+	buf = GetBufferDescriptor(bufnum - 1);
+
 	/*
-	 * If the buffer is pinned we cannot use it under any circumstances.
-	 *
-	 * If usage_count is 0 or 1 then the buffer is fair game (we expect 1,
-	 * since our own previous usage of the ring element would have left it
-	 * there, but it might've been decremented by clock-sweep since then). A
-	 * higher usage_count indicates someone else has touched the buffer, so we
-	 * shouldn't re-use it.
+	 * Check whether the buffer can be used and pin it if so. Do this using a
+	 * CAS loop, to avoid having to lock the buffer header.
 	 */
-	buf = GetBufferDescriptor(bufnum - 1);
-	local_buf_state = LockBufHdr(buf);
-	if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0
-		&& BUF_STATE_GET_USAGECOUNT(local_buf_state) <= 1)
+	old_buf_state = pg_atomic_read_u32(&buf->state);
+	for (;;)
 	{
-		*buf_state = local_buf_state;
-		return buf;
+		local_buf_state = old_buf_state;
+
+		/*
+		 * If the buffer is pinned we cannot use it under any circumstances.
+		 *
+		 * If usage_count is 0 or 1 then the buffer is fair game (we expect 1,
+		 * since our own previous usage of the ring element would have left it
+		 * there, but it might've been decremented by clock-sweep since then).
+		 * A higher usage_count indicates someone else has touched the buffer,
+		 * so we shouldn't re-use it.
+		 */
+		if (BUF_STATE_GET_REFCOUNT(local_buf_state) != 0
+			|| BUF_STATE_GET_USAGECOUNT(local_buf_state) > 1)
+			break;
+
+		if (unlikely(local_buf_state & BM_LOCKED))
+		{
+			old_buf_state = WaitBufHdrUnlocked(buf);
+			continue;
+		}
+
+		/* pin the buffer if the CAS succeeds */
+		local_buf_state += BUF_REFCOUNT_ONE;
+
+		if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+										   local_buf_state))
+		{
+			*buf_state = local_buf_state;
+			TrackNewBufferPin(BufferDescriptorGetBuffer(buf));
+			return buf;
+		}
 	}
-	UnlockBufHdr(buf, local_buf_state);
 
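Note (reviewer): unlike the StrategyGetBuffer() loop, a failed check here
breaks out to the fall-through path below, so a ring miss degrades to a
normal clock-sweep allocation rather than an error; the usage_count <= 1 gate
is what preserves the old "fair game" semantics under the CAS scheme.
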
 	/*
 	 * Tell caller to allocate a new buffer with the normal allocation