@ -22,6 +22,7 @@
# include "pgstat.h"
# include "storage/latch.h"
# include "utils/hsearch.h"
# include "utils/inval.h"
# include "utils/memutils.h"
# include "utils/syscache.h"
@ -48,11 +49,15 @@ typedef struct ConnCacheEntry
{
ConnCacheKey key ; /* hash key (must be first) */
PGconn * conn ; /* connection to foreign server, or NULL */
/* Remaining fields are invalid when conn is NULL: */
int xact_depth ; /* 0 = no xact open, 1 = main xact open, 2 =
* one level of subxact open , etc */
bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
bool have_error ; /* have any subxacts aborted in this xact? */
bool changing_xact_state ; /* xact state change in process */
bool invalidated ; /* true if reconnect is pending */
uint32 server_hashvalue ; /* hash value of foreign server OID */
uint32 mapping_hashvalue ; /* hash value of user mapping OID */
} ConnCacheEntry ;
/*
@ -69,6 +74,7 @@ static bool xact_got_connection = false;
/* prototypes of private functions */
static PGconn * connect_pg_server ( ForeignServer * server , UserMapping * user ) ;
static void disconnect_pg_server ( ConnCacheEntry * entry ) ;
static void check_conn_params ( const char * * keywords , const char * * values ) ;
static void configure_remote_session ( PGconn * conn ) ;
static void do_sql_command ( PGconn * conn , const char * sql ) ;
@ -78,6 +84,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
SubTransactionId mySubid ,
SubTransactionId parentSubid ,
void * arg ) ;
static void pgfdw_inval_callback ( Datum arg , int cacheid , uint32 hashvalue ) ;
static void pgfdw_reject_incomplete_xact_state_change ( ConnCacheEntry * entry ) ;
static bool pgfdw_cancel_query ( PGconn * conn ) ;
static bool pgfdw_exec_cleanup_query ( PGconn * conn , const char * query ,
@ -95,13 +102,6 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
* will_prep_stmt must be true if caller intends to create any prepared
* statements . Since those don ' t go away automatically at transaction end
* ( not even on error ) , we need this flag to cue manual cleanup .
*
* XXX Note that caching connections theoretically requires a mechanism to
* detect change of FDW objects to invalidate already established connections .
* We could manage that by watching for invalidation events on the relevant
* syscaches . For the moment , though , it ' s not clear that this would really
* be useful and not mere pedantry . We could not flush any active connections
* mid - transaction anyway .
*/
PGconn *
GetConnection ( UserMapping * user , bool will_prep_stmt )
@ -130,6 +130,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
*/
RegisterXactCallback ( pgfdw_xact_callback , NULL ) ;
RegisterSubXactCallback ( pgfdw_subxact_callback , NULL ) ;
CacheRegisterSyscacheCallback ( FOREIGNSERVEROID ,
pgfdw_inval_callback , ( Datum ) 0 ) ;
CacheRegisterSyscacheCallback ( USERMAPPINGOID ,
pgfdw_inval_callback , ( Datum ) 0 ) ;
}
/* Set flag that we did GetConnection during the current transaction */
@ -144,17 +148,27 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
entry = hash_search ( ConnectionHash , & key , HASH_ENTER , & found ) ;
if ( ! found )
{
/* initialize new hashtable entry (key is already filled in) */
/*
* We need only clear " conn " here ; remaining fields will be filled
* later when " conn " is set .
*/
entry - > conn = NULL ;
entry - > xact_depth = 0 ;
entry - > have_prep_stmt = false ;
entry - > have_error = false ;
entry - > changing_xact_state = false ;
}
/* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change ( entry ) ;
/*
* If the connection needs to be remade due to invalidation , disconnect as
* soon as we ' re out of all transactions .
*/
if ( entry - > conn ! = NULL & & entry - > invalidated & & entry - > xact_depth = = 0 )
{
elog ( DEBUG3 , " closing connection %p for option changes to take effect " ,
entry - > conn ) ;
disconnect_pg_server ( entry ) ;
}
/*
* We don ' t check the health of cached connection here , because it would
* require some overhead . Broken connection will be detected when the
@ -164,15 +178,26 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
/*
* If cache entry doesn ' t have a connection , we have to establish a new
* connection . ( If connect_pg_server throws an error , the cache entry
* will be left in a valid empty state . )
* will remain in a valid empty state , ie conn = = NULL . )
*/
if ( entry - > conn = = NULL )
{
ForeignServer * server = GetForeignServer ( user - > serverid ) ;
entry - > xact_depth = 0 ; /* just to be sure */
/* Reset all transient state fields, to be sure all are clean */
entry - > xact_depth = 0 ;
entry - > have_prep_stmt = false ;
entry - > have_error = false ;
entry - > changing_xact_state = false ;
entry - > invalidated = false ;
entry - > server_hashvalue =
GetSysCacheHashValue1 ( FOREIGNSERVEROID ,
ObjectIdGetDatum ( server - > serverid ) ) ;
entry - > mapping_hashvalue =
GetSysCacheHashValue1 ( USERMAPPINGOID ,
ObjectIdGetDatum ( user - > umid ) ) ;
/* Now try to make the connection */
entry - > conn = connect_pg_server ( server , user ) ;
elog ( DEBUG3 , " new postgres_fdw connection %p for server \" %s \" (user mapping oid %u, userid %u) " ,
@ -276,6 +301,19 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
return conn ;
}
/*
* Disconnect any open connection for a connection cache entry .
*/
static void
disconnect_pg_server ( ConnCacheEntry * entry )
{
if ( entry - > conn ! = NULL )
{
PQfinish ( entry - > conn ) ;
entry - > conn = NULL ;
}
}
/*
* For non - superusers , insist that the connstr specify a password . This
* prevents a password from being picked up from . pgpass , a service file ,
@ -777,9 +815,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
entry - > changing_xact_state )
{
elog ( DEBUG3 , " discarding connection %p " , entry - > conn ) ;
PQfinish ( entry - > conn ) ;
entry - > conn = NULL ;
entry - > changing_xact_state = false ;
disconnect_pg_server ( entry ) ;
}
}
@ -896,6 +932,47 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
}
}
/*
* Connection invalidation callback function
*
* After a change to a pg_foreign_server or pg_user_mapping catalog entry ,
* mark connections depending on that entry as needing to be remade .
* We can ' t immediately destroy them , since they might be in the midst of
* a transaction , but we ' ll remake them at the next opportunity .
*
* Although most cache invalidation callbacks blow away all the related stuff
* regardless of the given hashvalue , connections are expensive enough that
* it ' s worth trying to avoid that .
*
* NB : We could avoid unnecessary disconnection more strictly by examining
* individual option values , but it seems too much effort for the gain .
*/
static void
pgfdw_inval_callback ( Datum arg , int cacheid , uint32 hashvalue )
{
HASH_SEQ_STATUS scan ;
ConnCacheEntry * entry ;
Assert ( cacheid = = FOREIGNSERVEROID | | cacheid = = USERMAPPINGOID ) ;
/* ConnectionHash must exist already, if we're registered */
hash_seq_init ( & scan , ConnectionHash ) ;
while ( ( entry = ( ConnCacheEntry * ) hash_seq_search ( & scan ) ) )
{
/* Ignore invalid entries */
if ( entry - > conn = = NULL )
continue ;
/* hashvalue == 0 means a cache reset, must clear all state */
if ( hashvalue = = 0 | |
( cacheid = = FOREIGNSERVEROID & &
entry - > server_hashvalue = = hashvalue ) | |
( cacheid = = USERMAPPINGOID & &
entry - > mapping_hashvalue = = hashvalue ) )
entry - > invalidated = true ;
}
}
/*
* Raise an error if the given connection cache entry is marked as being
* in the middle of an xact state change . This should be called at which no
@ -913,9 +990,14 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Form_pg_user_mapping umform ;
ForeignServer * server ;
if ( ! entry - > changing_xact_state )
/* nothing to do for inactive entries and entries of sane state */
if ( entry - > conn = = NULL | | ! entry - > changing_xact_state )
return ;
/* make sure this entry is inactive */
disconnect_pg_server ( entry ) ;
/* find server name to be shown in the message below */
tup = SearchSysCache1 ( USERMAPPINGOID ,
ObjectIdGetDatum ( entry - > key ) ) ;
if ( ! HeapTupleIsValid ( tup ) )