@ -105,6 +105,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors ) ;
bool ignore_errors ) ;
static bool pgfdw_get_cleanup_result ( PGconn * conn , TimestampTz endtime ,
static bool pgfdw_get_cleanup_result ( PGconn * conn , TimestampTz endtime ,
PGresult * * result ) ;
PGresult * * result ) ;
static void pgfdw_abort_cleanup ( ConnCacheEntry * entry , const char * sql ,
bool toplevel ) ;
static bool UserMappingPasswordRequired ( UserMapping * user ) ;
static bool UserMappingPasswordRequired ( UserMapping * user ) ;
static bool disconnect_cached_connections ( Oid serverid ) ;
static bool disconnect_cached_connections ( Oid serverid ) ;
@ -872,8 +874,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* If it has an open remote transaction, try to close it */
/* If it has an open remote transaction, try to close it */
if ( entry - > xact_depth > 0 )
if ( entry - > xact_depth > 0 )
{
{
bool abort_cleanup_failure = false ;
elog ( DEBUG3 , " closing remote transaction on connection %p " ,
elog ( DEBUG3 , " closing remote transaction on connection %p " ,
entry - > conn ) ;
entry - > conn ) ;
@ -940,67 +940,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_ABORT :
case XACT_EVENT_PARALLEL_ABORT :
case XACT_EVENT_ABORT :
case XACT_EVENT_ABORT :
/*
pgfdw_abort_cleanup ( entry , " ABORT TRANSACTION " , true ) ;
* Don ' t try to clean up the connection if we ' re already
* in error recursion trouble .
*/
if ( in_error_recursion_trouble ( ) )
entry - > changing_xact_state = true ;
/*
* If connection is already unsalvageable , don ' t touch it
* further .
*/
if ( entry - > changing_xact_state )
break ;
/*
* Mark this connection as in the process of changing
* transaction state .
*/
entry - > changing_xact_state = true ;
/* Assume we might have lost track of prepared statements */
entry - > have_error = true ;
/*
* If a command has been submitted to the remote server by
* using an asynchronous execution function , the command
* might not have yet completed . Check to see if a
* command is still being processed by the remote server ,
* and if so , request cancellation of the command .
*/
if ( PQtransactionStatus ( entry - > conn ) = = PQTRANS_ACTIVE & &
! pgfdw_cancel_query ( entry - > conn ) )
{
/* Unable to cancel running query. */
abort_cleanup_failure = true ;
}
else if ( ! pgfdw_exec_cleanup_query ( entry - > conn ,
" ABORT TRANSACTION " ,
false ) )
{
/* Unable to abort remote transaction. */
abort_cleanup_failure = true ;
}
else if ( entry - > have_prep_stmt & & entry - > have_error & &
! pgfdw_exec_cleanup_query ( entry - > conn ,
" DEALLOCATE ALL " ,
true ) )
{
/* Trouble clearing prepared statements. */
abort_cleanup_failure = true ;
}
else
{
entry - > have_prep_stmt = false ;
entry - > have_error = false ;
/* Also reset per-connection state */
memset ( & entry - > state , 0 , sizeof ( entry - > state ) ) ;
}
/* Disarm changing_xact_state if it all worked. */
entry - > changing_xact_state = abort_cleanup_failure ;
break ;
break ;
}
}
}
}
@ -1091,46 +1031,13 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
do_sql_command ( entry - > conn , sql ) ;
do_sql_command ( entry - > conn , sql ) ;
entry - > changing_xact_state = false ;
entry - > changing_xact_state = false ;
}
}
else if ( in_error_recursion_trouble ( ) )
else
{
/*
* Don ' t try to clean up the connection if we ' re already in error
* recursion trouble .
*/
entry - > changing_xact_state = true ;
}
else if ( ! entry - > changing_xact_state )
{
{
bool abort_cleanup_failure = false ;
/* Rollback all remote subtransactions during abort */
snprintf ( sql , sizeof ( sql ) ,
/* Remember that abort cleanup is in progress. */
" ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d " ,
entry - > changing_xact_state = true ;
curlevel , curlevel ) ;
pgfdw_abort_cleanup ( entry , sql , false ) ;
/* Assume we might have lost track of prepared statements */
entry - > have_error = true ;
/*
* If a command has been submitted to the remote server by using
* an asynchronous execution function , the command might not have
* yet completed . Check to see if a command is still being
* processed by the remote server , and if so , request cancellation
* of the command .
*/
if ( PQtransactionStatus ( entry - > conn ) = = PQTRANS_ACTIVE & &
! pgfdw_cancel_query ( entry - > conn ) )
abort_cleanup_failure = true ;
else
{
/* Rollback all remote subtransactions during abort */
snprintf ( sql , sizeof ( sql ) ,
" ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d " ,
curlevel , curlevel ) ;
if ( ! pgfdw_exec_cleanup_query ( entry - > conn , sql , false ) )
abort_cleanup_failure = true ;
}
/* Disarm changing_xact_state if it all worked. */
entry - > changing_xact_state = abort_cleanup_failure ;
}
}
/* OK, we're outta that level of subtransaction */
/* OK, we're outta that level of subtransaction */
@ -1409,6 +1316,72 @@ exit: ;
return timed_out ;
return timed_out ;
}
}
/*
* Abort remote transaction .
*
* The statement specified in " sql " is sent to the remote server ,
* in order to rollback the remote transaction .
*
* " toplevel " should be set to true if toplevel ( main ) transaction is
* rollbacked , false otherwise .
*
* Set entry - > changing_xact_state to false on success , true on failure .
*/
static void
pgfdw_abort_cleanup ( ConnCacheEntry * entry , const char * sql , bool toplevel )
{
/*
* Don ' t try to clean up the connection if we ' re already in error
* recursion trouble .
*/
if ( in_error_recursion_trouble ( ) )
entry - > changing_xact_state = true ;
/*
* If connection is already unsalvageable , don ' t touch it further .
*/
if ( entry - > changing_xact_state )
return ;
/*
* Mark this connection as in the process of changing transaction state .
*/
entry - > changing_xact_state = true ;
/* Assume we might have lost track of prepared statements */
entry - > have_error = true ;
/*
* If a command has been submitted to the remote server by using an
* asynchronous execution function , the command might not have yet
* completed . Check to see if a command is still being processed by the
* remote server , and if so , request cancellation of the command .
*/
if ( PQtransactionStatus ( entry - > conn ) = = PQTRANS_ACTIVE & &
! pgfdw_cancel_query ( entry - > conn ) )
return ; /* Unable to cancel running query */
if ( ! pgfdw_exec_cleanup_query ( entry - > conn , sql , false ) )
return ; /* Unable to abort remote transaction */
if ( toplevel )
{
if ( entry - > have_prep_stmt & & entry - > have_error & &
! pgfdw_exec_cleanup_query ( entry - > conn ,
" DEALLOCATE ALL " ,
true ) )
return ; /* Trouble clearing prepared statements */
entry - > have_prep_stmt = false ;
entry - > have_error = false ;
/* Also reset per-connection state */
memset ( & entry - > state , 0 , sizeof ( entry - > state ) ) ;
}
/* Disarm changing_xact_state if it all worked */
entry - > changing_xact_state = false ;
}
/*
/*
* List active foreign server connections .
* List active foreign server connections .
*
*