|
|
@ -58,7 +58,6 @@ WaitLSNShmemInit(void) |
|
|
|
&found); |
|
|
|
&found); |
|
|
|
if (!found) |
|
|
|
if (!found) |
|
|
|
{ |
|
|
|
{ |
|
|
|
SpinLockInit(&waitLSN->waitersHeapMutex); |
|
|
|
|
|
|
|
pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX); |
|
|
|
pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX); |
|
|
|
pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL); |
|
|
|
pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL); |
|
|
|
memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); |
|
|
|
memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); |
|
|
@ -115,13 +114,13 @@ addLSNWaiter(XLogRecPtr lsn) |
|
|
|
procInfo->procnum = MyProcNumber; |
|
|
|
procInfo->procnum = MyProcNumber; |
|
|
|
procInfo->waitLSN = lsn; |
|
|
|
procInfo->waitLSN = lsn; |
|
|
|
|
|
|
|
|
|
|
|
SpinLockAcquire(&waitLSN->waitersHeapMutex); |
|
|
|
LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); |
|
|
|
|
|
|
|
|
|
|
|
pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode); |
|
|
|
pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode); |
|
|
|
procInfo->inHeap = true; |
|
|
|
procInfo->inHeap = true; |
|
|
|
updateMinWaitedLSN(); |
|
|
|
updateMinWaitedLSN(); |
|
|
|
|
|
|
|
|
|
|
|
SpinLockRelease(&waitLSN->waitersHeapMutex); |
|
|
|
LWLockRelease(WaitLSNLock); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
@ -132,11 +131,11 @@ deleteLSNWaiter(void) |
|
|
|
{ |
|
|
|
{ |
|
|
|
WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; |
|
|
|
WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; |
|
|
|
|
|
|
|
|
|
|
|
SpinLockAcquire(&waitLSN->waitersHeapMutex); |
|
|
|
LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); |
|
|
|
|
|
|
|
|
|
|
|
if (!procInfo->inHeap) |
|
|
|
if (!procInfo->inHeap) |
|
|
|
{ |
|
|
|
{ |
|
|
|
SpinLockRelease(&waitLSN->waitersHeapMutex); |
|
|
|
LWLockRelease(WaitLSNLock); |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -144,7 +143,7 @@ deleteLSNWaiter(void) |
|
|
|
procInfo->inHeap = false; |
|
|
|
procInfo->inHeap = false; |
|
|
|
updateMinWaitedLSN(); |
|
|
|
updateMinWaitedLSN(); |
|
|
|
|
|
|
|
|
|
|
|
SpinLockRelease(&waitLSN->waitersHeapMutex); |
|
|
|
LWLockRelease(WaitLSNLock); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
@ -160,7 +159,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN) |
|
|
|
|
|
|
|
|
|
|
|
wakeUpProcNums = palloc(sizeof(int) * MaxBackends); |
|
|
|
wakeUpProcNums = palloc(sizeof(int) * MaxBackends); |
|
|
|
|
|
|
|
|
|
|
|
SpinLockAcquire(&waitLSN->waitersHeapMutex); |
|
|
|
LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Iterate the pairing heap of waiting processes till we find LSN not yet |
|
|
|
* Iterate the pairing heap of waiting processes till we find LSN not yet |
|
|
@ -182,7 +181,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN) |
|
|
|
|
|
|
|
|
|
|
|
updateMinWaitedLSN(); |
|
|
|
updateMinWaitedLSN(); |
|
|
|
|
|
|
|
|
|
|
|
SpinLockRelease(&waitLSN->waitersHeapMutex); |
|
|
|
LWLockRelease(WaitLSNLock); |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Set latches for processes, whose waited LSNs are already replayed. This |
|
|
|
* Set latches for processes, whose waited LSNs are already replayed. This |
|
|
|