@ -35,11 +35,13 @@
# include "access/clog.h"
# include "access/slru.h"
# include "access/transam.h"
# include "access/twophase.h"
# include "access/xlog.h"
# include "access/xloginsert.h"
# include "access/xlogutils.h"
# include "miscadmin.h"
# include "pg_trace.h"
# include "storage/proc.h"
/*
* Defines for CLOG page sizes . A page is the same BLCKSZ as is used
@ -86,11 +88,17 @@ static void WriteZeroPageXlogRec(int pageno);
static void WriteTruncateXlogRec ( int pageno ) ;
static void TransactionIdSetPageStatus ( TransactionId xid , int nsubxids ,
TransactionId * subxids , XidStatus status ,
XLogRecPtr lsn , int pageno ) ;
XLogRecPtr lsn , int pageno ,
bool all_xact_same_page ) ;
static void TransactionIdSetStatusBit ( TransactionId xid , XidStatus status ,
XLogRecPtr lsn , int slotno ) ;
static void set_status_by_pages ( int nsubxids , TransactionId * subxids ,
XidStatus status , XLogRecPtr lsn ) ;
static bool TransactionGroupUpdateXidStatus ( TransactionId xid , XidStatus status ,
XLogRecPtr lsn , int pageno ) ;
static void TransactionIdSetPageStatusInternal ( TransactionId xid , int nsubxids ,
TransactionId * subxids , XidStatus status ,
XLogRecPtr lsn , int pageno ) ;
/*
@ -173,7 +181,7 @@ TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
* Set the parent and all subtransactions in a single call
*/
TransactionIdSetPageStatus ( xid , nsubxids , subxids , status , lsn ,
pageno ) ;
pageno , true ) ;
}
else
{
@ -200,7 +208,7 @@ TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
*/
pageno = TransactionIdToPage ( xid ) ;
TransactionIdSetPageStatus ( xid , nsubxids_on_first_page , subxids , status ,
lsn , pageno ) ;
lsn , pageno , false ) ;
/*
* Now work through the rest of the subxids one clog page at a time ,
@ -238,7 +246,7 @@ set_status_by_pages(int nsubxids, TransactionId *subxids,
TransactionIdSetPageStatus ( InvalidTransactionId ,
num_on_page , subxids + offset ,
status , lsn , pageno ) ;
status , lsn , pageno , false ) ;
offset = i ;
pageno = TransactionIdToPage ( subxids [ offset ] ) ;
}
@ -248,12 +256,70 @@ set_status_by_pages(int nsubxids, TransactionId *subxids,
* Record the final state of transaction entries in the commit log for
* all entries on a single page . Atomic only on this page .
*
* Otherwise API is same as TransactionIdSetTreeStatus ( )
* When there is contention on CLogControlLock , we try to group multiple
* updates ; a single leader process will perform transaction status updates
* for multiple backends so that the number of times CLogControlLock needs
* to be acquired is reduced . We don ' t try to do this if a process has
* overflowed the subxids array in its PGPROC , since in that case we
* don ' t have a complete list of XIDs for it . We also skip it if a process
* has XIDs on more than one CLOG page , or on a different CLOG page than
* processes already waiting for a group update . This latter condition
* has a race condition ( see TransactionGroupUpdateXidStatus ) but the
* worst thing that happens if we mess up is a small loss of efficiency ;
* the intent is to avoid having the leader access pages it wouldn ' t
* otherwise need to touch . Finally , we skip it for prepared transactions ,
* which don ' t have the semaphore we would need for this optimization ,
* and which are anyway probably not all that common .
*/
static void
TransactionIdSetPageStatus ( TransactionId xid , int nsubxids ,
TransactionId * subxids , XidStatus status ,
XLogRecPtr lsn , int pageno )
XLogRecPtr lsn , int pageno ,
bool all_xact_same_page )
{
if ( all_xact_same_page & &
nsubxids < PGPROC_MAX_CACHED_SUBXIDS & &
! IsGXactActive ( ) )
{
/*
* If we can immediately acquire CLogControlLock , we update the status
* of our own XID and release the lock . If not , try use group XID
* update . If that doesn ' t work out , fall back to waiting for the
* lock to perform an update for this transaction only .
*/
if ( LWLockConditionalAcquire ( CLogControlLock , LW_EXCLUSIVE ) )
{
TransactionIdSetPageStatusInternal ( xid , nsubxids , subxids , status , lsn , pageno ) ;
LWLockRelease ( CLogControlLock ) ;
}
else if ( ! TransactionGroupUpdateXidStatus ( xid , status , lsn , pageno ) )
{
LWLockAcquire ( CLogControlLock , LW_EXCLUSIVE ) ;
TransactionIdSetPageStatusInternal ( xid , nsubxids , subxids , status , lsn , pageno ) ;
LWLockRelease ( CLogControlLock ) ;
}
}
else
{
LWLockAcquire ( CLogControlLock , LW_EXCLUSIVE ) ;
TransactionIdSetPageStatusInternal ( xid , nsubxids , subxids , status , lsn , pageno ) ;
LWLockRelease ( CLogControlLock ) ;
}
}
/*
* Record the final state of transaction entry in the commit log
*
* We don ' t do any locking here ; caller must handle that .
*/
static void
TransactionIdSetPageStatusInternal ( TransactionId xid , int nsubxids ,
TransactionId * subxids , XidStatus status ,
XLogRecPtr lsn , int pageno )
{
int slotno ;
int i ;
@ -261,8 +327,7 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
Assert ( status = = TRANSACTION_STATUS_COMMITTED | |
status = = TRANSACTION_STATUS_ABORTED | |
( status = = TRANSACTION_STATUS_SUB_COMMITTED & & ! TransactionIdIsValid ( xid ) ) ) ;
LWLockAcquire ( CLogControlLock , LW_EXCLUSIVE ) ;
Assert ( LWLockHeldByMeInMode ( CLogControlLock , LW_EXCLUSIVE ) ) ;
/*
* If we ' re doing an async commit ( ie , lsn is valid ) , then we must wait
@ -310,8 +375,166 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
}
ClogCtl - > shared - > page_dirty [ slotno ] = true ;
}
/*
* When we cannot immediately acquire CLogControlLock in exclusive mode at
* commit time , add ourselves to a list of processes that need their XIDs
* status update . The first process to add itself to the list will acquire
* CLogControlLock in exclusive mode and set transaction status as required
* on behalf of all group members . This avoids a great deal of contention
* around CLogControlLock when many processes are trying to commit at once ,
* since the lock need not be repeatedly handed off from one committing
* process to the next .
*
* Returns true when transaction status has been updated in clog ; returns
* false if we decided against applying the optimization because the page
* number we need to update differs from those processes already waiting .
*/
static bool
TransactionGroupUpdateXidStatus ( TransactionId xid , XidStatus status ,
XLogRecPtr lsn , int pageno )
{
volatile PROC_HDR * procglobal = ProcGlobal ;
PGPROC * proc = MyProc ;
uint32 nextidx ;
uint32 wakeidx ;
/* We should definitely have an XID whose status needs to be updated. */
Assert ( TransactionIdIsValid ( xid ) ) ;
/*
* Add ourselves to the list of processes needing a group XID status
* update .
*/
proc - > clogGroupMember = true ;
proc - > clogGroupMemberXid = xid ;
proc - > clogGroupMemberXidStatus = status ;
proc - > clogGroupMemberPage = pageno ;
proc - > clogGroupMemberLsn = lsn ;
nextidx = pg_atomic_read_u32 ( & procglobal - > clogGroupFirst ) ;
while ( true )
{
/*
* Add the proc to list , if the clog page where we need to update the
* current transaction status is same as group leader ' s clog page .
*
* There is a race condition here , which is that after doing the below
* check and before adding this proc ' s clog update to a group , the
* group leader might have already finished the group update for this
* page and becomes group leader of another group . This will lead to a
* situation where a single group can have different clog page
* updates . This isn ' t likely and will still work , just maybe a bit
* less efficiently .
*/
if ( nextidx ! = INVALID_PGPROCNO & &
ProcGlobal - > allProcs [ nextidx ] . clogGroupMemberPage ! = proc - > clogGroupMemberPage )
{
proc - > clogGroupMember = false ;
return false ;
}
pg_atomic_write_u32 ( & proc - > clogGroupNext , nextidx ) ;
if ( pg_atomic_compare_exchange_u32 ( & procglobal - > clogGroupFirst ,
& nextidx ,
( uint32 ) proc - > pgprocno ) )
break ;
}
/*
* If the list was not empty , the leader will update the status of our
* XID . It is impossible to have followers without a leader because the
* first process that has added itself to the list will always have
* nextidx as INVALID_PGPROCNO .
*/
if ( nextidx ! = INVALID_PGPROCNO )
{
int extraWaits = 0 ;
/* Sleep until the leader updates our XID status. */
for ( ; ; )
{
/* acts as a read barrier */
PGSemaphoreLock ( proc - > sem ) ;
if ( ! proc - > clogGroupMember )
break ;
extraWaits + + ;
}
Assert ( pg_atomic_read_u32 ( & proc - > clogGroupNext ) = = INVALID_PGPROCNO ) ;
/* Fix semaphore count for any absorbed wakeups */
while ( extraWaits - - > 0 )
PGSemaphoreUnlock ( proc - > sem ) ;
return true ;
}
/* We are the leader. Acquire the lock on behalf of everyone. */
LWLockAcquire ( CLogControlLock , LW_EXCLUSIVE ) ;
/*
* Now that we ' ve got the lock , clear the list of processes waiting for
* group XID status update , saving a pointer to the head of the list .
* Trying to pop elements one at a time could lead to an ABA problem .
*/
nextidx = pg_atomic_exchange_u32 ( & procglobal - > clogGroupFirst , INVALID_PGPROCNO ) ;
/* Remember head of list so we can perform wakeups after dropping lock. */
wakeidx = nextidx ;
/* Walk the list and update the status of all XIDs. */
while ( nextidx ! = INVALID_PGPROCNO )
{
PGPROC * proc = & ProcGlobal - > allProcs [ nextidx ] ;
PGXACT * pgxact = & ProcGlobal - > allPgXact [ nextidx ] ;
/*
* Overflowed transactions should not use group XID status update
* mechanism .
*/
Assert ( ! pgxact - > overflowed ) ;
TransactionIdSetPageStatusInternal ( proc - > clogGroupMemberXid ,
pgxact - > nxids ,
proc - > subxids . xids ,
proc - > clogGroupMemberXidStatus ,
proc - > clogGroupMemberLsn ,
proc - > clogGroupMemberPage ) ;
/* Move to next proc in list. */
nextidx = pg_atomic_read_u32 ( & proc - > clogGroupNext ) ;
}
/* We're done with the lock now. */
LWLockRelease ( CLogControlLock ) ;
/*
* Now that we ' ve released the lock , go back and wake everybody up . We
* don ' t do this under the lock so as to keep lock hold times to a
* minimum . The system calls we need to perform to wake other processes
* up are probably slower and can cause performance slowdown if done under
* lock .
*/
while ( wakeidx ! = INVALID_PGPROCNO )
{
PGPROC * proc = & ProcGlobal - > allProcs [ wakeidx ] ;
wakeidx = pg_atomic_read_u32 ( & proc - > clogGroupNext ) ;
pg_atomic_write_u32 ( & proc - > clogGroupNext , INVALID_PGPROCNO ) ;
/* ensure all previous writes are visible before follower continues. */
pg_write_barrier ( ) ;
proc - > clogGroupMember = false ;
if ( proc ! = MyProc )
PGSemaphoreUnlock ( proc - > sem ) ;
}
return true ;
}
/*