@ -67,6 +67,7 @@ static bool xact_got_connection = false;
static PGconn * connect_pg_server ( ForeignServer * server , UserMapping * user ) ;
static PGconn * connect_pg_server ( ForeignServer * server , UserMapping * user ) ;
static void check_conn_params ( const char * * keywords , const char * * values ) ;
static void check_conn_params ( const char * * keywords , const char * * values ) ;
static void configure_remote_session ( PGconn * conn ) ;
static void configure_remote_session ( PGconn * conn ) ;
static void do_sql_command ( PGconn * conn , const char * sql ) ;
static void begin_remote_xact ( ConnCacheEntry * entry ) ;
static void begin_remote_xact ( ConnCacheEntry * entry ) ;
static void pgfdw_xact_callback ( XactEvent event , void * arg ) ;
static void pgfdw_xact_callback ( XactEvent event , void * arg ) ;
static void pgfdw_subxact_callback ( SubXactEvent event ,
static void pgfdw_subxact_callback ( SubXactEvent event ,
@ -314,11 +315,43 @@ check_conn_params(const char **keywords, const char **values)
static void
static void
configure_remote_session ( PGconn * conn )
configure_remote_session ( PGconn * conn )
{
{
const char * sql ;
int remoteversion = PQserverVersion ( conn ) ;
PGresult * res ;
/* Force the search path to contain only pg_catalog (see deparse.c) */
/* Force the search path to contain only pg_catalog (see deparse.c) */
sql = " SET search_path = pg_catalog " ;
do_sql_command ( conn , " SET search_path = pg_catalog " ) ;
/*
* Set remote timezone ; this is basically just cosmetic , since all
* transmitted and returned timestamptzs should specify a zone explicitly
* anyway . However it makes the regression test outputs more predictable .
*
* We don ' t risk setting remote zone equal to ours , since the remote
* server might use a different timezone database .
*/
do_sql_command ( conn , " SET timezone = UTC " ) ;
/*
* Set values needed to ensure unambiguous data output from remote . ( This
* logic should match what pg_dump does . See also set_transmission_modes
* in postgres_fdw . c . )
*/
do_sql_command ( conn , " SET datestyle = ISO " ) ;
if ( remoteversion > = 80400 )
do_sql_command ( conn , " SET intervalstyle = postgres " ) ;
if ( remoteversion > = 90000 )
do_sql_command ( conn , " SET extra_float_digits = 3 " ) ;
else
do_sql_command ( conn , " SET extra_float_digits = 2 " ) ;
}
/*
* Convenience subroutine to issue a non - data - returning SQL command to remote
*/
static void
do_sql_command ( PGconn * conn , const char * sql )
{
PGresult * res ;
res = PQexec ( conn , sql ) ;
res = PQexec ( conn , sql ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , true , sql ) ;
pgfdw_report_error ( ERROR , res , true , sql ) ;
@ -339,7 +372,6 @@ static void
begin_remote_xact ( ConnCacheEntry * entry )
begin_remote_xact ( ConnCacheEntry * entry )
{
{
int curlevel = GetCurrentTransactionNestLevel ( ) ;
int curlevel = GetCurrentTransactionNestLevel ( ) ;
PGresult * res ;
/* Start main transaction if we haven't yet */
/* Start main transaction if we haven't yet */
if ( entry - > xact_depth < = 0 )
if ( entry - > xact_depth < = 0 )
@ -353,10 +385,7 @@ begin_remote_xact(ConnCacheEntry *entry)
sql = " START TRANSACTION ISOLATION LEVEL SERIALIZABLE " ;
sql = " START TRANSACTION ISOLATION LEVEL SERIALIZABLE " ;
else
else
sql = " START TRANSACTION ISOLATION LEVEL REPEATABLE READ " ;
sql = " START TRANSACTION ISOLATION LEVEL REPEATABLE READ " ;
res = PQexec ( entry - > conn , sql ) ;
do_sql_command ( entry - > conn , sql ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , true , sql ) ;
PQclear ( res ) ;
entry - > xact_depth = 1 ;
entry - > xact_depth = 1 ;
}
}
@ -370,10 +399,7 @@ begin_remote_xact(ConnCacheEntry *entry)
char sql [ 64 ] ;
char sql [ 64 ] ;
snprintf ( sql , sizeof ( sql ) , " SAVEPOINT s%d " , entry - > xact_depth + 1 ) ;
snprintf ( sql , sizeof ( sql ) , " SAVEPOINT s%d " , entry - > xact_depth + 1 ) ;
res = PQexec ( entry - > conn , sql ) ;
do_sql_command ( entry - > conn , sql ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , true , sql ) ;
PQclear ( res ) ;
entry - > xact_depth + + ;
entry - > xact_depth + + ;
}
}
}
}
@ -509,10 +535,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
{
case XACT_EVENT_PRE_COMMIT :
case XACT_EVENT_PRE_COMMIT :
/* Commit all remote transactions during pre-commit */
/* Commit all remote transactions during pre-commit */
res = PQexec ( entry - > conn , " COMMIT TRANSACTION " ) ;
do_sql_command ( entry - > conn , " COMMIT TRANSACTION " ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , true , " COMMIT TRANSACTION " ) ;
PQclear ( res ) ;
/*
/*
* If there were any errors in subtransactions , and we made
* If there were any errors in subtransactions , and we made
@ -647,10 +670,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
{
{
/* Commit all remote subtransactions during pre-commit */
/* Commit all remote subtransactions during pre-commit */
snprintf ( sql , sizeof ( sql ) , " RELEASE SAVEPOINT s%d " , curlevel ) ;
snprintf ( sql , sizeof ( sql ) , " RELEASE SAVEPOINT s%d " , curlevel ) ;
res = PQexec ( entry - > conn , sql ) ;
do_sql_command ( entry - > conn , sql ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , true , sql ) ;
PQclear ( res ) ;
}
}
else
else
{
{