@ -60,6 +60,7 @@ typedef struct ConnCacheEntry
bool have_error ; /* have any subxacts aborted in this xact? */
bool changing_xact_state ; /* xact state change in process */
bool parallel_commit ; /* do we commit (sub)xacts in parallel? */
bool parallel_abort ; /* do we abort (sub)xacts in parallel? */
bool invalidated ; /* true if reconnect is pending */
bool keep_connections ; /* setting value of keep_connections
* server option */
@ -81,6 +82,25 @@ static unsigned int prep_stmt_number = 0;
/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false ;
/*
* Milliseconds to wait to cancel an in - progress query or execute a cleanup
* query ; if it takes longer than 30 seconds to do these , we assume the
* connection is dead .
*/
# define CONNECTION_CLEANUP_TIMEOUT 30000
/* Macro for constructing abort command to be sent */
# define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
do { \
if ( toplevel ) \
snprintf ( ( sql ) , sizeof ( sql ) , \
" ABORT TRANSACTION " ) ; \
else \
snprintf ( ( sql ) , sizeof ( sql ) , \
" ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d " , \
( entry ) - > xact_depth , ( entry ) - > xact_depth ) ; \
} while ( 0 )
/*
* SQL functions
*/
@ -107,14 +127,28 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry * entry ) ;
static void pgfdw_reset_xact_state ( ConnCacheEntry * entry , bool toplevel ) ;
static bool pgfdw_cancel_query ( PGconn * conn ) ;
static bool pgfdw_cancel_query_begin ( PGconn * conn ) ;
static bool pgfdw_cancel_query_end ( PGconn * conn , TimestampTz endtime ,
bool consume_input ) ;
static bool pgfdw_exec_cleanup_query ( PGconn * conn , const char * query ,
bool ignore_errors ) ;
static bool pgfdw_exec_cleanup_query_begin ( PGconn * conn , const char * query ) ;
static bool pgfdw_exec_cleanup_query_end ( PGconn * conn , const char * query ,
TimestampTz endtime ,
bool consume_input ,
bool ignore_errors ) ;
static bool pgfdw_get_cleanup_result ( PGconn * conn , TimestampTz endtime ,
PGresult * * result , bool * timed_out ) ;
static void pgfdw_abort_cleanup ( ConnCacheEntry * entry , bool toplevel ) ;
static bool pgfdw_abort_cleanup_begin ( ConnCacheEntry * entry , bool toplevel ,
List * * pending_entries ,
List * * cancel_requested ) ;
static void pgfdw_finish_pre_commit_cleanup ( List * pending_entries ) ;
static void pgfdw_finish_pre_subcommit_cleanup ( List * pending_entries ,
int curlevel ) ;
static void pgfdw_finish_abort_cleanup ( List * pending_entries ,
List * cancel_requested ,
bool toplevel ) ;
static bool UserMappingPasswordRequired ( UserMapping * user ) ;
static bool disconnect_cached_connections ( Oid serverid ) ;
@ -320,8 +354,8 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
*
* By default , all the connections to any foreign servers are kept open .
*
* Also determine whether to commit ( sub ) transactions opened on the remot e
* server in parallel at ( sub ) transaction end , which is disabled by
* Also determine whether to commit / abort ( sub ) transactions opened on the
* remote server in parallel at ( sub ) transaction end , which is disabled by
* default .
*
* Note : it ' s enough to determine these only when making a new connection
@ -330,6 +364,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
*/
entry - > keep_connections = true ;
entry - > parallel_commit = false ;
entry - > parallel_abort = false ;
foreach ( lc , server - > options )
{
DefElem * def = ( DefElem * ) lfirst ( lc ) ;
@ -338,6 +373,8 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
entry - > keep_connections = defGetBoolean ( def ) ;
else if ( strcmp ( def - > defname , " parallel_commit " ) = = 0 )
entry - > parallel_commit = defGetBoolean ( def ) ;
else if ( strcmp ( def - > defname , " parallel_abort " ) = = 0 )
entry - > parallel_abort = defGetBoolean ( def ) ;
}
/* Now try to make the connection */
@ -892,6 +929,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
HASH_SEQ_STATUS scan ;
ConnCacheEntry * entry ;
List * pending_entries = NIL ;
List * cancel_requested = NIL ;
/* Quick exit if no connections were touched in this transaction. */
if ( ! xact_got_connection )
@ -985,7 +1023,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_ABORT :
case XACT_EVENT_ABORT :
/* Rollback all remote transactions during abort */
pgfdw_abort_cleanup ( entry , true ) ;
if ( entry - > parallel_abort )
{
if ( pgfdw_abort_cleanup_begin ( entry , true ,
& pending_entries ,
& cancel_requested ) )
continue ;
}
else
pgfdw_abort_cleanup ( entry , true ) ;
break ;
}
}
@ -995,11 +1041,21 @@ pgfdw_xact_callback(XactEvent event, void *arg)
}
/* If there are any pending connections, finish cleaning them up */
if ( pending_entries )
if ( pending_entries | | cancel_requested )
{
Assert ( event = = XACT_EVENT_PARALLEL_PRE_COMMIT | |
event = = XACT_EVENT_PRE_COMMIT ) ;
pgfdw_finish_pre_commit_cleanup ( pending_entries ) ;
if ( event = = XACT_EVENT_PARALLEL_PRE_COMMIT | |
event = = XACT_EVENT_PRE_COMMIT )
{
Assert ( cancel_requested = = NIL ) ;
pgfdw_finish_pre_commit_cleanup ( pending_entries ) ;
}
else
{
Assert ( event = = XACT_EVENT_PARALLEL_ABORT | |
event = = XACT_EVENT_ABORT ) ;
pgfdw_finish_abort_cleanup ( pending_entries , cancel_requested ,
true ) ;
}
}
/*
@ -1024,6 +1080,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
ConnCacheEntry * entry ;
int curlevel ;
List * pending_entries = NIL ;
List * cancel_requested = NIL ;
/* Nothing to do at subxact start, nor after commit. */
if ( ! ( event = = SUBXACT_EVENT_PRE_COMMIT_SUB | |
@ -1078,7 +1135,15 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
else
{
/* Rollback all remote subtransactions during abort */
pgfdw_abort_cleanup ( entry , false ) ;
if ( entry - > parallel_abort )
{
if ( pgfdw_abort_cleanup_begin ( entry , false ,
& pending_entries ,
& cancel_requested ) )
continue ;
}
else
pgfdw_abort_cleanup ( entry , false ) ;
}
/* OK, we're outta that level of subtransaction */
@ -1086,10 +1151,19 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
}
/* If there are any pending connections, finish cleaning them up */
if ( pending_entries )
if ( pending_entries | | cancel_requested )
{
Assert ( event = = SUBXACT_EVENT_PRE_COMMIT_SUB ) ;
pgfdw_finish_pre_subcommit_cleanup ( pending_entries , curlevel ) ;
if ( event = = SUBXACT_EVENT_PRE_COMMIT_SUB )
{
Assert ( cancel_requested = = NIL ) ;
pgfdw_finish_pre_subcommit_cleanup ( pending_entries , curlevel ) ;
}
else
{
Assert ( event = = SUBXACT_EVENT_ABORT_SUB ) ;
pgfdw_finish_abort_cleanup ( pending_entries , cancel_requested ,
false ) ;
}
}
}
@ -1233,17 +1307,25 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
static bool
pgfdw_cancel_query ( PGconn * conn )
{
PGcancel * cancel ;
char errbuf [ 256 ] ;
PGresult * result = NULL ;
TimestampTz endtime ;
bool timed_out ;
/*
* If it takes too long to cancel the query and discard the result , assume
* the connection is dead .
*/
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) , 30000 ) ;
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) ,
CONNECTION_CLEANUP_TIMEOUT ) ;
if ( ! pgfdw_cancel_query_begin ( conn ) )
return false ;
return pgfdw_cancel_query_end ( conn , endtime , false ) ;
}
static bool
pgfdw_cancel_query_begin ( PGconn * conn )
{
PGcancel * cancel ;
char errbuf [ 256 ] ;
/*
* Issue cancel request . Unfortunately , there ' s no good way to limit the
@ -1263,6 +1345,30 @@ pgfdw_cancel_query(PGconn *conn)
PQfreeCancel ( cancel ) ;
}
return true ;
}
static bool
pgfdw_cancel_query_end ( PGconn * conn , TimestampTz endtime , bool consume_input )
{
PGresult * result = NULL ;
bool timed_out ;
/*
* If requested , consume whatever data is available from the socket . ( Note
* that if all data is available , this allows pgfdw_get_cleanup_result to
* call PQgetResult without forcing the overhead of WaitLatchOrSocket ,
* which would be large compared to the overhead of PQconsumeInput . )
*/
if ( consume_input & & ! PQconsumeInput ( conn ) )
{
ereport ( WARNING ,
( errcode ( ERRCODE_CONNECTION_FAILURE ) ,
errmsg ( " could not get result of cancel request: %s " ,
pchomp ( PQerrorMessage ( conn ) ) ) ) ) ;
return false ;
}
/* Get and discard the result of the query. */
if ( pgfdw_get_cleanup_result ( conn , endtime , & result , & timed_out ) )
{
@ -1297,9 +1403,7 @@ pgfdw_cancel_query(PGconn *conn)
static bool
pgfdw_exec_cleanup_query ( PGconn * conn , const char * query , bool ignore_errors )
{
PGresult * result = NULL ;
TimestampTz endtime ;
bool timed_out ;
/*
* If it takes too long to execute a cleanup query , assume the connection
@ -1307,8 +1411,18 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
* place ( e . g . statement timeout , user cancel ) , so the timeout shouldn ' t
* be too long .
*/
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) , 30000 ) ;
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) ,
CONNECTION_CLEANUP_TIMEOUT ) ;
if ( ! pgfdw_exec_cleanup_query_begin ( conn , query ) )
return false ;
return pgfdw_exec_cleanup_query_end ( conn , query , endtime ,
false , ignore_errors ) ;
}
static bool
pgfdw_exec_cleanup_query_begin ( PGconn * conn , const char * query )
{
/*
* Submit a query . Since we don ' t use non - blocking mode , this also can
* block . But its risk is relatively small , so we ignore that for now .
@ -1319,6 +1433,29 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
return false ;
}
return true ;
}
static bool
pgfdw_exec_cleanup_query_end ( PGconn * conn , const char * query ,
TimestampTz endtime , bool consume_input ,
bool ignore_errors )
{
PGresult * result = NULL ;
bool timed_out ;
/*
* If requested , consume whatever data is available from the socket . ( Note
* that if all data is available , this allows pgfdw_get_cleanup_result to
* call PQgetResult without forcing the overhead of WaitLatchOrSocket ,
* which would be large compared to the overhead of PQconsumeInput . )
*/
if ( consume_input & & ! PQconsumeInput ( conn ) )
{
pgfdw_report_error ( WARNING , NULL , conn , false , query ) ;
return false ;
}
/* Get the result of the query. */
if ( pgfdw_get_cleanup_result ( conn , endtime , & result , & timed_out ) )
{
@ -1474,12 +1611,7 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
! pgfdw_cancel_query ( entry - > conn ) )
return ; /* Unable to cancel running query */
if ( toplevel )
snprintf ( sql , sizeof ( sql ) , " ABORT TRANSACTION " ) ;
else
snprintf ( sql , sizeof ( sql ) ,
" ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d " ,
entry - > xact_depth , entry - > xact_depth ) ;
CONSTRUCT_ABORT_COMMAND ( sql , entry , toplevel ) ;
if ( ! pgfdw_exec_cleanup_query ( entry - > conn , sql , false ) )
return ; /* Unable to abort remote (sub)transaction */
@ -1508,6 +1640,65 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
entry - > changing_xact_state = false ;
}
/*
* Like pgfdw_abort_cleanup , submit an abort command or cancel request , but
* don ' t wait for the result .
*
* Returns true if the abort command or cancel request is successfully issued ,
* false otherwise . If the abort command is successfully issued , the given
* connection cache entry is appended to * pending_entries . Othewise , if the
* cancel request is successfully issued , it is appended to * cancel_requested .
*/
static bool
pgfdw_abort_cleanup_begin ( ConnCacheEntry * entry , bool toplevel ,
List * * pending_entries , List * * cancel_requested )
{
/*
* Don ' t try to clean up the connection if we ' re already in error
* recursion trouble .
*/
if ( in_error_recursion_trouble ( ) )
entry - > changing_xact_state = true ;
/*
* If connection is already unsalvageable , don ' t touch it further .
*/
if ( entry - > changing_xact_state )
return false ;
/*
* Mark this connection as in the process of changing transaction state .
*/
entry - > changing_xact_state = true ;
/* Assume we might have lost track of prepared statements */
entry - > have_error = true ;
/*
* If a command has been submitted to the remote server by using an
* asynchronous execution function , the command might not have yet
* completed . Check to see if a command is still being processed by the
* remote server , and if so , request cancellation of the command .
*/
if ( PQtransactionStatus ( entry - > conn ) = = PQTRANS_ACTIVE )
{
if ( ! pgfdw_cancel_query_begin ( entry - > conn ) )
return false ; /* Unable to cancel running query */
* cancel_requested = lappend ( * cancel_requested , entry ) ;
}
else
{
char sql [ 100 ] ;
CONSTRUCT_ABORT_COMMAND ( sql , entry , toplevel ) ;
if ( ! pgfdw_exec_cleanup_query_begin ( entry - > conn , sql ) )
return false ; /* Unable to abort remote transaction */
* pending_entries = lappend ( * pending_entries , entry ) ;
}
return true ;
}
/*
* Finish pre - commit cleanup of connections on each of which we ' ve sent a
* COMMIT command to the remote server .
@ -1616,6 +1807,168 @@ pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
}
}
/*
* Finish abort cleanup of connections on each of which we ' ve sent an abort
* command or cancel request to the remote server .
*/
static void
pgfdw_finish_abort_cleanup ( List * pending_entries , List * cancel_requested ,
bool toplevel )
{
List * pending_deallocs = NIL ;
ListCell * lc ;
/*
* For each of the pending cancel requests ( if any ) , get and discard the
* result of the query , and submit an abort command to the remote server .
*/
if ( cancel_requested )
{
foreach ( lc , cancel_requested )
{
ConnCacheEntry * entry = ( ConnCacheEntry * ) lfirst ( lc ) ;
TimestampTz endtime ;
char sql [ 100 ] ;
Assert ( entry - > changing_xact_state ) ;
/*
* Set end time . You might think we should do this before issuing
* cancel request like in normal mode , but that is problematic ,
* because if , for example , it took longer than 30 seconds to
* process the first few entries in the cancel_requested list , it
* would cause a timeout error when processing each of the
* remaining entries in the list , leading to slamming that entry ' s
* connection shut .
*/
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) ,
CONNECTION_CLEANUP_TIMEOUT ) ;
if ( ! pgfdw_cancel_query_end ( entry - > conn , endtime , true ) )
{
/* Unable to cancel running query */
pgfdw_reset_xact_state ( entry , toplevel ) ;
continue ;
}
/* Send an abort command in parallel if needed */
CONSTRUCT_ABORT_COMMAND ( sql , entry , toplevel ) ;
if ( ! pgfdw_exec_cleanup_query_begin ( entry - > conn , sql ) )
{
/* Unable to abort remote (sub)transaction */
pgfdw_reset_xact_state ( entry , toplevel ) ;
}
else
pending_entries = lappend ( pending_entries , entry ) ;
}
}
/* No further work if no pending entries */
if ( ! pending_entries )
return ;
/*
* Get the result of the abort command for each of the pending entries
*/
foreach ( lc , pending_entries )
{
ConnCacheEntry * entry = ( ConnCacheEntry * ) lfirst ( lc ) ;
TimestampTz endtime ;
char sql [ 100 ] ;
Assert ( entry - > changing_xact_state ) ;
/*
* Set end time . We do this now , not before issuing the command like
* in normal mode , for the same reason as for the cancel_requested
* entries .
*/
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) ,
CONNECTION_CLEANUP_TIMEOUT ) ;
CONSTRUCT_ABORT_COMMAND ( sql , entry , toplevel ) ;
if ( ! pgfdw_exec_cleanup_query_end ( entry - > conn , sql , endtime ,
true , false ) )
{
/* Unable to abort remote (sub)transaction */
pgfdw_reset_xact_state ( entry , toplevel ) ;
continue ;
}
if ( toplevel )
{
/* Do a DEALLOCATE ALL in parallel if needed */
if ( entry - > have_prep_stmt & & entry - > have_error )
{
if ( ! pgfdw_exec_cleanup_query_begin ( entry - > conn ,
" DEALLOCATE ALL " ) )
{
/* Trouble clearing prepared statements */
pgfdw_reset_xact_state ( entry , toplevel ) ;
}
else
pending_deallocs = lappend ( pending_deallocs , entry ) ;
continue ;
}
entry - > have_prep_stmt = false ;
entry - > have_error = false ;
}
/* Reset the per-connection state if needed */
if ( entry - > state . pendingAreq )
memset ( & entry - > state , 0 , sizeof ( entry - > state ) ) ;
/* We're done with this entry; unset the changing_xact_state flag */
entry - > changing_xact_state = false ;
pgfdw_reset_xact_state ( entry , toplevel ) ;
}
/* No further work if no pending entries */
if ( ! pending_deallocs )
return ;
Assert ( toplevel ) ;
/*
* Get the result of the DEALLOCATE command for each of the pending
* entries
*/
foreach ( lc , pending_deallocs )
{
ConnCacheEntry * entry = ( ConnCacheEntry * ) lfirst ( lc ) ;
TimestampTz endtime ;
Assert ( entry - > changing_xact_state ) ;
Assert ( entry - > have_prep_stmt ) ;
Assert ( entry - > have_error ) ;
/*
* Set end time . We do this now , not before issuing the command like
* in normal mode , for the same reason as for the cancel_requested
* entries .
*/
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) ,
CONNECTION_CLEANUP_TIMEOUT ) ;
if ( ! pgfdw_exec_cleanup_query_end ( entry - > conn , " DEALLOCATE ALL " ,
endtime , true , true ) )
{
/* Trouble clearing prepared statements */
pgfdw_reset_xact_state ( entry , toplevel ) ;
continue ;
}
entry - > have_prep_stmt = false ;
entry - > have_error = false ;
/* Reset the per-connection state if needed */
if ( entry - > state . pendingAreq )
memset ( & entry - > state , 0 , sizeof ( entry - > state ) ) ;
/* We're done with this entry; unset the changing_xact_state flag */
entry - > changing_xact_state = false ;
pgfdw_reset_xact_state ( entry , toplevel ) ;
}
}
/*
* List active foreign server connections .
*