@ -14,6 +14,8 @@
# include "postgres_fdw.h"
# include "postgres_fdw.h"
# include "access/htup_details.h"
# include "catalog/pg_user_mapping.h"
# include "access/xact.h"
# include "access/xact.h"
# include "mb/pg_wchar.h"
# include "mb/pg_wchar.h"
# include "miscadmin.h"
# include "miscadmin.h"
@ -21,6 +23,7 @@
# include "storage/latch.h"
# include "storage/latch.h"
# include "utils/hsearch.h"
# include "utils/hsearch.h"
# include "utils/memutils.h"
# include "utils/memutils.h"
# include "utils/syscache.h"
/*
/*
@ -49,6 +52,7 @@ typedef struct ConnCacheEntry
* one level of subxact open , etc */
* one level of subxact open , etc */
bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
bool have_error ; /* have any subxacts aborted in this xact? */
bool have_error ; /* have any subxacts aborted in this xact? */
bool changing_xact_state ; /* xact state change in process */
} ConnCacheEntry ;
} ConnCacheEntry ;
/*
/*
@ -74,6 +78,12 @@ static void pgfdw_subxact_callback(SubXactEvent event,
SubTransactionId mySubid ,
SubTransactionId mySubid ,
SubTransactionId parentSubid ,
SubTransactionId parentSubid ,
void * arg ) ;
void * arg ) ;
static void pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry * entry ) ;
static bool pgfdw_cancel_query ( PGconn * conn ) ;
static bool pgfdw_exec_cleanup_query ( PGconn * conn , const char * query ,
bool ignore_errors ) ;
static bool pgfdw_get_cleanup_result ( PGconn * conn , TimestampTz endtime ,
PGresult * * result ) ;
/*
/*
@ -139,8 +149,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
entry - > xact_depth = 0 ;
entry - > xact_depth = 0 ;
entry - > have_prep_stmt = false ;
entry - > have_prep_stmt = false ;
entry - > have_error = false ;
entry - > have_error = false ;
entry - > changing_xact_state = false ;
}
}
/* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change ( entry ) ;
/*
/*
* We don ' t check the health of cached connection here , because it would
* We don ' t check the health of cached connection here , because it would
* require some overhead . Broken connection will be detected when the
* require some overhead . Broken connection will be detected when the
@ -343,7 +357,9 @@ do_sql_command(PGconn *conn, const char *sql)
{
{
PGresult * res ;
PGresult * res ;
res = PQexec ( conn , sql ) ;
if ( ! PQsendQuery ( conn , sql ) )
pgfdw_report_error ( ERROR , NULL , conn , false , sql ) ;
res = pgfdw_get_result ( conn , sql ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , conn , true , sql ) ;
pgfdw_report_error ( ERROR , res , conn , true , sql ) ;
PQclear ( res ) ;
PQclear ( res ) ;
@ -376,8 +392,10 @@ begin_remote_xact(ConnCacheEntry *entry)
sql = " START TRANSACTION ISOLATION LEVEL SERIALIZABLE " ;
sql = " START TRANSACTION ISOLATION LEVEL SERIALIZABLE " ;
else
else
sql = " START TRANSACTION ISOLATION LEVEL REPEATABLE READ " ;
sql = " START TRANSACTION ISOLATION LEVEL REPEATABLE READ " ;
entry - > changing_xact_state = true ;
do_sql_command ( entry - > conn , sql ) ;
do_sql_command ( entry - > conn , sql ) ;
entry - > xact_depth = 1 ;
entry - > xact_depth = 1 ;
entry - > changing_xact_state = false ;
}
}
/*
/*
@ -390,8 +408,10 @@ begin_remote_xact(ConnCacheEntry *entry)
char sql [ 64 ] ;
char sql [ 64 ] ;
snprintf ( sql , sizeof ( sql ) , " SAVEPOINT s%d " , entry - > xact_depth + 1 ) ;
snprintf ( sql , sizeof ( sql ) , " SAVEPOINT s%d " , entry - > xact_depth + 1 ) ;
entry - > changing_xact_state = true ;
do_sql_command ( entry - > conn , sql ) ;
do_sql_command ( entry - > conn , sql ) ;
entry - > xact_depth + + ;
entry - > xact_depth + + ;
entry - > changing_xact_state = false ;
}
}
}
}
@ -604,6 +624,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* If it has an open remote transaction, try to close it */
/* If it has an open remote transaction, try to close it */
if ( entry - > xact_depth > 0 )
if ( entry - > xact_depth > 0 )
{
{
bool abort_cleanup_failure = false ;
elog ( DEBUG3 , " closing remote transaction on connection %p " ,
elog ( DEBUG3 , " closing remote transaction on connection %p " ,
entry - > conn ) ;
entry - > conn ) ;
@ -611,8 +633,17 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
{
case XACT_EVENT_PARALLEL_PRE_COMMIT :
case XACT_EVENT_PARALLEL_PRE_COMMIT :
case XACT_EVENT_PRE_COMMIT :
case XACT_EVENT_PRE_COMMIT :
/*
* If abort cleanup previously failed for this connection ,
* we can ' t issue any more commands against it .
*/
pgfdw_reject_incomplete_xact_state_change ( entry ) ;
/* Commit all remote transactions during pre-commit */
/* Commit all remote transactions during pre-commit */
entry - > changing_xact_state = true ;
do_sql_command ( entry - > conn , " COMMIT TRANSACTION " ) ;
do_sql_command ( entry - > conn , " COMMIT TRANSACTION " ) ;
entry - > changing_xact_state = false ;
/*
/*
* If there were any errors in subtransactions , and we
* If there were any errors in subtransactions , and we
@ -660,6 +691,27 @@ pgfdw_xact_callback(XactEvent event, void *arg)
break ;
break ;
case XACT_EVENT_PARALLEL_ABORT :
case XACT_EVENT_PARALLEL_ABORT :
case XACT_EVENT_ABORT :
case XACT_EVENT_ABORT :
/*
* Don ' t try to clean up the connection if we ' re already
* in error recursion trouble .
*/
if ( in_error_recursion_trouble ( ) )
entry - > changing_xact_state = true ;
/*
* If connection is already unsalvageable , don ' t touch it
* further .
*/
if ( entry - > changing_xact_state )
break ;
/*
* Mark this connection as in the process of changing
* transaction state .
*/
entry - > changing_xact_state = true ;
/* Assume we might have lost track of prepared statements */
/* Assume we might have lost track of prepared statements */
entry - > have_error = true ;
entry - > have_error = true ;
@ -670,40 +722,35 @@ pgfdw_xact_callback(XactEvent event, void *arg)
* command is still being processed by the remote server ,
* command is still being processed by the remote server ,
* and if so , request cancellation of the command .
* and if so , request cancellation of the command .
*/
*/
if ( PQtransactionStatus ( entry - > conn ) = = PQTRANS_ACTIVE )
if ( PQtransactionStatus ( entry - > conn ) = = PQTRANS_ACTIVE & &
! pgfdw_cancel_query ( entry - > conn ) )
{
{
PGcancel * cancel ;
/* Unable to cancel running query. */
char errbuf [ 256 ] ;
abort_cleanup_failure = true ;
}
if ( ( cancel = PQgetCancel ( entry - > conn ) ) )
else if ( ! pgfdw_exec_cleanup_query ( entry - > conn ,
" ABORT TRANSACTION " ,
false ) )
{
{
if ( ! PQcancel ( cancel , errbuf , sizeof ( errbuf ) ) )
/* Unable to abort remote transaction. */
ereport ( WARNING ,
abort_cleanup_failure = true ;
( errcode ( ERRCODE_CONNECTION_FAILURE ) ,
errmsg ( " could not send cancel request: %s " ,
errbuf ) ) ) ;
PQfreeCancel ( cancel ) ;
}
}
else if ( entry - > have_prep_stmt & & entry - > have_error & &
! pgfdw_exec_cleanup_query ( entry - > conn ,
" DEALLOCATE ALL " ,
true ) )
{
/* Trouble clearing prepared statements. */
abort_cleanup_failure = true ;
}
}
/* If we're aborting, abort all remote transactions too */
res = PQexec ( entry - > conn , " ABORT TRANSACTION " ) ;
/* Note: can't throw ERROR, it would be infinite loop */
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( WARNING , res , entry - > conn , true ,
" ABORT TRANSACTION " ) ;
else
else
{
{
PQclear ( res ) ;
/* As above, make sure to clear any prepared stmts */
if ( entry - > have_prep_stmt & & entry - > have_error )
{
res = PQexec ( entry - > conn , " DEALLOCATE ALL " ) ;
PQclear ( res ) ;
}
entry - > have_prep_stmt = false ;
entry - > have_prep_stmt = false ;
entry - > have_error = false ;
entry - > have_error = false ;
}
}
/* Disarm changing_xact_state if it all worked. */
entry - > changing_xact_state = abort_cleanup_failure ;
break ;
break ;
}
}
}
}
@ -716,11 +763,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
* recover . Next GetConnection will open a new connection .
* recover . Next GetConnection will open a new connection .
*/
*/
if ( PQstatus ( entry - > conn ) ! = CONNECTION_OK | |
if ( PQstatus ( entry - > conn ) ! = CONNECTION_OK | |
PQtransactionStatus ( entry - > conn ) ! = PQTRANS_IDLE )
PQtransactionStatus ( entry - > conn ) ! = PQTRANS_IDLE | |
entry - > changing_xact_state )
{
{
elog ( DEBUG3 , " discarding connection %p " , entry - > conn ) ;
elog ( DEBUG3 , " discarding connection %p " , entry - > conn ) ;
PQfinish ( entry - > conn ) ;
PQfinish ( entry - > conn ) ;
entry - > conn = NULL ;
entry - > conn = NULL ;
entry - > changing_xact_state = false ;
}
}
}
}
@ -763,7 +812,6 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
hash_seq_init ( & scan , ConnectionHash ) ;
hash_seq_init ( & scan , ConnectionHash ) ;
while ( ( entry = ( ConnCacheEntry * ) hash_seq_search ( & scan ) ) )
while ( ( entry = ( ConnCacheEntry * ) hash_seq_search ( & scan ) ) )
{
{
PGresult * res ;
char sql [ 100 ] ;
char sql [ 100 ] ;
/*
/*
@ -779,12 +827,33 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
if ( event = = SUBXACT_EVENT_PRE_COMMIT_SUB )
if ( event = = SUBXACT_EVENT_PRE_COMMIT_SUB )
{
{
/*
* If abort cleanup previously failed for this connection , we
* can ' t issue any more commands against it .
*/
pgfdw_reject_incomplete_xact_state_change ( entry ) ;
/* Commit all remote subtransactions during pre-commit */
/* Commit all remote subtransactions during pre-commit */
snprintf ( sql , sizeof ( sql ) , " RELEASE SAVEPOINT s%d " , curlevel ) ;
snprintf ( sql , sizeof ( sql ) , " RELEASE SAVEPOINT s%d " , curlevel ) ;
entry - > changing_xact_state = true ;
do_sql_command ( entry - > conn , sql ) ;
do_sql_command ( entry - > conn , sql ) ;
entry - > changing_xact_state = false ;
}
}
else
else if ( in_error_recursion_trouble ( ) )
{
/*
* Don ' t try to clean up the connection if we ' re already in error
* recursion trouble .
*/
entry - > changing_xact_state = true ;
}
else if ( ! entry - > changing_xact_state )
{
{
bool abort_cleanup_failure = false ;
/* Remember that abort cleanup is in progress. */
entry - > changing_xact_state = true ;
/* Assume we might have lost track of prepared statements */
/* Assume we might have lost track of prepared statements */
entry - > have_error = true ;
entry - > have_error = true ;
@ -795,34 +864,220 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
* processed by the remote server , and if so , request cancellation
* processed by the remote server , and if so , request cancellation
* of the command .
* of the command .
*/
*/
if ( PQtransactionStatus ( entry - > conn ) = = PQTRANS_ACTIVE )
if ( PQtransactionStatus ( entry - > conn ) = = PQTRANS_ACTIVE & &
! pgfdw_cancel_query ( entry - > conn ) )
abort_cleanup_failure = true ;
else
{
{
/* Rollback all remote subtransactions during abort */
snprintf ( sql , sizeof ( sql ) ,
" ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d " ,
curlevel , curlevel ) ;
if ( ! pgfdw_exec_cleanup_query ( entry - > conn , sql , false ) )
abort_cleanup_failure = true ;
}
/* Disarm changing_xact_state if it all worked. */
entry - > changing_xact_state = abort_cleanup_failure ;
}
/* OK, we're outta that level of subtransaction */
entry - > xact_depth - - ;
}
}
/*
* Raise an error if the given connection cache entry is marked as being
* in the middle of an xact state change . This should be called at which no
* such change is expected to be in progress ; if one is found to be in
* progress , it means that we aborted in the middle of a previous state change
* and now don ' t know what the remote transaction state actually is .
* Such connections can ' t safely be further used . Re - establishing the
* connection would change the snapshot and roll back any writes already
* performed , so that ' s not an option , either . Thus , we must abort .
*/
static void
pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry * entry )
{
HeapTuple tup ;
Form_pg_user_mapping umform ;
ForeignServer * server ;
if ( ! entry - > changing_xact_state )
return ;
tup = SearchSysCache1 ( USERMAPPINGOID ,
ObjectIdGetDatum ( entry - > key ) ) ;
if ( ! HeapTupleIsValid ( tup ) )
elog ( ERROR , " cache lookup failed for user mapping %u " , entry - > key ) ;
umform = ( Form_pg_user_mapping ) GETSTRUCT ( tup ) ;
server = GetForeignServer ( umform - > umserver ) ;
ReleaseSysCache ( tup ) ;
ereport ( ERROR ,
( errcode ( ERRCODE_CONNECTION_EXCEPTION ) ,
errmsg ( " connection to server \" %s \" was lost " ,
server - > servername ) ) ) ;
}
/*
* Cancel the currently - in - progress query ( whose query text we do not have )
* and ignore the result . Returns true if we successfully cancel the query
* and discard any pending result , and false if not .
*/
static bool
pgfdw_cancel_query ( PGconn * conn )
{
PGcancel * cancel ;
PGcancel * cancel ;
char errbuf [ 256 ] ;
char errbuf [ 256 ] ;
PGresult * result = NULL ;
TimestampTz endtime ;
/*
* If it takes too long to cancel the query and discard the result , assume
* the connection is dead .
*/
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) , 30000 ) ;
if ( ( cancel = PQgetCancel ( entry - > conn ) ) )
/*
* Issue cancel request . Unfortunately , there ' s no good way to limit the
* amount of time that we might block inside PQgetCancel ( ) .
*/
if ( ( cancel = PQgetCancel ( conn ) ) )
{
{
if ( ! PQcancel ( cancel , errbuf , sizeof ( errbuf ) ) )
if ( ! PQcancel ( cancel , errbuf , sizeof ( errbuf ) ) )
{
ereport ( WARNING ,
ereport ( WARNING ,
( errcode ( ERRCODE_CONNECTION_FAILURE ) ,
( errcode ( ERRCODE_CONNECTION_FAILURE ) ,
errmsg ( " could not send cancel request: %s " ,
errmsg ( " could not send cancel request: %s " ,
errbuf ) ) ) ;
errbuf ) ) ) ;
PQfreeCancel ( cancel ) ;
PQfreeCancel ( cancel ) ;
return false ;
}
}
PQfreeCancel ( cancel ) ;
}
}
/* Rollback all remote subtransactions during abort */
/* Get and discard the result of the query. */
snprintf ( sql , sizeof ( sql ) ,
if ( pgfdw_get_cleanup_result ( conn , endtime , & result ) )
" ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d " ,
return false ;
curlevel , curlevel ) ;
PQclear ( result ) ;
res = PQexec ( entry - > conn , sql ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
return true ;
pgfdw_report_error ( WARNING , res , entry - > conn , true , sql ) ;
}
else
PQclear ( res ) ;
/*
* Submit a query during ( sub ) abort cleanup and wait up to 30 seconds for the
* result . If the query is executed without error , the return value is true .
* If the query is executed successfully but returns an error , the return
* value is true if and only if ignore_errors is set . If the query can ' t be
* sent or times out , the return value is false .
*/
static bool
pgfdw_exec_cleanup_query ( PGconn * conn , const char * query , bool ignore_errors )
{
PGresult * result = NULL ;
TimestampTz endtime ;
/*
* If it takes too long to execute a cleanup query , assume the connection
* is dead . It ' s fairly likely that this is why we aborted in the first
* place ( e . g . statement timeout , user cancel ) , so the timeout shouldn ' t
* be too long .
*/
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) , 30000 ) ;
/*
* Submit a query . Since we don ' t use non - blocking mode , this also can
* block . But its risk is relatively small , so we ignore that for now .
*/
if ( ! PQsendQuery ( conn , query ) )
{
pgfdw_report_error ( WARNING , NULL , conn , false , query ) ;
return false ;
}
}
/* OK, we're outta that level of subtransaction */
/* Get the result of the query. */
entry - > xact_depth - - ;
if ( pgfdw_get_cleanup_result ( conn , endtime , & result ) )
return false ;
/* Issue a warning if not successful. */
if ( PQresultStatus ( result ) ! = PGRES_COMMAND_OK )
{
pgfdw_report_error ( WARNING , result , conn , true , query ) ;
return ignore_errors ;
}
}
return true ;
}
/*
* Get , during abort cleanup , the result of a query that is in progress . This
* might be a query that is being interrupted by transaction abort , or it might
* be a query that was initiated as part of transaction abort to get the remote
* side back to the appropriate state .
*
* It ' s not a huge problem if we throw an ERROR here , but if we get into error
* recursion trouble , we ' ll end up slamming the connection shut , which will
* necessitate failing the entire toplevel transaction even if subtransactions
* were used . Try to use WARNING where we can .
*
* endtime is the time at which we should give up and assume the remote
* side is dead . Returns true if the timeout expired , otherwise false .
* Sets * result except in case of a timeout .
*/
static bool
pgfdw_get_cleanup_result ( PGconn * conn , TimestampTz endtime , PGresult * * result )
{
PGresult * last_res = NULL ;
for ( ; ; )
{
PGresult * res ;
while ( PQisBusy ( conn ) )
{
int wc ;
TimestampTz now = GetCurrentTimestamp ( ) ;
long secs ;
int microsecs ;
long cur_timeout ;
/* If timeout has expired, give up, else get sleep time. */
if ( now > = endtime )
return true ;
TimestampDifference ( now , endtime , & secs , & microsecs ) ;
/* To protect against clock skew, limit sleep to one minute. */
cur_timeout = Min ( 60000 , secs * USECS_PER_SEC + microsecs ) ;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket ( MyLatch ,
WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT ,
PQsocket ( conn ) ,
cur_timeout , PG_WAIT_EXTENSION ) ;
ResetLatch ( MyLatch ) ;
CHECK_FOR_INTERRUPTS ( ) ;
/* Data available in socket */
if ( wc & WL_SOCKET_READABLE )
{
if ( ! PQconsumeInput ( conn ) )
{
* result = NULL ;
return false ;
}
}
}
res = PQgetResult ( conn ) ;
if ( res = = NULL )
break ; /* query is complete */
PQclear ( last_res ) ;
last_res = res ;
}
* result = last_res ;
return false ;
}
}