@ -67,6 +67,7 @@ typedef struct remoteConn
* Internal declarations
*/
static Datum dblink_record_internal ( FunctionCallInfo fcinfo , bool is_async ) ;
static void prepTuplestoreResult ( FunctionCallInfo fcinfo ) ;
static void materializeResult ( FunctionCallInfo fcinfo , PGresult * res ) ;
static remoteConn * getConnectionByName ( const char * name ) ;
static HTAB * createConnHash ( void ) ;
@ -495,7 +496,6 @@ PG_FUNCTION_INFO_V1(dblink_fetch);
Datum
dblink_fetch ( PG_FUNCTION_ARGS )
{
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
PGresult * res = NULL ;
char * conname = NULL ;
remoteConn * rconn = NULL ;
@ -505,6 +505,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
int howmany = 0 ;
bool fail = true ; /* default to backward compatible */
prepTuplestoreResult ( fcinfo ) ;
DBLINK_INIT ;
if ( PG_NARGS ( ) = = 4 )
@ -551,11 +553,6 @@ dblink_fetch(PG_FUNCTION_ARGS)
if ( ! conn )
DBLINK_CONN_NOT_AVAIL ;
/* let the caller know we're sending back a tuplestore */
rsinfo - > returnMode = SFRM_Materialize ;
rsinfo - > setResult = NULL ;
rsinfo - > setDesc = NULL ;
initStringInfo ( & buf ) ;
appendStringInfo ( & buf , " FETCH %d FROM %s " , howmany , curname ) ;
@ -632,7 +629,6 @@ dblink_get_result(PG_FUNCTION_ARGS)
static Datum
dblink_record_internal ( FunctionCallInfo fcinfo , bool is_async )
{
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
char * msg ;
PGresult * res = NULL ;
PGconn * conn = NULL ;
@ -643,16 +639,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
bool fail = true ; /* default to backward compatible */
bool freeconn = false ;
/* check to see if caller supports us returning a tuplestore */
if ( rsinfo = = NULL | | ! IsA ( rsinfo , ReturnSetInfo ) )
ereport ( ERROR ,
( errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " set-valued function called in context that cannot accept a set " ) ) ) ;
if ( ! ( rsinfo - > allowedModes & SFRM_Materialize ) )
ereport ( ERROR ,
( errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " materialize mode required, but it is not " \
" allowed in this context " ) ) ) ;
prepTuplestoreResult ( fcinfo ) ;
DBLINK_INIT ;
@ -712,11 +699,6 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
if ( ! conn )
DBLINK_CONN_NOT_AVAIL ;
/* let the caller know we're sending back a tuplestore */
rsinfo - > returnMode = SFRM_Materialize ;
rsinfo - > setResult = NULL ;
rsinfo - > setDesc = NULL ;
/* synchronous query, or async result retrieval */
if ( ! is_async )
res = PQexec ( conn , sql ) ;
@ -745,14 +727,45 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
}
/*
* Materialize the PGresult to return them as the function result .
* The res will be released in this function .
* Verify function caller can handle a tuplestore result , and set up for that .
*
* Note : if the caller returns without actually creating a tuplestore , the
* executor will treat the function result as an empty set .
*/
static void
prepTuplestoreResult ( FunctionCallInfo fcinfo )
{
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
/* check to see if query supports us returning a tuplestore */
if ( rsinfo = = NULL | | ! IsA ( rsinfo , ReturnSetInfo ) )
ereport ( ERROR ,
( errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " set-valued function called in context that cannot accept a set " ) ) ) ;
if ( ! ( rsinfo - > allowedModes & SFRM_Materialize ) )
ereport ( ERROR ,
( errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " materialize mode required, but it is not allowed in this context " ) ) ) ;
/* let the executor know we're sending back a tuplestore */
rsinfo - > returnMode = SFRM_Materialize ;
/* caller must fill these to return a non-empty result */
rsinfo - > setResult = NULL ;
rsinfo - > setDesc = NULL ;
}
/*
* Copy the contents of the PGresult into a tuplestore to be returned
* as the result of the current function .
* The PGresult will be released in this function .
*/
static void
materializeResult ( FunctionCallInfo fcinfo , PGresult * res )
{
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
/* prepTuplestoreResult must have been called previously */
Assert ( rsinfo - > returnMode = = SFRM_Materialize ) ;
PG_TRY ( ) ;
@ -1004,85 +1017,97 @@ PG_FUNCTION_INFO_V1(dblink_exec);
Datum
dblink_exec ( PG_FUNCTION_ARGS )
{
char * msg ;
PGresult * res = NULL ;
text * sql_cmd_status = NULL ;
PGconn * conn = NULL ;
char * connstr = NULL ;
char * sql = NULL ;
char * conname = NULL ;
remoteConn * rconn = NULL ;
bool freeconn = false ;
bool fail = true ; /* default to backward compatible behavior */
text * volatile sql_cmd_status = NULL ;
PGconn * volatile conn = NULL ;
volatile bool freeconn = false ;
DBLINK_INIT ;
if ( PG_NARGS ( ) = = 3 )
{
/* must be text,text,bool */
DBLINK_GET_CONN ;
sql = text_to_cstring ( PG_GETARG_TEXT_PP ( 1 ) ) ;
fail = PG_GETARG_BOOL ( 2 ) ;
}
else if ( PG_NARGS ( ) = = 2 )
PG_TRY ( ) ;
{
/* might be text,text or text,bool */
if ( get_fn_expr_argtype ( fcinfo - > flinfo , 1 ) = = BOOLOID )
char * msg ;
PGresult * res = NULL ;
char * connstr = NULL ;
char * sql = NULL ;
char * conname = NULL ;
remoteConn * rconn = NULL ;
bool fail = true ; /* default to backward compatible behavior */
if ( PG_NARGS ( ) = = 3 )
{
/* must be text,text,bool */
DBLINK_GET_CONN ;
sql = text_to_cstring ( PG_GETARG_TEXT_PP ( 1 ) ) ;
fail = PG_GETARG_BOOL ( 2 ) ;
}
else if ( PG_NARGS ( ) = = 2 )
{
/* might be text,text or text,bool */
if ( get_fn_expr_argtype ( fcinfo - > flinfo , 1 ) = = BOOLOID )
{
conn = pconn - > conn ;
sql = text_to_cstring ( PG_GETARG_TEXT_PP ( 0 ) ) ;
fail = PG_GETARG_BOOL ( 1 ) ;
}
else
{
DBLINK_GET_CONN ;
sql = text_to_cstring ( PG_GETARG_TEXT_PP ( 1 ) ) ;
}
}
else if ( PG_NARGS ( ) = = 1 )
{
/* must be single text argument */
conn = pconn - > conn ;
sql = text_to_cstring ( PG_GETARG_TEXT_PP ( 0 ) ) ;
fail = PG_GETARG_BOOL ( 1 ) ;
}
else
{
DBLINK_GET_CONN ;
sql = text_to_cstring ( PG_GETARG_TEXT_PP ( 1 ) ) ;
}
}
else if ( PG_NARGS ( ) = = 1 )
{
/* must be single text argument */
conn = pconn - > conn ;
sql = text_to_cstring ( PG_GETARG_TEXT_PP ( 0 ) ) ;
}
else
/* shouldn't happen */
elog ( ERROR , " wrong number of arguments " ) ;
/* shouldn't happen */
elog ( ERROR , " wrong number of arguments " ) ;
if ( ! conn )
DBLINK_CONN_NOT_AVAIL ;
if ( ! conn )
DBLINK_CONN_NOT_AVAIL ;
res = PQexec ( conn , sql ) ;
if ( ! res | |
( PQresultStatus ( res ) ! = PGRES_COMMAND_OK & &
PQresultStatus ( res ) ! = PGRES_TUPLES_OK ) )
{
dblink_res_error ( conname , res , " could not execute command " , fail ) ;
res = PQexec ( conn , sql ) ;
if ( ! res | |
( PQresultStatus ( res ) ! = PGRES_COMMAND_OK & &
PQresultStatus ( res ) ! = PGRES_TUPLES_OK ) )
{
dblink_res_error ( conname , res , " could not execute command " , fail ) ;
/*
* and save a copy of the command status string to return as our
* result tuple
*/
sql_cmd_status = cstring_to_text ( " ERROR " ) ;
}
else if ( PQresultStatus ( res ) = = PGRES_COMMAND_OK )
{
/*
* and save a copy of the command status string to return as our
* result tuple
*/
sql_cmd_status = cstring_to_text ( PQcmdStatus ( res ) ) ;
PQclear ( res ) ;
/*
* and save a copy of the command status string to return as our
* result tuple
*/
sql_cmd_status = cstring_to_text ( " ERROR " ) ;
}
else if ( PQresultStatus ( res ) = = PGRES_COMMAND_OK )
{
/*
* and save a copy of the command status string to return as our
* result tuple
*/
sql_cmd_status = cstring_to_text ( PQcmdStatus ( res ) ) ;
PQclear ( res ) ;
}
else
{
PQclear ( res ) ;
ereport ( ERROR ,
( errcode ( ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED ) ,
errmsg ( " statement returning results not allowed " ) ) ) ;
}
}
else
PG_CATCH ( ) ;
{
PQclear ( res ) ;
ereport ( ERROR ,
( errcode ( ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED ) ,
errmsg ( " statement returning results not allowed " ) ) ) ;
/* if needed, close the connection to the database */
if ( freeconn )
PQfinish ( conn ) ;
PG_RE_THROW ( ) ;
}
PG_END_TRY ( ) ;
/* if needed, close the connection to the database and cleanup */
/* if needed, close the connection to the database */
if ( freeconn )
PQfinish ( conn ) ;
@ -1503,13 +1528,15 @@ dblink_get_notify(PG_FUNCTION_ARGS)
MemoryContext per_query_ctx ;
MemoryContext oldcontext ;
prepTuplestoreResult ( fcinfo ) ;
DBLINK_INIT ;
if ( PG_NARGS ( ) = = 1 )
DBLINK_GET_NAMED_CONN ;
else
conn = pconn - > conn ;
/* create the tuplestore */
/* create the tuplestore in per-query memory */
per_query_ctx = rsinfo - > econtext - > ecxt_per_query_memory ;
oldcontext = MemoryContextSwitchTo ( per_query_ctx ) ;
@ -1522,7 +1549,6 @@ dblink_get_notify(PG_FUNCTION_ARGS)
TEXTOID , - 1 , 0 ) ;
tupstore = tuplestore_begin_heap ( true , false , work_mem ) ;
rsinfo - > returnMode = SFRM_Materialize ;
rsinfo - > setResult = tupstore ;
rsinfo - > setDesc = tupdesc ;