@@ -109,6 +109,12 @@ struct shm_mq
  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
  * the buffer, and mqh_buflen is the number of bytes allocated for it.
  *
+ * mqh_send_pending is the number of bytes that have been written to the
+ * queue but not yet reflected in the shared memory counter.  We don't
+ * update the counter until the pending data exceeds 1/4th of the ring size
+ * or the tuple queue is full.  This prevents frequent CPU cache misses, and
+ * it also avoids frequent SetLatch() calls, which are quite expensive.
+ *
  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
  * are used to track the state of non-blocking operations.  When the caller
  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
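To make the batching rule above concrete, here is a minimal standalone model of it. This is not PostgreSQL code; all toy_* names are invented for illustration. The sender accumulates bytes in a local counter and publishes them to the shared counter only when they exceed a quarter of the ring size, or when a flush is forced:

#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define TOY_RING_SIZE 1024			/* stand-in for shm_mq's mq_ring_size */

typedef struct toy_mq
{
	_Atomic uint64_t bytes_written;	/* shared counter the receiver polls */
} toy_mq;

typedef struct toy_handle
{
	toy_mq	   *mq;
	uint64_t	send_pending;		/* backend-local, not yet published */
} toy_handle;

/* Publish pending bytes; models shm_mq_inc_bytes_written() + SetLatch(). */
static void
toy_flush(toy_handle *h)
{
	atomic_fetch_add(&h->mq->bytes_written, h->send_pending);
	h->send_pending = 0;
}

/* Accumulate locally; publish only past the 1/4-ring threshold or on demand. */
static void
toy_send(toy_handle *h, uint64_t nbytes, bool force_flush)
{
	h->send_pending += nbytes;
	if (force_flush || h->send_pending > (TOY_RING_SIZE >> 2))
		toy_flush(h);
}

int
main(void)
{
	toy_mq		mq = {0};
	toy_handle	h = {&mq, 0};

	toy_send(&h, 100, false);	/* 100 <= 256: stays pending */
	assert(atomic_load(&mq.bytes_written) == 0);
	toy_send(&h, 200, false);	/* 300 > 256: published */
	assert(atomic_load(&mq.bytes_written) == 300);
	toy_send(&h, 10, true);		/* force_flush: published immediately */
	assert(atomic_load(&mq.bytes_written) == 310);
	printf("receiver sees %llu bytes\n",
		   (unsigned long long) atomic_load(&mq.bytes_written));
	return 0;
}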
@@ -137,6 +143,7 @@ struct shm_mq_handle
 	char	   *mqh_buffer;
 	Size		mqh_buflen;
 	Size		mqh_consume_pending;
+	Size		mqh_send_pending;
 	Size		mqh_partial_bytes;
 	Size		mqh_expected_bytes;
 	bool		mqh_length_word_complete;
@@ -292,6 +299,7 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	mqh->mqh_buffer = NULL;
 	mqh->mqh_buflen = 0;
 	mqh->mqh_consume_pending = 0;
+	mqh->mqh_send_pending = 0;
 	mqh->mqh_partial_bytes = 0;
 	mqh->mqh_expected_bytes = 0;
 	mqh->mqh_length_word_complete = false;
@@ -319,14 +327,15 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
  * Write a message into a shared message queue.
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
+			bool force_flush)
 {
 	shm_mq_iovec iov;
 
 	iov.data = data;
 	iov.len = nbytes;
 
-	return shm_mq_sendv(mqh, &iov, 1, nowait);
+	return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
 }
 
 /*
@@ -343,9 +352,15 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
  * arguments, each time the process latch is set.  (Once begun, the sending
  * of a message cannot be aborted except by detaching from the queue; changing
  * the length or payload will corrupt the queue.)
+ *
+ * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
+ * and notify the receiver (if it is already attached).  Otherwise, we don't
+ * update it until we have written an amount of data greater than 1/4th of the
+ * ring size.
  */
 shm_mq_result
-shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
+			 bool force_flush)
 {
 	shm_mq_result res;
 	shm_mq	   *mq = mqh->mqh_queue;
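Callers that stream many small messages can now wake the receiver once per batch rather than once per message. The loop below is a hypothetical caller, not code from the tree; msg, nmessages, and mqh are assumed, and it is sketched against the signature added by this patch:

	/* Hypothetical sender loop: flush only on the final message. */
	for (int i = 0; i < nmessages; i++)
	{
		bool		force_flush = (i == nmessages - 1);
		shm_mq_result res;

		res = shm_mq_send(mqh, msg[i].len, msg[i].data,
						  false,	/* nowait */
						  force_flush);
		if (res != SHM_MQ_SUCCESS)
			break;
	}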
@@ -518,8 +533,18 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 		mqh->mqh_counterparty_attached = true;
 	}
 
-	/* Notify receiver of the newly-written data, and return. */
-	SetLatch(&receiver->procLatch);
+	/*
+	 * If the caller has requested a force flush or we have written more than
+	 * 1/4 of the ring size, mark it as written in shared memory and notify
+	 * the receiver.
+	 */
+	if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
+	{
+		shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+		SetLatch(&receiver->procLatch);
+		mqh->mqh_send_pending = 0;
+	}
+
 	return SHM_MQ_SUCCESS;
 }
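The threshold test uses a shift: mq_ring_size >> 2 equals ring_size / 4. As a worked example (the 65536-byte figure is illustrative, chosen to match the usual parallel tuple queue size), pending bytes are published once they exceed 16384:

#include <stdio.h>

int
main(void)
{
	unsigned long ring_size = 65536;			/* e.g. a parallel tuple queue */
	unsigned long threshold = ring_size >> 2;	/* same as ring_size / 4 */

	printf("flush once send_pending > %lu bytes\n", threshold);	/* 16384 */
	return 0;
}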
@@ -816,6 +841,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
 void
 shm_mq_detach(shm_mq_handle *mqh)
 {
+	/* Before detaching, notify the receiver about any already-written data. */
+	if (mqh->mqh_send_pending > 0)
+	{
+		shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
+		mqh->mqh_send_pending = 0;
+	}
+
 	/* Notify counterparty that we're outta here. */
 	shm_mq_detach_internal(mqh->mqh_queue);
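The same rule holds at teardown in the toy model from earlier: a detach must publish whatever is still pending, or those bytes would never become visible to the receiver. A hypothetical toy_detach for that model:

/* Hypothetical teardown for the toy model above. */
static void
toy_detach(toy_handle *h)
{
	if (h->send_pending > 0)
		toy_flush(h);	/* publish leftovers before going away */
	/* ...the real code then notifies the counterparty and detaches... */
}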
@@ -894,7 +926,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 		/* Compute number of ring buffer bytes used and available. */
 		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
-		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+		wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
 		Assert(wb >= rb);
 		used = wb - rb;
 		Assert(used <= ringsize);
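Counting mqh_send_pending here matters for correctness, not just bookkeeping: pending bytes already occupy ring space, so leaving them out would overstate the free space and let the sender overwrite unread data. A small standalone check with invented numbers:

#include <assert.h>
#include <stdint.h>

int
main(void)
{
	uint64_t	ringsize = 1024;
	uint64_t	rb = 4096;			/* mq_bytes_read (shared) */
	uint64_t	wb_shared = 4864;	/* mq_bytes_written (published only) */
	uint64_t	pending = 192;		/* written to the ring, not yet published */

	/* Ignoring pending overstates free space; including it is accurate. */
	uint64_t	stale_free = ringsize - (wb_shared - rb);				/* 256 */
	uint64_t	real_free = ringsize - ((wb_shared + pending) - rb);	/* 64 */

	assert(stale_free == 256 && real_free == 64);
	return 0;
}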
@@ -951,6 +983,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 		}
 		else if (available == 0)
 		{
+			/* Update the pending send bytes in the shared memory. */
+			shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+
 			/*
 			 * Since mq->mqh_counterparty_attached is known to be true at this
 			 * point, mq_receiver has been set, and it can't change once set.
@@ -959,6 +994,12 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			Assert(mqh->mqh_counterparty_attached);
 			SetLatch(&mq->mq_receiver->procLatch);
 
+			/*
+			 * We have just published the pending bytes to shared memory, so
+			 * reset mqh_send_pending.
+			 */
+			mqh->mqh_send_pending = 0;
+
 			/* Skip manipulation of our latch if nowait = true. */
 			if (nowait)
 			{
@@ -1009,13 +1050,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			 * MAXIMUM_ALIGNOF, and each read is as well.
 			 */
 			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
-			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
 			/*
-			 * For efficiency, we don't set the reader's latch here.  We'll do
-			 * that only when the buffer fills up or after writing an entire
-			 * message.
+			 * For efficiency, we neither update the bytes written in shared
+			 * memory nor set the reader's latch here.  Refer to the comments
+			 * atop the shm_mq_handle structure for more information.
 			 */
+			mqh->mqh_send_pending += MAXALIGN(sendnow);
 		}
 	}
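mqh_send_pending grows by MAXALIGN(sendnow), matching the padded amount actually placed in the ring. As an illustration, here is a stand-in macro assuming 8-byte alignment (PostgreSQL's real MAXALIGN comes from c.h and depends on the platform's MAXIMUM_ALIGNOF):

#include <assert.h>
#include <stddef.h>

#define TOY_ALIGNOF 8	/* assumed MAXIMUM_ALIGNOF */
#define TOY_MAXALIGN(LEN) \
	(((size_t) (LEN) + (TOY_ALIGNOF - 1)) & ~((size_t) (TOY_ALIGNOF - 1)))

int
main(void)
{
	assert(TOY_MAXALIGN(1) == 8);
	assert(TOY_MAXALIGN(8) == 8);
	assert(TOY_MAXALIGN(13) == 16);	/* pending grows by 16 for a 13-byte chunk */
	return 0;
}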