@ -95,6 +95,13 @@ static uint32 pgfdw_we_get_result = 0;
*/
# define CONNECTION_CLEANUP_TIMEOUT 30000
/*
* Milliseconds to wait before issuing another cancel request . This covers
* the race condition where the remote session ignored our cancel request
* because it arrived while idle .
*/
# define RETRY_CANCEL_TIMEOUT 1000
/* Macro for constructing abort command to be sent */
# define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
do { \
@ -145,6 +152,7 @@ static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query ( PGconn * conn ) ;
static bool pgfdw_cancel_query_begin ( PGconn * conn , TimestampTz endtime ) ;
static bool pgfdw_cancel_query_end ( PGconn * conn , TimestampTz endtime ,
TimestampTz retrycanceltime ,
bool consume_input ) ;
static bool pgfdw_exec_cleanup_query ( PGconn * conn , const char * query ,
bool ignore_errors ) ;
@ -154,6 +162,7 @@ static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
bool consume_input ,
bool ignore_errors ) ;
static bool pgfdw_get_cleanup_result ( PGconn * conn , TimestampTz endtime ,
TimestampTz retrycanceltime ,
PGresult * * result , bool * timed_out ) ;
static void pgfdw_abort_cleanup ( ConnCacheEntry * entry , bool toplevel ) ;
static bool pgfdw_abort_cleanup_begin ( ConnCacheEntry * entry , bool toplevel ,
@ -1322,18 +1331,25 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
static bool
pgfdw_cancel_query ( PGconn * conn )
{
TimestampTz now = GetCurrentTimestamp ( ) ;
TimestampTz endtime ;
TimestampTz retrycanceltime ;
/*
* If it takes too long to cancel the query and discard the result , assume
* the connection is dead .
*/
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) ,
CONNECTION_CLEANUP_TIMEOUT ) ;
endtime = TimestampTzPlusMilliseconds ( now , CONNECTION_CLEANUP_TIMEOUT ) ;
/*
* Also , lose patience and re - issue the cancel request after a little bit .
* ( This serves to close some race conditions . )
*/
retrycanceltime = TimestampTzPlusMilliseconds ( now , RETRY_CANCEL_TIMEOUT ) ;
if ( ! pgfdw_cancel_query_begin ( conn , endtime ) )
return false ;
return pgfdw_cancel_query_end ( conn , endtime , false ) ;
return pgfdw_cancel_query_end ( conn , endtime , retrycanceltime , false ) ;
}
/*
@ -1359,9 +1375,10 @@ pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
}
static bool
pgfdw_cancel_query_end ( PGconn * conn , TimestampTz endtime , bool consume_input )
pgfdw_cancel_query_end ( PGconn * conn , TimestampTz endtime ,
TimestampTz retrycanceltime , bool consume_input )
{
PGresult * result = NULL ;
PGresult * result ;
bool timed_out ;
/*
@ -1380,7 +1397,8 @@ pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
}
/* Get and discard the result of the query. */
if ( pgfdw_get_cleanup_result ( conn , endtime , & result , & timed_out ) )
if ( pgfdw_get_cleanup_result ( conn , endtime , retrycanceltime ,
& result , & timed_out ) )
{
if ( timed_out )
ereport ( WARNING ,
@ -1453,7 +1471,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
TimestampTz endtime , bool consume_input ,
bool ignore_errors )
{
PGresult * result = NULL ;
PGresult * result ;
bool timed_out ;
Assert ( query ! = NULL ) ;
@ -1471,7 +1489,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
}
/* Get the result of the query. */
if ( pgfdw_get_cleanup_result ( conn , endtime , & result , & timed_out ) )
if ( pgfdw_get_cleanup_result ( conn , endtime , endtime , & result , & timed_out ) )
{
if ( timed_out )
ereport ( WARNING ,
@ -1495,28 +1513,36 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
}
/*
* Get , during abort cleanup , the result of a query that is in progress . This
* might be a query that is being interrupted by transaction abort , or it might
* be a query that was initiated as part of transaction abort to get the remote
* side back to the appropriate state .
* Get , during abort cleanup , the result of a query that is in progress .
* This might be a query that is being interrupted by a cancel request or by
* transaction abort , or it might be a query that was initiated as part of
* transaction abort to get the remote side back to the appropriate state .
*
* endtime is the time at which we should give up and assume the remote side
* is dead . retrycanceltime is the time at which we should issue a fresh
* cancel request ( pass the same value as endtime if this is not wanted ) .
*
* endtime is the time at which we should give up and assume the remote
* side is dead . Returns true if the timeout expired or connection trouble
* occurred , false otherwise . Sets * result except in case of a timeout .
* Sets timed_out to true only when the timeout expired .
* Returns true if the timeout expired or connection trouble occurred ,
* false otherwise . Sets * result except in case of a true result .
* Sets * timed_out to true only when the timeout expired .
*/
static bool
pgfdw_get_cleanup_result ( PGconn * conn , TimestampTz endtime , PGresult * * result ,
pgfdw_get_cleanup_result ( PGconn * conn , TimestampTz endtime ,
TimestampTz retrycanceltime ,
PGresult * * result ,
bool * timed_out )
{
volatile bool failed = false ;
PGresult * volatile last_res = NULL ;
* result = NULL ;
* timed_out = false ;
/* In what follows, do not leak any PGresults on an error. */
PG_TRY ( ) ;
{
int canceldelta = RETRY_CANCEL_TIMEOUT * 2 ;
for ( ; ; )
{
PGresult * res ;
@ -1527,8 +1553,33 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
TimestampTz now = GetCurrentTimestamp ( ) ;
long cur_timeout ;
/* If timeout has expired, give up. */
if ( now > = endtime )
{
* timed_out = true ;
failed = true ;
goto exit ;
}
/* If we need to re-issue the cancel request, do that. */
if ( now > = retrycanceltime )
{
/* We ignore failure to issue the repeated request. */
( void ) libpqsrv_cancel ( conn , endtime ) ;
/* Recompute "now" in case that took measurable time. */
now = GetCurrentTimestamp ( ) ;
/* Adjust re-cancel timeout in increasing steps. */
retrycanceltime = TimestampTzPlusMilliseconds ( now ,
canceldelta ) ;
canceldelta + = canceldelta ;
}
/* If timeout has expired, give up, else get sleep time. */
cur_timeout = TimestampDifferenceMilliseconds ( now , endtime ) ;
cur_timeout = TimestampDifferenceMilliseconds ( now ,
Min ( endtime ,
retrycanceltime ) ) ;
if ( cur_timeout < = 0 )
{
* timed_out = true ;
@ -1849,7 +1900,9 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
foreach ( lc , cancel_requested )
{
ConnCacheEntry * entry = ( ConnCacheEntry * ) lfirst ( lc ) ;
TimestampTz now = GetCurrentTimestamp ( ) ;
TimestampTz endtime ;
TimestampTz retrycanceltime ;
char sql [ 100 ] ;
Assert ( entry - > changing_xact_state ) ;
@ -1863,10 +1916,13 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
* remaining entries in the list , leading to slamming that entry ' s
* connection shut .
*/
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) ,
endtime = TimestampTzPlusMilliseconds ( now ,
CONNECTION_CLEANUP_TIMEOUT ) ;
retrycanceltime = TimestampTzPlusMilliseconds ( now ,
RETRY_CANCEL_TIMEOUT ) ;
if ( ! pgfdw_cancel_query_end ( entry - > conn , endtime , true ) )
if ( ! pgfdw_cancel_query_end ( entry - > conn , endtime ,
retrycanceltime , true ) )
{
/* Unable to cancel running query */
pgfdw_reset_xact_state ( entry , toplevel ) ;