@ -21,6 +21,7 @@
# include "commands/defrem.h"
# include "commands/explain.h"
# include "commands/vacuum.h"
# include "executor/execAsync.h"
# include "foreign/fdwapi.h"
# include "funcapi.h"
# include "miscadmin.h"
@ -37,6 +38,7 @@
# include "optimizer/tlist.h"
# include "parser/parsetree.h"
# include "postgres_fdw.h"
# include "storage/latch.h"
# include "utils/builtins.h"
# include "utils/float.h"
# include "utils/guc.h"
@ -143,6 +145,7 @@ typedef struct PgFdwScanState
/* for remote query execution */
PGconn * conn ; /* connection for the scan */
PgFdwConnState * conn_state ; /* extra per-connection state */
unsigned int cursor_number ; /* quasi-unique ID for my cursor */
bool cursor_exists ; /* have we created the cursor? */
int numParams ; /* number of parameters passed to query */
@ -159,6 +162,9 @@ typedef struct PgFdwScanState
int fetch_ct_2 ; /* Min(# of fetches done, 2) */
bool eof_reached ; /* true if last fetch reached EOF */
/* for asynchronous execution */
bool async_capable ; /* engage asynchronous-capable logic? */
/* working memory contexts */
MemoryContext batch_cxt ; /* context holding current batch of tuples */
MemoryContext temp_cxt ; /* context for per-tuple temporary data */
@ -176,6 +182,7 @@ typedef struct PgFdwModifyState
/* for remote query execution */
PGconn * conn ; /* connection for the scan */
PgFdwConnState * conn_state ; /* extra per-connection state */
char * p_name ; /* name of prepared statement, if created */
/* extracted fdw_private data */
@ -219,6 +226,7 @@ typedef struct PgFdwDirectModifyState
/* for remote query execution */
PGconn * conn ; /* connection for the update */
PgFdwConnState * conn_state ; /* extra per-connection state */
int numParams ; /* number of parameters passed to query */
FmgrInfo * param_flinfo ; /* output conversion functions for them */
List * param_exprs ; /* executable expressions for param values */
@ -408,6 +416,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
RelOptInfo * input_rel ,
RelOptInfo * output_rel ,
void * extra ) ;
static bool postgresIsForeignPathAsyncCapable ( ForeignPath * path ) ;
static void postgresForeignAsyncRequest ( AsyncRequest * areq ) ;
static void postgresForeignAsyncConfigureWait ( AsyncRequest * areq ) ;
static void postgresForeignAsyncNotify ( AsyncRequest * areq ) ;
/*
* Helper functions
@ -437,7 +449,8 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
void * arg ) ;
static void create_cursor ( ForeignScanState * node ) ;
static void fetch_more_data ( ForeignScanState * node ) ;
static void close_cursor ( PGconn * conn , unsigned int cursor_number ) ;
static void close_cursor ( PGconn * conn , unsigned int cursor_number ,
PgFdwConnState * conn_state ) ;
static PgFdwModifyState * create_foreign_modify ( EState * estate ,
RangeTblEntry * rte ,
ResultRelInfo * resultRelInfo ,
@ -491,6 +504,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
double * totaldeadrows ) ;
static void analyze_row_processor ( PGresult * res , int row ,
PgFdwAnalyzeState * astate ) ;
static void produce_tuple_asynchronously ( AsyncRequest * areq , bool fetch ) ;
static void fetch_more_data_begin ( AsyncRequest * areq ) ;
static HeapTuple make_tuple_from_result_row ( PGresult * res ,
int row ,
Relation rel ,
@ -583,6 +598,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for upper relation push-down */
routine - > GetForeignUpperPaths = postgresGetForeignUpperPaths ;
/* Support functions for asynchronous execution */
routine - > IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable ;
routine - > ForeignAsyncRequest = postgresForeignAsyncRequest ;
routine - > ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait ;
routine - > ForeignAsyncNotify = postgresForeignAsyncNotify ;
PG_RETURN_POINTER ( routine ) ;
}
@ -618,14 +639,15 @@ postgresGetForeignRelSize(PlannerInfo *root,
/*
* Extract user - settable option values . Note that per - table settings of
* use_remote_estimate and fetch_size override per - server settings of
* them , respectively .
* use_remote_estimate , fetch_size and async_capable override per - server
* settings of them , respectively .
*/
fpinfo - > use_remote_estimate = false ;
fpinfo - > fdw_startup_cost = DEFAULT_FDW_STARTUP_COST ;
fpinfo - > fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST ;
fpinfo - > shippable_extensions = NIL ;
fpinfo - > fetch_size = 100 ;
fpinfo - > async_capable = false ;
apply_server_options ( fpinfo ) ;
apply_table_options ( fpinfo ) ;
@ -1459,7 +1481,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server . Connection manager will
* establish new connection if necessary .
*/
fsstate - > conn = GetConnection ( user , false ) ;
fsstate - > conn = GetConnection ( user , false , & fsstate - > conn_state ) ;
/* Assign a unique ID for my cursor */
fsstate - > cursor_number = GetCursorNumber ( fsstate - > conn ) ;
@ -1510,6 +1532,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
& fsstate - > param_flinfo ,
& fsstate - > param_exprs ,
& fsstate - > param_values ) ;
/* Set the async-capable flag */
fsstate - > async_capable = node - > ss . ps . plan - > async_capable ;
}
/*
@ -1524,8 +1549,10 @@ postgresIterateForeignScan(ForeignScanState *node)
TupleTableSlot * slot = node - > ss . ss_ScanTupleSlot ;
/*
* If this is the first call after Begin or ReScan , we need to create the
* cursor on the remote side .
* In sync mode , if this is the first call after Begin or ReScan , we need
* to create the cursor on the remote side . In async mode , we would have
* already created the cursor before we get here , even if this is the
* first call after Begin or ReScan .
*/
if ( ! fsstate - > cursor_exists )
create_cursor ( node ) ;
@ -1535,6 +1562,9 @@ postgresIterateForeignScan(ForeignScanState *node)
*/
if ( fsstate - > next_tuple > = fsstate - > num_tuples )
{
/* In async mode, just clear tuple slot. */
if ( fsstate - > async_capable )
return ExecClearTuple ( slot ) ;
/* No point in another fetch if we already detected EOF, though. */
if ( ! fsstate - > eof_reached )
fetch_more_data ( node ) ;
@ -1596,7 +1626,7 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don ' t use a PG_TRY block here , so be careful not to throw error
* without releasing the PGresult .
*/
res = pgfdw_exec_query ( fsstate - > conn , sql ) ;
res = pgfdw_exec_query ( fsstate - > conn , sql , fsstate - > conn_state ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , fsstate - > conn , true , sql ) ;
PQclear ( res ) ;
@ -1624,7 +1654,8 @@ postgresEndForeignScan(ForeignScanState *node)
/* Close the cursor if open, to prevent accumulation of cursors */
if ( fsstate - > cursor_exists )
close_cursor ( fsstate - > conn , fsstate - > cursor_number ) ;
close_cursor ( fsstate - > conn , fsstate - > cursor_number ,
fsstate - > conn_state ) ;
/* Release remote connection */
ReleaseConnection ( fsstate - > conn ) ;
@ -2501,7 +2532,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server . Connection manager will
* establish new connection if necessary .
*/
dmstate - > conn = GetConnection ( user , false ) ;
dmstate - > conn = GetConnection ( user , false , & dmstate - > conn_state ) ;
/* Update the foreign-join-related fields. */
if ( fsplan - > scan . scanrelid = = 0 )
@ -2882,7 +2913,7 @@ estimate_path_cost_size(PlannerInfo *root,
false , & retrieved_attrs , NULL ) ;
/* Get the remote estimate */
conn = GetConnection ( fpinfo - > user , false ) ;
conn = GetConnection ( fpinfo - > user , false , NULL ) ;
get_remote_estimate ( sql . data , conn , & rows , & width ,
& startup_cost , & total_cost ) ;
ReleaseConnection ( conn ) ;
@ -3328,7 +3359,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
/*
* Execute EXPLAIN remotely .
*/
res = pgfdw_exec_query ( conn , sql ) ;
res = pgfdw_exec_query ( conn , sql , NULL ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
pgfdw_report_error ( ERROR , res , conn , false , sql ) ;
@ -3452,6 +3483,10 @@ create_cursor(ForeignScanState *node)
StringInfoData buf ;
PGresult * res ;
/* First, process a pending asynchronous request, if any. */
if ( fsstate - > conn_state - > pendingAreq )
process_pending_request ( fsstate - > conn_state - > pendingAreq ) ;
/*
* Construct array of query parameter values in text format . We do the
* conversions in the short - lived per - tuple context , so as not to cause a
@ -3532,17 +3567,38 @@ fetch_more_data(ForeignScanState *node)
PG_TRY ( ) ;
{
PGconn * conn = fsstate - > conn ;
char sql [ 64 ] ;
int numrows ;
int i ;
snprintf ( sql , sizeof ( sql ) , " FETCH %d FROM c%u " ,
fsstate - > fetch_size , fsstate - > cursor_number ) ;
if ( fsstate - > async_capable )
{
Assert ( fsstate - > conn_state - > pendingAreq ) ;
res = pgfdw_exec_query ( conn , sql ) ;
/* On error, report the original query, not the FETCH. */
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
pgfdw_report_error ( ERROR , res , conn , false , fsstate - > query ) ;
/*
* The query was already sent by an earlier call to
* fetch_more_data_begin . So now we just fetch the result .
*/
res = pgfdw_get_result ( conn , fsstate - > query ) ;
/* On error, report the original query, not the FETCH. */
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
pgfdw_report_error ( ERROR , res , conn , false , fsstate - > query ) ;
/* Reset per-connection state */
fsstate - > conn_state - > pendingAreq = NULL ;
}
else
{
char sql [ 64 ] ;
/* This is a regular synchronous fetch. */
snprintf ( sql , sizeof ( sql ) , " FETCH %d FROM c%u " ,
fsstate - > fetch_size , fsstate - > cursor_number ) ;
res = pgfdw_exec_query ( conn , sql , fsstate - > conn_state ) ;
/* On error, report the original query, not the FETCH. */
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
pgfdw_report_error ( ERROR , res , conn , false , fsstate - > query ) ;
}
/* Convert the data into HeapTuples */
numrows = PQntuples ( res ) ;
@ -3634,7 +3690,8 @@ reset_transmission_modes(int nestlevel)
* Utility routine to close a cursor .
*/
static void
close_cursor ( PGconn * conn , unsigned int cursor_number )
close_cursor ( PGconn * conn , unsigned int cursor_number ,
PgFdwConnState * conn_state )
{
char sql [ 64 ] ;
PGresult * res ;
@ -3645,7 +3702,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
* We don ' t use a PG_TRY block here , so be careful not to throw error
* without releasing the PGresult .
*/
res = pgfdw_exec_query ( conn , sql ) ;
res = pgfdw_exec_query ( conn , sql , conn_state ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , conn , true , sql ) ;
PQclear ( res ) ;
@ -3694,7 +3751,7 @@ create_foreign_modify(EState *estate,
user = GetUserMapping ( userid , table - > serverid ) ;
/* Open connection; report that we'll create a prepared statement. */
fmstate - > conn = GetConnection ( user , true ) ;
fmstate - > conn = GetConnection ( user , true , & fmstate - > conn_state ) ;
fmstate - > p_name = NULL ; /* prepared statement not made yet */
/* Set up remote query information. */
@ -3793,6 +3850,10 @@ execute_foreign_modify(EState *estate,
operation = = CMD_UPDATE | |
operation = = CMD_DELETE ) ;
/* First, process a pending asynchronous request, if any. */
if ( fmstate - > conn_state - > pendingAreq )
process_pending_request ( fmstate - > conn_state - > pendingAreq ) ;
/*
* If the existing query was deparsed and prepared for a different number
* of rows , rebuild it for the proper number .
@ -3894,6 +3955,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
char * p_name ;
PGresult * res ;
/*
* The caller would already have processed a pending asynchronous request
* if any , so no need to do it here .
*/
/* Construct name we'll use for the prepared statement. */
snprintf ( prep_name , sizeof ( prep_name ) , " pgsql_fdw_prep_%u " ,
GetPrepStmtNumber ( fmstate - > conn ) ) ;
@ -4079,7 +4145,7 @@ deallocate_query(PgFdwModifyState *fmstate)
* We don ' t use a PG_TRY block here , so be careful not to throw error
* without releasing the PGresult .
*/
res = pgfdw_exec_query ( fmstate - > conn , sql ) ;
res = pgfdw_exec_query ( fmstate - > conn , sql , fmstate - > conn_state ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , fmstate - > conn , true , sql ) ;
PQclear ( res ) ;
@ -4227,6 +4293,10 @@ execute_dml_stmt(ForeignScanState *node)
int numParams = dmstate - > numParams ;
const char * * values = dmstate - > param_values ;
/* First, process a pending asynchronous request, if any. */
if ( dmstate - > conn_state - > pendingAreq )
process_pending_request ( dmstate - > conn_state - > pendingAreq ) ;
/*
* Construct array of query parameter values in text format .
*/
@ -4628,7 +4698,7 @@ postgresAnalyzeForeignTable(Relation relation,
*/
table = GetForeignTable ( RelationGetRelid ( relation ) ) ;
user = GetUserMapping ( relation - > rd_rel - > relowner , table - > serverid ) ;
conn = GetConnection ( user , false ) ;
conn = GetConnection ( user , false , NULL ) ;
/*
* Construct command to get page count for relation .
@ -4639,7 +4709,7 @@ postgresAnalyzeForeignTable(Relation relation,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY ( ) ;
{
res = pgfdw_exec_query ( conn , sql . data ) ;
res = pgfdw_exec_query ( conn , sql . data , NULL ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
pgfdw_report_error ( ERROR , res , conn , false , sql . data ) ;
@ -4714,7 +4784,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
table = GetForeignTable ( RelationGetRelid ( relation ) ) ;
server = GetForeignServer ( table - > serverid ) ;
user = GetUserMapping ( relation - > rd_rel - > relowner , table - > serverid ) ;
conn = GetConnection ( user , false ) ;
conn = GetConnection ( user , false , NULL ) ;
/*
* Construct cursor that retrieves whole rows from remote .
@ -4731,7 +4801,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
int fetch_size ;
ListCell * lc ;
res = pgfdw_exec_query ( conn , sql . data ) ;
res = pgfdw_exec_query ( conn , sql . data , NULL ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , conn , false , sql . data ) ;
PQclear ( res ) ;
@ -4783,7 +4853,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
*/
/* Fetch some rows */
res = pgfdw_exec_query ( conn , fetch_sql ) ;
res = pgfdw_exec_query ( conn , fetch_sql , NULL ) ;
/* On error, report the original query, not the FETCH. */
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
pgfdw_report_error ( ERROR , res , conn , false , sql . data ) ;
@ -4802,7 +4872,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
}
/* Close the cursor, just to be tidy. */
close_cursor ( conn , cursor_number ) ;
close_cursor ( conn , cursor_number , NULL ) ;
}
PG_CATCH ( ) ;
{
@ -4942,7 +5012,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
*/
server = GetForeignServer ( serverOid ) ;
mapping = GetUserMapping ( GetUserId ( ) , server - > serverid ) ;
conn = GetConnection ( mapping , false ) ;
conn = GetConnection ( mapping , false , NULL ) ;
/* Don't attempt to import collation if remote server hasn't got it */
if ( PQserverVersion ( conn ) < 90100 )
@ -4958,7 +5028,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString ( & buf , " SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = " ) ;
deparseStringLiteral ( & buf , stmt - > remote_schema ) ;
res = pgfdw_exec_query ( conn , buf . data ) ;
res = pgfdw_exec_query ( conn , buf . data , NULL ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
pgfdw_report_error ( ERROR , res , conn , false , buf . data ) ;
@ -5070,7 +5140,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString ( & buf , " ORDER BY c.relname, a.attnum " ) ;
/* Fetch the data */
res = pgfdw_exec_query ( conn , buf . data ) ;
res = pgfdw_exec_query ( conn , buf . data , NULL ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
pgfdw_report_error ( ERROR , res , conn , false , buf . data ) ;
@ -5530,6 +5600,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
ExtractExtensionList ( defGetString ( def ) , false ) ;
else if ( strcmp ( def - > defname , " fetch_size " ) = = 0 )
fpinfo - > fetch_size = strtol ( defGetString ( def ) , NULL , 10 ) ;
else if ( strcmp ( def - > defname , " async_capable " ) = = 0 )
fpinfo - > async_capable = defGetBoolean ( def ) ;
}
}
@ -5551,6 +5623,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo)
fpinfo - > use_remote_estimate = defGetBoolean ( def ) ;
else if ( strcmp ( def - > defname , " fetch_size " ) = = 0 )
fpinfo - > fetch_size = strtol ( defGetString ( def ) , NULL , 10 ) ;
else if ( strcmp ( def - > defname , " async_capable " ) = = 0 )
fpinfo - > async_capable = defGetBoolean ( def ) ;
}
}
@ -5585,6 +5659,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
fpinfo - > shippable_extensions = fpinfo_o - > shippable_extensions ;
fpinfo - > use_remote_estimate = fpinfo_o - > use_remote_estimate ;
fpinfo - > fetch_size = fpinfo_o - > fetch_size ;
fpinfo - > async_capable = fpinfo_o - > async_capable ;
/* Merge the table level options from either side of the join. */
if ( fpinfo_i )
@ -5606,6 +5681,13 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
* relation sizes .
*/
fpinfo - > fetch_size = Max ( fpinfo_o - > fetch_size , fpinfo_i - > fetch_size ) ;
/*
* We ' ll prefer to consider this join async - capable if any table from
* either side of the join is considered async - capable .
*/
fpinfo - > async_capable = fpinfo_o - > async_capable | |
fpinfo_i - > async_capable ;
}
}
@ -6489,6 +6571,236 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
add_path ( final_rel , ( Path * ) final_path ) ;
}
/*
* postgresIsForeignPathAsyncCapable
* Check whether a given ForeignPath node is async - capable .
*/
static bool
postgresIsForeignPathAsyncCapable ( ForeignPath * path )
{
RelOptInfo * rel = ( ( Path * ) path ) - > parent ;
PgFdwRelationInfo * fpinfo = ( PgFdwRelationInfo * ) rel - > fdw_private ;
return fpinfo - > async_capable ;
}
/*
* postgresForeignAsyncRequest
* Asynchronously request next tuple from a foreign PostgreSQL table .
*/
static void
postgresForeignAsyncRequest ( AsyncRequest * areq )
{
produce_tuple_asynchronously ( areq , true ) ;
}
/*
* postgresForeignAsyncConfigureWait
* Configure a file descriptor event for which we wish to wait .
*/
static void
postgresForeignAsyncConfigureWait ( AsyncRequest * areq )
{
ForeignScanState * node = ( ForeignScanState * ) areq - > requestee ;
PgFdwScanState * fsstate = ( PgFdwScanState * ) node - > fdw_state ;
AsyncRequest * pendingAreq = fsstate - > conn_state - > pendingAreq ;
AppendState * requestor = ( AppendState * ) areq - > requestor ;
WaitEventSet * set = requestor - > as_eventset ;
/* This should not be called unless callback_pending */
Assert ( areq - > callback_pending ) ;
/* The core code would have registered postmaster death event */
Assert ( GetNumRegisteredWaitEvents ( set ) > = 1 ) ;
/* Begin an asynchronous data fetch if not already done */
if ( ! pendingAreq )
fetch_more_data_begin ( areq ) ;
else if ( pendingAreq - > requestor ! = areq - > requestor )
{
/*
* This is the case when the in - process request was made by another
* Append . Note that it might be useless to process the request ,
* because the query might not need tuples from that Append anymore .
* Skip the given request if there are any configured events other
* than the postmaster death event ; otherwise process the request ,
* then begin a fetch to configure the event below , because otherwise
* we might end up with no configured events other than the postmaster
* death event .
*/
if ( GetNumRegisteredWaitEvents ( set ) > 1 )
return ;
process_pending_request ( pendingAreq ) ;
fetch_more_data_begin ( areq ) ;
}
else if ( pendingAreq - > requestee ! = areq - > requestee )
{
/*
* This is the case when the in - process request was made by the same
* parent but for a different child . Since we configure only the
* event for the request made for that child , skip the given request .
*/
return ;
}
else
Assert ( pendingAreq = = areq ) ;
AddWaitEventToSet ( set , WL_SOCKET_READABLE , PQsocket ( fsstate - > conn ) ,
NULL , areq ) ;
}
/*
* postgresForeignAsyncNotify
* Fetch some more tuples from a file descriptor that becomes ready ,
* requesting next tuple .
*/
static void
postgresForeignAsyncNotify ( AsyncRequest * areq )
{
ForeignScanState * node = ( ForeignScanState * ) areq - > requestee ;
PgFdwScanState * fsstate = ( PgFdwScanState * ) node - > fdw_state ;
/* The request should be currently in-process */
Assert ( fsstate - > conn_state - > pendingAreq = = areq ) ;
/* The core code would have initialized the callback_pending flag */
Assert ( ! areq - > callback_pending ) ;
/* On error, report the original query, not the FETCH. */
if ( ! PQconsumeInput ( fsstate - > conn ) )
pgfdw_report_error ( ERROR , NULL , fsstate - > conn , false , fsstate - > query ) ;
fetch_more_data ( node ) ;
produce_tuple_asynchronously ( areq , true ) ;
}
/*
* Asynchronously produce next tuple from a foreign PostgreSQL table .
*/
static void
produce_tuple_asynchronously ( AsyncRequest * areq , bool fetch )
{
ForeignScanState * node = ( ForeignScanState * ) areq - > requestee ;
PgFdwScanState * fsstate = ( PgFdwScanState * ) node - > fdw_state ;
AsyncRequest * pendingAreq = fsstate - > conn_state - > pendingAreq ;
TupleTableSlot * result ;
/* This should not be called if the request is currently in-process */
Assert ( areq ! = pendingAreq ) ;
/* Fetch some more tuples, if we've run out */
if ( fsstate - > next_tuple > = fsstate - > num_tuples )
{
/* No point in another fetch if we already detected EOF, though */
if ( ! fsstate - > eof_reached )
{
/* Mark the request as pending for a callback */
ExecAsyncRequestPending ( areq ) ;
/* Begin another fetch if requested and if no pending request */
if ( fetch & & ! pendingAreq )
fetch_more_data_begin ( areq ) ;
}
else
{
/* There's nothing more to do; just return a NULL pointer */
result = NULL ;
/* Mark the request as complete */
ExecAsyncRequestDone ( areq , result ) ;
}
return ;
}
/* Get a tuple from the ForeignScan node */
result = ExecProcNode ( ( PlanState * ) node ) ;
if ( ! TupIsNull ( result ) )
{
/* Mark the request as complete */
ExecAsyncRequestDone ( areq , result ) ;
return ;
}
Assert ( fsstate - > next_tuple > = fsstate - > num_tuples ) ;
/* Fetch some more tuples, if we've not detected EOF yet */
if ( ! fsstate - > eof_reached )
{
/* Mark the request as pending for a callback */
ExecAsyncRequestPending ( areq ) ;
/* Begin another fetch if requested and if no pending request */
if ( fetch & & ! pendingAreq )
fetch_more_data_begin ( areq ) ;
}
else
{
/* There's nothing more to do; just return a NULL pointer */
result = NULL ;
/* Mark the request as complete */
ExecAsyncRequestDone ( areq , result ) ;
}
}
/*
* Begin an asynchronous data fetch .
*
* Note : fetch_more_data must be called to fetch the result .
*/
static void
fetch_more_data_begin ( AsyncRequest * areq )
{
ForeignScanState * node = ( ForeignScanState * ) areq - > requestee ;
PgFdwScanState * fsstate = ( PgFdwScanState * ) node - > fdw_state ;
char sql [ 64 ] ;
Assert ( ! fsstate - > conn_state - > pendingAreq ) ;
/* Create the cursor synchronously. */
if ( ! fsstate - > cursor_exists )
create_cursor ( node ) ;
/* We will send this query, but not wait for the response. */
snprintf ( sql , sizeof ( sql ) , " FETCH %d FROM c%u " ,
fsstate - > fetch_size , fsstate - > cursor_number ) ;
if ( PQsendQuery ( fsstate - > conn , sql ) < 0 )
pgfdw_report_error ( ERROR , NULL , fsstate - > conn , false , fsstate - > query ) ;
/* Remember that the request is in process */
fsstate - > conn_state - > pendingAreq = areq ;
}
/*
* Process a pending asynchronous request .
*/
void
process_pending_request ( AsyncRequest * areq )
{
ForeignScanState * node = ( ForeignScanState * ) areq - > requestee ;
PgFdwScanState * fsstate = ( PgFdwScanState * ) node - > fdw_state ;
EState * estate = node - > ss . ps . state ;
MemoryContext oldcontext ;
/* The request should be currently in-process */
Assert ( fsstate - > conn_state - > pendingAreq = = areq ) ;
oldcontext = MemoryContextSwitchTo ( estate - > es_query_cxt ) ;
/* The request would have been pending for a callback */
Assert ( areq - > callback_pending ) ;
/* Unlike AsyncNotify, we unset callback_pending ourselves */
areq - > callback_pending = false ;
fetch_more_data ( node ) ;
/* We need to send a new query afterwards; don't fetch */
produce_tuple_asynchronously ( areq , false ) ;
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
ExecAsyncResponse ( areq ) ;
MemoryContextSwitchTo ( oldcontext ) ;
}
/*
* Create a tuple from the specified row of the PGresult .
*