@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
/* Remaining fields are invalid when conn is NULL: */
int xact_depth ; /* 0 = no xact open, 1 = main xact open, 2 =
* one level of subxact open , etc */
bool xact_read_only ; /* xact r/o state */
bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
bool have_error ; /* have any subxacts aborted in this xact? */
bool changing_xact_state ; /* xact state change in process */
@ -84,6 +85,12 @@ static unsigned int prep_stmt_number = 0;
/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false ;
/*
* tracks the nesting level of the topmost read - only transaction determined
* by GetTopReadOnlyTransactionNestLevel ( )
*/
static int top_read_only_level = 0 ;
/* custom wait event values, retrieved from shared memory */
static uint32 pgfdw_we_cleanup_result = 0 ;
static uint32 pgfdw_we_connect = 0 ;
@ -372,6 +379,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
/* Reset all transient state fields, to be sure all are clean */
entry - > xact_depth = 0 ;
entry - > xact_read_only = false ;
entry - > have_prep_stmt = false ;
entry - > have_error = false ;
entry - > changing_xact_state = false ;
@ -843,29 +851,81 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
* those scans . A disadvantage is that we can ' t provide sane emulation of
* READ COMMITTED behavior - - - it would be nice if we had some other way to
* control which remote queries share a snapshot .
*
* Note also that we always start the remote transaction with the same
* read / write and deferrable properties as the local transaction , and start
* the remote subtransaction with the same read / write property as the local
* subtransaction .
*/
static void
begin_remote_xact ( ConnCacheEntry * entry )
{
int curlevel = GetCurrentTransactionNestLevel ( ) ;
/* Start main transaction if we haven't yet */
/*
* Set the nesting level of the topmost read - only transaction if the
* current transaction is read - only and we haven ' t yet . Once it ' s set ,
* it ' s retained until that transaction is committed / aborted , and then
* reset ( see pgfdw_xact_callback and pgfdw_subxact_callback ) .
*/
if ( XactReadOnly )
{
if ( top_read_only_level = = 0 )
top_read_only_level = GetTopReadOnlyTransactionNestLevel ( ) ;
Assert ( top_read_only_level > 0 ) ;
}
else
Assert ( top_read_only_level = = 0 ) ;
/*
* Start main transaction if we haven ' t yet ; otherwise , change the
* already - started remote transaction / subtransaction to read - only if the
* local transaction / subtransaction have been done so after starting them
* and we haven ' t yet .
*/
if ( entry - > xact_depth < = 0 )
{
const char * sql ;
StringInfoData sql ;
bool ro = ( top_read_only_level = = 1 ) ;
elog ( DEBUG3 , " starting remote transaction on connection %p " ,
entry - > conn ) ;
initStringInfo ( & sql ) ;
appendStringInfoString ( & sql , " START TRANSACTION ISOLATION LEVEL " ) ;
if ( IsolationIsSerializable ( ) )
sql = " START TRANSACTION ISOLATION LEVEL SERIALIZABLE " ;
appendStringInfoString ( & sql , " SERIALIZABLE " ) ;
else
sql = " START TRANSACTION ISOLATION LEVEL REPEATABLE READ " ;
appendStringInfoString ( & sql , " REPEATABLE READ " ) ;
if ( ro )
appendStringInfoString ( & sql , " READ ONLY " ) ;
if ( XactDeferrable )
appendStringInfoString ( & sql , " DEFERRABLE " ) ;
entry - > changing_xact_state = true ;
do_sql_command ( entry - > conn , sql ) ;
do_sql_command ( entry - > conn , sql . data ) ;
entry - > xact_depth = 1 ;
if ( ro )
{
Assert ( ! entry - > xact_read_only ) ;
entry - > xact_read_only = true ;
}
entry - > changing_xact_state = false ;
}
else if ( ! entry - > xact_read_only )
{
Assert ( top_read_only_level = = 0 | |
entry - > xact_depth < = top_read_only_level ) ;
if ( entry - > xact_depth = = top_read_only_level )
{
entry - > changing_xact_state = true ;
do_sql_command ( entry - > conn , " SET transaction_read_only = on " ) ;
entry - > xact_read_only = true ;
entry - > changing_xact_state = false ;
}
}
else
Assert ( top_read_only_level > 0 & &
entry - > xact_depth > = top_read_only_level ) ;
/*
* If we ' re in a subtransaction , stack up savepoints to match our level .
@ -874,12 +934,21 @@ begin_remote_xact(ConnCacheEntry *entry)
*/
while ( entry - > xact_depth < curlevel )
{
char sql [ 64 ] ;
StringInfoData sql ;
bool ro = ( entry - > xact_depth + 1 = = top_read_only_level ) ;
snprintf ( sql , sizeof ( sql ) , " SAVEPOINT s%d " , entry - > xact_depth + 1 ) ;
initStringInfo ( & sql ) ;
appendStringInfo ( & sql , " SAVEPOINT s%d " , entry - > xact_depth + 1 ) ;
if ( ro )
appendStringInfoString ( & sql , " ; SET transaction_read_only = on " ) ;
entry - > changing_xact_state = true ;
do_sql_command ( entry - > conn , sql ) ;
do_sql_command ( entry - > conn , sql . data ) ;
entry - > xact_depth + + ;
if ( ro )
{
Assert ( ! entry - > xact_read_only ) ;
entry - > xact_read_only = true ;
}
entry - > changing_xact_state = false ;
}
}
@ -1174,6 +1243,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* Also reset cursor numbering for next transaction */
cursor_number = 0 ;
/* Likewise for top_read_only_level */
top_read_only_level = 0 ;
}
/*
@ -1272,6 +1344,10 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
false ) ;
}
}
/* If in the topmost read-only transaction, reset top_read_only_level */
if ( curlevel = = top_read_only_level )
top_read_only_level = 0 ;
}
/*
@ -1374,6 +1450,9 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
/* Reset state to show we're out of a transaction */
entry - > xact_depth = 0 ;
/* Reset xact r/o state */
entry - > xact_read_only = false ;
/*
* If the connection isn ' t in a good idle state , it is marked as
* invalid or keep_connections option of its server is disabled , then
@ -1394,6 +1473,10 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
{
/* Reset state to show we're out of a subtransaction */
entry - > xact_depth - - ;
/* If in the topmost read-only transaction, reset xact r/o state */
if ( entry - > xact_depth + 1 = = top_read_only_level )
entry - > xact_read_only = false ;
}
}