@ -53,6 +53,7 @@
# include "utils/acl.h"
# include "utils/acl.h"
# include "utils/builtins.h"
# include "utils/builtins.h"
# include "utils/fmgroids.h"
# include "utils/fmgroids.h"
# include "utils/guc.h"
# include "utils/lsyscache.h"
# include "utils/lsyscache.h"
# include "utils/memutils.h"
# include "utils/memutils.h"
# include "utils/rel.h"
# include "utils/rel.h"
@ -86,7 +87,8 @@ typedef struct storeInfo
*/
*/
static Datum dblink_record_internal ( FunctionCallInfo fcinfo , bool is_async ) ;
static Datum dblink_record_internal ( FunctionCallInfo fcinfo , bool is_async ) ;
static void prepTuplestoreResult ( FunctionCallInfo fcinfo ) ;
static void prepTuplestoreResult ( FunctionCallInfo fcinfo ) ;
static void materializeResult ( FunctionCallInfo fcinfo , PGresult * res ) ;
static void materializeResult ( FunctionCallInfo fcinfo , PGconn * conn ,
PGresult * res ) ;
static void materializeQueryResult ( FunctionCallInfo fcinfo ,
static void materializeQueryResult ( FunctionCallInfo fcinfo ,
PGconn * conn ,
PGconn * conn ,
const char * conname ,
const char * conname ,
@ -118,6 +120,8 @@ static void validate_pkattnums(Relation rel,
int * * pkattnums , int * pknumatts ) ;
int * * pkattnums , int * pknumatts ) ;
static bool is_valid_dblink_option ( const PQconninfoOption * options ,
static bool is_valid_dblink_option ( const PQconninfoOption * options ,
const char * option , Oid context ) ;
const char * option , Oid context ) ;
static int applyRemoteGucs ( PGconn * conn ) ;
static void restoreLocalGucs ( int nestlevel ) ;
/* Global */
/* Global */
static remoteConn * pconn = NULL ;
static remoteConn * pconn = NULL ;
@ -605,7 +609,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
errmsg ( " cursor \" %s \" does not exist " , curname ) ) ) ;
errmsg ( " cursor \" %s \" does not exist " , curname ) ) ) ;
}
}
materializeResult ( fcinfo , res ) ;
materializeResult ( fcinfo , conn , res ) ;
return ( Datum ) 0 ;
return ( Datum ) 0 ;
}
}
@ -750,7 +754,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
}
}
else
else
{
{
materializeResult ( fcinfo , res ) ;
materializeResult ( fcinfo , conn , res ) ;
}
}
}
}
}
}
@ -806,7 +810,7 @@ prepTuplestoreResult(FunctionCallInfo fcinfo)
* The PGresult will be released in this function .
* The PGresult will be released in this function .
*/
*/
static void
static void
materializeResult ( FunctionCallInfo fcinfo , PGresult * res )
materializeResult ( FunctionCallInfo fcinfo , PGconn * conn , PG result * res )
{
{
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
@ -816,7 +820,7 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
PG_TRY ( ) ;
PG_TRY ( ) ;
{
{
TupleDesc tupdesc ;
TupleDesc tupdesc ;
bool is_sql_cmd = false ;
bool is_sql_cmd ;
int ntuples ;
int ntuples ;
int nfields ;
int nfields ;
@ -877,6 +881,7 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
if ( ntuples > 0 )
if ( ntuples > 0 )
{
{
AttInMetadata * attinmeta ;
AttInMetadata * attinmeta ;
int nestlevel = - 1 ;
Tuplestorestate * tupstore ;
Tuplestorestate * tupstore ;
MemoryContext oldcontext ;
MemoryContext oldcontext ;
int row ;
int row ;
@ -884,6 +889,10 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
attinmeta = TupleDescGetAttInMetadata ( tupdesc ) ;
attinmeta = TupleDescGetAttInMetadata ( tupdesc ) ;
/* Set GUCs to ensure we read GUC-sensitive data types correctly */
if ( ! is_sql_cmd )
nestlevel = applyRemoteGucs ( conn ) ;
oldcontext = MemoryContextSwitchTo (
oldcontext = MemoryContextSwitchTo (
rsinfo - > econtext - > ecxt_per_query_memory ) ;
rsinfo - > econtext - > ecxt_per_query_memory ) ;
tupstore = tuplestore_begin_heap ( true , false , work_mem ) ;
tupstore = tuplestore_begin_heap ( true , false , work_mem ) ;
@ -920,6 +929,9 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
tuplestore_puttuple ( tupstore , tuple ) ;
tuplestore_puttuple ( tupstore , tuple ) ;
}
}
/* clean up GUC settings, if we changed any */
restoreLocalGucs ( nestlevel ) ;
/* clean up and return the tuplestore */
/* clean up and return the tuplestore */
tuplestore_donestoring ( tupstore ) ;
tuplestore_donestoring ( tupstore ) ;
}
}
@ -1053,6 +1065,7 @@ static PGresult *
storeQueryResult ( storeInfo * sinfo , PGconn * conn , const char * sql )
storeQueryResult ( storeInfo * sinfo , PGconn * conn , const char * sql )
{
{
bool first = true ;
bool first = true ;
int nestlevel = - 1 ;
PGresult * res ;
PGresult * res ;
if ( ! PQsendQuery ( conn , sql ) )
if ( ! PQsendQuery ( conn , sql ) )
@ -1072,6 +1085,15 @@ storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
if ( PQresultStatus ( sinfo - > cur_res ) = = PGRES_SINGLE_TUPLE )
if ( PQresultStatus ( sinfo - > cur_res ) = = PGRES_SINGLE_TUPLE )
{
{
/* got one row from possibly-bigger resultset */
/* got one row from possibly-bigger resultset */
/*
* Set GUCs to ensure we read GUC - sensitive data types correctly .
* We shouldn ' t do this until we have a row in hand , to ensure
* libpq has seen any earlier ParameterStatus protocol messages .
*/
if ( first & & nestlevel < 0 )
nestlevel = applyRemoteGucs ( conn ) ;
storeRow ( sinfo , sinfo - > cur_res , first ) ;
storeRow ( sinfo , sinfo - > cur_res , first ) ;
PQclear ( sinfo - > cur_res ) ;
PQclear ( sinfo - > cur_res ) ;
@ -1092,6 +1114,9 @@ storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
}
}
}
}
/* clean up GUC settings, if we changed any */
restoreLocalGucs ( nestlevel ) ;
/* return last_res */
/* return last_res */
res = sinfo - > last_res ;
res = sinfo - > last_res ;
sinfo - > last_res = NULL ;
sinfo - > last_res = NULL ;
@ -2898,3 +2923,73 @@ is_valid_dblink_option(const PQconninfoOption *options, const char *option,
return true ;
return true ;
}
}
/*
* Copy the remote session ' s values of GUCs that affect datatype I / O
* and apply them locally in a new GUC nesting level . Returns the new
* nestlevel ( which is needed by restoreLocalGucs to undo the settings ) ,
* or - 1 if no new nestlevel was needed .
*
* We use the equivalent of a function SET option to allow the settings to
* persist only until the caller calls restoreLocalGucs . If an error is
* thrown in between , guc . c will take care of undoing the settings .
*/
static int
applyRemoteGucs ( PGconn * conn )
{
static const char * const GUCsAffectingIO [ ] = {
" DateStyle " ,
" IntervalStyle "
} ;
int nestlevel = - 1 ;
int i ;
for ( i = 0 ; i < lengthof ( GUCsAffectingIO ) ; i + + )
{
const char * gucName = GUCsAffectingIO [ i ] ;
const char * remoteVal = PQparameterStatus ( conn , gucName ) ;
const char * localVal ;
/*
* If the remote server is pre - 8.4 , it won ' t have IntervalStyle , but
* that ' s okay because its output format won ' t be ambiguous . So just
* skip the GUC if we don ' t get a value for it . ( We might eventually
* need more complicated logic with remote - version checks here . )
*/
if ( remoteVal = = NULL )
continue ;
/*
* Avoid GUC - setting overhead if the remote and local GUCs already
* have the same value .
*/
localVal = GetConfigOption ( gucName , false , false ) ;
Assert ( localVal ! = NULL ) ;
if ( strcmp ( remoteVal , localVal ) = = 0 )
continue ;
/* Create new GUC nest level if we didn't already */
if ( nestlevel < 0 )
nestlevel = NewGUCNestLevel ( ) ;
/* Apply the option (this will throw error on failure) */
( void ) set_config_option ( gucName , remoteVal ,
PGC_USERSET , PGC_S_SESSION ,
GUC_ACTION_SAVE , true , 0 ) ;
}
return nestlevel ;
}
/*
* Restore local GUCs after they have been overlaid with remote settings .
*/
static void
restoreLocalGucs ( int nestlevel )
{
/* Do nothing if no new nestlevel was created */
if ( nestlevel > 0 )
AtEOXact_GUC ( true , nestlevel ) ;
}