@ -20,6 +20,7 @@
# include "access/parallel.h"
# include "commands/async.h"
# include "miscadmin.h"
# include "pgstat.h"
# include "replication/walsender.h"
# include "storage/ipc.h"
# include "storage/latch.h"
@ -45,13 +46,36 @@
* The flags are actually declared as " volatile sig_atomic_t " for maximum
* portability . This should ensure that loads and stores of the flag
* values are atomic , allowing us to dispense with any explicit locking .
*
* pss_signalFlags are intended to be set in cases where we don ' t need to
* keep track of whether or not the target process has handled the signal ,
* but sometimes we need confirmation , as when making a global state change
* that cannot be considered complete until all backends have taken notice
* of it . For such use cases , we set a bit in pss_barrierCheckMask and then
* increment the current " barrier generation " ; when the new barrier generation
* ( or greater ) appears in the pss_barrierGeneration flag of every process ,
* we know that the message has been received everywhere .
*/
typedef struct
{
pid_t pss_pid ;
sig_atomic_t pss_signalFlags [ NUM_PROCSIGNALS ] ;
pg_atomic_uint64 pss_barrierGeneration ;
pg_atomic_uint32 pss_barrierCheckMask ;
} ProcSignalSlot ;
/*
* Information that is global to the entire ProcSignal system can be stored
* here .
*
* psh_barrierGeneration is the highest barrier generation in existence .
*/
typedef struct
{
pg_atomic_uint64 psh_barrierGeneration ;
ProcSignalSlot psh_slot [ FLEXIBLE_ARRAY_MEMBER ] ;
} ProcSignalHeader ;
/*
* We reserve a slot for each possible BackendId , plus one for each
* possible auxiliary process type . ( This scheme assumes there is not
@ -59,11 +83,16 @@ typedef struct
*/
# define NumProcSignalSlots (MaxBackends + NUM_AUXPROCTYPES)
static ProcSignalSlot * ProcSignalSlots = NULL ;
/* Check whether the relevant type bit is set in the flags. */
# define BARRIER_SHOULD_CHECK(flags, type) \
( ( ( flags ) & ( ( ( uint32 ) 1 ) < < ( uint32 ) ( type ) ) ) ! = 0 )
static ProcSignalHeader * ProcSignal = NULL ;
static volatile ProcSignalSlot * MyProcSignalSlot = NULL ;
static bool CheckProcSignal ( ProcSignalReason reason ) ;
static void CleanupProcSignalState ( int status , Datum arg ) ;
static void ProcessBarrierPlaceholder ( void ) ;
/*
* ProcSignalShmemSize
@ -72,7 +101,11 @@ static void CleanupProcSignalState(int status, Datum arg);
Size
ProcSignalShmemSize ( void )
{
return NumProcSignalSlots * sizeof ( ProcSignalSlot ) ;
Size size ;
size = mul_size ( NumProcSignalSlots , sizeof ( ProcSignalSlot ) ) ;
size = add_size ( size , offsetof ( ProcSignalHeader , psh_slot ) ) ;
return size ;
}
/*
@ -85,12 +118,26 @@ ProcSignalShmemInit(void)
Size size = ProcSignalShmemSize ( ) ;
bool found ;
ProcSignalSlots = ( ProcSignalSlot * )
ShmemInitStruct ( " ProcSignalSlots " , size , & found ) ;
ProcSignal = ( ProcSignalHeader * )
ShmemInitStruct ( " ProcSignal " , size , & found ) ;
/* If we're first, set everything to zeroes */
/* If we're first, initialize. */
if ( ! found )
MemSet ( ProcSignalSlots , 0 , size ) ;
{
int i ;
pg_atomic_init_u64 ( & ProcSignal - > psh_barrierGeneration , 0 ) ;
for ( i = 0 ; i < NumProcSignalSlots ; + + i )
{
ProcSignalSlot * slot = & ProcSignal - > psh_slot [ i ] ;
slot - > pss_pid = 0 ;
MemSet ( slot - > pss_signalFlags , 0 , sizeof ( slot - > pss_signalFlags ) ) ;
pg_atomic_init_u64 ( & slot - > pss_barrierGeneration , PG_UINT64_MAX ) ;
pg_atomic_init_u32 ( & slot - > pss_barrierCheckMask , 0 ) ;
}
}
}
/*
@ -104,10 +151,11 @@ void
ProcSignalInit ( int pss_idx )
{
volatile ProcSignalSlot * slot ;
uint64 barrier_generation ;
Assert ( pss_idx > = 1 & & pss_idx < = NumProcSignalSlots ) ;
slot = & ProcSignalSlots [ pss_idx - 1 ] ;
slot = & ProcSignal - > psh_slot [ pss_idx - 1 ] ;
/* sanity check */
if ( slot - > pss_pid ! = 0 )
@ -117,6 +165,23 @@ ProcSignalInit(int pss_idx)
/* Clear out any leftover signal reasons */
MemSet ( slot - > pss_signalFlags , 0 , NUM_PROCSIGNALS * sizeof ( sig_atomic_t ) ) ;
/*
* Initialize barrier state . Since we ' re a brand - new process , there
* shouldn ' t be any leftover backend - private state that needs to be
* updated . Therefore , we can broadcast the latest barrier generation
* and disregard any previously - set check bits .
*
* NB : This only works if this initialization happens early enough in the
* startup sequence that we haven ' t yet cached any state that might need
* to be invalidated . That ' s also why we have a memory barrier here , to
* be sure that any later reads of memory happen strictly after this .
*/
pg_atomic_write_u32 ( & slot - > pss_barrierCheckMask , 0 ) ;
barrier_generation =
pg_atomic_read_u64 ( & ProcSignal - > psh_barrierGeneration ) ;
pg_atomic_write_u64 ( & slot - > pss_barrierGeneration , barrier_generation ) ;
pg_memory_barrier ( ) ;
/* Mark slot with my PID */
slot - > pss_pid = MyProcPid ;
@ -129,7 +194,7 @@ ProcSignalInit(int pss_idx)
/*
* CleanupProcSignalState
* Remove current process from ProcSignalSlots
* Remove current process from ProcSignal mechanism
*
* This function is called via on_shmem_exit ( ) during backend shutdown .
*/
@ -139,7 +204,7 @@ CleanupProcSignalState(int status, Datum arg)
int pss_idx = DatumGetInt32 ( arg ) ;
volatile ProcSignalSlot * slot ;
slot = & ProcSignalSlots [ pss_idx - 1 ] ;
slot = & ProcSignal - > psh_slot [ pss_idx - 1 ] ;
Assert ( slot = = MyProcSignalSlot ) ;
/*
@ -161,6 +226,12 @@ CleanupProcSignalState(int status, Datum arg)
return ; /* XXX better to zero the slot anyway? */
}
/*
* Make this slot look like it ' s absorbed all possible barriers , so that
* no barrier waits block on it .
*/
pg_atomic_write_u64 ( & slot - > pss_barrierGeneration , PG_UINT64_MAX ) ;
slot - > pss_pid = 0 ;
}
@ -182,7 +253,7 @@ SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
if ( backendId ! = InvalidBackendId )
{
slot = & ProcSignalSlots [ backendId - 1 ] ;
slot = & ProcSignal - > psh_slot [ backendId - 1 ] ;
/*
* Note : Since there ' s no locking , it ' s possible that the target
@ -212,7 +283,7 @@ SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
for ( i = NumProcSignalSlots - 1 ; i > = 0 ; i - - )
{
slot = & ProcSignalSlots [ i ] ;
slot = & ProcSignal - > psh_slot [ i ] ;
if ( slot - > pss_pid = = pid )
{
@ -230,6 +301,187 @@ SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
return - 1 ;
}
/*
* EmitProcSignalBarrier
* Send a signal to every Postgres process
*
* The return value of this function is the barrier " generation " created
* by this operation . This value can be passed to WaitForProcSignalBarrier
* to wait until it is known that every participant in the ProcSignal
* mechanism has absorbed the signal ( or started afterwards ) .
*
* Note that it would be a bad idea to use this for anything that happens
* frequently , as interrupting every backend could cause a noticeable
* performance hit .
*
* Callers are entitled to assume that this function will not throw ERROR
* or FATAL .
*/
uint64
EmitProcSignalBarrier ( ProcSignalBarrierType type )
{
uint64 flagbit = UINT64CONST ( 1 ) < < ( uint64 ) type ;
uint64 generation ;
/*
* Set all the flags .
*
* Note that pg_atomic_fetch_or_u32 has full barrier semantics , so this
* is totally ordered with respect to anything the caller did before , and
* anything that we do afterwards . ( This is also true of the later call
* to pg_atomic_add_fetch_u64 . )
*/
for ( int i = 0 ; i < NumProcSignalSlots ; i + + )
{
volatile ProcSignalSlot * slot = & ProcSignal - > psh_slot [ i ] ;
pg_atomic_fetch_or_u32 ( & slot - > pss_barrierCheckMask , flagbit ) ;
}
/*
* Increment the generation counter .
*/
generation =
pg_atomic_add_fetch_u64 ( & ProcSignal - > psh_barrierGeneration , 1 ) ;
/*
* Signal all the processes , so that they update their advertised barrier
* generation .
*
* Concurrency is not a problem here . Backends that have exited don ' t
* matter , and new backends that have joined since we entered this function
* must already have current state , since the caller is responsible for
* making sure that the relevant state is entirely visible before calling
* this function in the first place . We still have to wake them up -
* because we can ' t distinguish between such backends and older backends
* that need to update state - but they won ' t actually need to change
* any state .
*/
for ( int i = NumProcSignalSlots - 1 ; i > = 0 ; i - - )
{
volatile ProcSignalSlot * slot = & ProcSignal - > psh_slot [ i ] ;
pid_t pid = slot - > pss_pid ;
if ( pid ! = 0 )
kill ( pid , SIGUSR1 ) ;
}
return generation ;
}
/*
* WaitForProcSignalBarrier - wait until it is guaranteed that all changes
* requested by a specific call to EmitProcSignalBarrier ( ) have taken effect .
*
* We expect that the barrier will normally be absorbed very quickly by other
* backends , so we start by waiting just 1 / 8 of a second and then back off
* by a factor of two every time we time out , to a maximum wait time of
* 1 second .
*/
void
WaitForProcSignalBarrier ( uint64 generation )
{
long timeout = 125L ;
for ( int i = NumProcSignalSlots - 1 ; i > = 0 ; i - - )
{
volatile ProcSignalSlot * slot = & ProcSignal - > psh_slot [ i ] ;
uint64 oldval ;
oldval = pg_atomic_read_u64 ( & slot - > pss_barrierGeneration ) ;
while ( oldval < generation )
{
int events ;
CHECK_FOR_INTERRUPTS ( ) ;
events =
WaitLatch ( MyLatch ,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH ,
timeout , WAIT_EVENT_PROC_SIGNAL_BARRIER ) ;
ResetLatch ( MyLatch ) ;
oldval = pg_atomic_read_u64 ( & slot - > pss_barrierGeneration ) ;
if ( events & WL_TIMEOUT )
timeout = Min ( timeout * 2 , 1000L ) ;
}
}
/*
* The caller is probably calling this function because it wants to
* read the shared state or perform further writes to shared state once
* all backends are known to have absorbed the barrier . However , the
* read of pss_barrierGeneration was performed unlocked ; insert a memory
* barrier to separate it from whatever follows .
*/
pg_memory_barrier ( ) ;
}
/*
* Perform global barrier related interrupt checking .
*
* Any backend that participates in ProcSignal signalling must arrange to
* call this function periodically . It is called from CHECK_FOR_INTERRUPTS ( ) ,
* which is enough for normal backends , but not necessarily for all types of
* background processes .
*/
void
ProcessProcSignalBarrier ( void )
{
uint64 generation ;
uint32 flags ;
/* Exit quickly if there's no work to do. */
if ( ! ProcSignalBarrierPending )
return ;
ProcSignalBarrierPending = false ;
/*
* Read the current barrier generation , and then get the flags that
* are set for this backend . Note that pg_atomic_exchange_u32 is a full
* barrier , so we ' re guaranteed that the read of the barrier generation
* happens before we atomically extract the flags , and that any subsequent
* state changes happen afterward .
*/
generation = pg_atomic_read_u64 ( & ProcSignal - > psh_barrierGeneration ) ;
flags = pg_atomic_exchange_u32 ( & MyProcSignalSlot - > pss_barrierCheckMask , 0 ) ;
/*
* Process each type of barrier . It ' s important that nothing we call from
* here throws an error , because pss_barrierCheckMask has already been
* cleared . If we jumped out of here before processing all barrier types ,
* then we ' d forget about the need to do so later .
*
* NB : It ought to be OK to call the barrier - processing functions
* unconditionally , but it ' s more efficient to call only the ones that
* might need us to do something based on the flags .
*/
if ( BARRIER_SHOULD_CHECK ( flags , PROCSIGNAL_BARRIER_PLACEHOLDER ) )
ProcessBarrierPlaceholder ( ) ;
/*
* State changes related to all types of barriers that might have been
* emitted have now been handled , so we can update our notion of the
* generation to the one we observed before beginning the updates . If
* things have changed further , it ' ll get fixed up when this function is
* next called .
*/
pg_atomic_write_u64 ( & MyProcSignalSlot - > pss_barrierGeneration , generation ) ;
}
static void
ProcessBarrierPlaceholder ( void )
{
/*
* XXX . This is just a placeholder until the first real user of this
* machinery gets committed . Rename PROCSIGNAL_BARRIER_PLACEHOLDER to
* PROCSIGNAL_BARRIER_SOMETHING_ELSE where SOMETHING_ELSE is something
* appropriately descriptive . Get rid of this function and instead have
* ProcessBarrierSomethingElse . Most likely , that function should live
* in the file pertaining to that subsystem , rather than here .
*/
}
/*
* CheckProcSignal - check to see if a particular reason has been
* signaled , and clear the signal flag . Should be called after receiving
@ -253,6 +505,27 @@ CheckProcSignal(ProcSignalReason reason)
return false ;
}
/*
* CheckProcSignalBarrier - check for new barriers we need to absorb
*/
static bool
CheckProcSignalBarrier ( void )
{
volatile ProcSignalSlot * slot = MyProcSignalSlot ;
if ( slot ! = NULL )
{
uint64 mygen ;
uint64 curgen ;
mygen = pg_atomic_read_u64 ( & slot - > pss_barrierGeneration ) ;
curgen = pg_atomic_read_u64 ( & ProcSignal - > psh_barrierGeneration ) ;
return ( mygen ! = curgen ) ;
}
return false ;
}
/*
* procsignal_sigusr1_handler - handle SIGUSR1 signal .
*/
@ -291,6 +564,12 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if ( CheckProcSignal ( PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ) )
RecoveryConflictInterrupt ( PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ) ;
if ( CheckProcSignalBarrier ( ) )
{
InterruptPending = true ;
ProcSignalBarrierPending = true ;
}
SetLatch ( MyLatch ) ;
latch_sigusr1_handler ( ) ;