@ -84,9 +84,10 @@ typedef struct PgFdwRelationInfo
* Indexes of FDW - private information stored in fdw_private lists .
* Indexes of FDW - private information stored in fdw_private lists .
*
*
* We store various information in ForeignScan . fdw_private to pass it from
* We store various information in ForeignScan . fdw_private to pass it from
* planner to executor . Currently there is just :
* planner to executor . Currently we store :
*
*
* 1 ) SELECT statement text to be sent to the remote server
* 1 ) SELECT statement text to be sent to the remote server
* 2 ) Integer list of attribute numbers retrieved by the SELECT
*
*
* These items are indexed with the enum FdwScanPrivateIndex , so an item
* These items are indexed with the enum FdwScanPrivateIndex , so an item
* can be fetched with list_nth ( ) . For example , to get the SELECT statement :
* can be fetched with list_nth ( ) . For example , to get the SELECT statement :
@ -95,7 +96,9 @@ typedef struct PgFdwRelationInfo
enum FdwScanPrivateIndex
enum FdwScanPrivateIndex
{
{
/* SQL statement to execute remotely (as a String node) */
/* SQL statement to execute remotely (as a String node) */
FdwScanPrivateSelectSql
FdwScanPrivateSelectSql ,
/* Integer list of attribute numbers retrieved by the SELECT */
FdwScanPrivateRetrievedAttrs
} ;
} ;
/*
/*
@ -106,6 +109,7 @@ enum FdwScanPrivateIndex
* 2 ) Integer list of target attribute numbers for INSERT / UPDATE
* 2 ) Integer list of target attribute numbers for INSERT / UPDATE
* ( NIL for a DELETE )
* ( NIL for a DELETE )
* 3 ) Boolean flag showing if there ' s a RETURNING clause
* 3 ) Boolean flag showing if there ' s a RETURNING clause
* 4 ) Integer list of attribute numbers retrieved by RETURNING , if any
*/
*/
enum FdwModifyPrivateIndex
enum FdwModifyPrivateIndex
{
{
@ -114,7 +118,9 @@ enum FdwModifyPrivateIndex
/* Integer list of target attribute numbers for INSERT/UPDATE */
/* Integer list of target attribute numbers for INSERT/UPDATE */
FdwModifyPrivateTargetAttnums ,
FdwModifyPrivateTargetAttnums ,
/* has-returning flag (as an integer Value node) */
/* has-returning flag (as an integer Value node) */
FdwModifyPrivateHasReturning
FdwModifyPrivateHasReturning ,
/* Integer list of attribute numbers retrieved by RETURNING */
FdwModifyPrivateRetrievedAttrs
} ;
} ;
/*
/*
@ -125,7 +131,9 @@ typedef struct PgFdwScanState
Relation rel ; /* relcache entry for the foreign table */
Relation rel ; /* relcache entry for the foreign table */
AttInMetadata * attinmeta ; /* attribute datatype conversion metadata */
AttInMetadata * attinmeta ; /* attribute datatype conversion metadata */
List * fdw_private ; /* FDW-private information from planner */
/* extracted fdw_private data */
char * query ; /* text of SELECT command */
List * retrieved_attrs ; /* list of retrieved attribute numbers */
/* for remote query execution */
/* for remote query execution */
PGconn * conn ; /* connection for the scan */
PGconn * conn ; /* connection for the scan */
@ -166,6 +174,7 @@ typedef struct PgFdwModifyState
char * query ; /* text of INSERT/UPDATE/DELETE command */
char * query ; /* text of INSERT/UPDATE/DELETE command */
List * target_attrs ; /* list of target attribute numbers */
List * target_attrs ; /* list of target attribute numbers */
bool has_returning ; /* is there a RETURNING clause? */
bool has_returning ; /* is there a RETURNING clause? */
List * retrieved_attrs ; /* attr numbers retrieved by RETURNING */
/* info about parameters for prepared statement */
/* info about parameters for prepared statement */
AttrNumber ctidAttno ; /* attnum of input resjunk ctid column */
AttrNumber ctidAttno ; /* attnum of input resjunk ctid column */
@ -183,6 +192,7 @@ typedef struct PgFdwAnalyzeState
{
{
Relation rel ; /* relcache entry for the foreign table */
Relation rel ; /* relcache entry for the foreign table */
AttInMetadata * attinmeta ; /* attribute datatype conversion metadata */
AttInMetadata * attinmeta ; /* attribute datatype conversion metadata */
List * retrieved_attrs ; /* attr numbers retrieved by query */
/* collected sample rows */
/* collected sample rows */
HeapTuple * rows ; /* array of size targrows */
HeapTuple * rows ; /* array of size targrows */
@ -314,6 +324,7 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
int row ,
int row ,
Relation rel ,
Relation rel ,
AttInMetadata * attinmeta ,
AttInMetadata * attinmeta ,
List * retrieved_attrs ,
MemoryContext temp_context ) ;
MemoryContext temp_context ) ;
static void conversion_error_callback ( void * arg ) ;
static void conversion_error_callback ( void * arg ) ;
@ -728,6 +739,7 @@ postgresGetForeignPlan(PlannerInfo *root,
List * remote_conds = NIL ;
List * remote_conds = NIL ;
List * local_exprs = NIL ;
List * local_exprs = NIL ;
List * params_list = NIL ;
List * params_list = NIL ;
List * retrieved_attrs ;
StringInfoData sql ;
StringInfoData sql ;
ListCell * lc ;
ListCell * lc ;
@ -777,7 +789,8 @@ postgresGetForeignPlan(PlannerInfo *root,
* expressions to be sent as parameters .
* expressions to be sent as parameters .
*/
*/
initStringInfo ( & sql ) ;
initStringInfo ( & sql ) ;
deparseSelectSql ( & sql , root , baserel , fpinfo - > attrs_used ) ;
deparseSelectSql ( & sql , root , baserel , fpinfo - > attrs_used ,
& retrieved_attrs ) ;
if ( remote_conds )
if ( remote_conds )
appendWhereClause ( & sql , root , baserel , remote_conds ,
appendWhereClause ( & sql , root , baserel , remote_conds ,
true , & params_list ) ;
true , & params_list ) ;
@ -829,7 +842,8 @@ postgresGetForeignPlan(PlannerInfo *root,
* Build the fdw_private list that will be available to the executor .
* Build the fdw_private list that will be available to the executor .
* Items in the list must match enum FdwScanPrivateIndex , above .
* Items in the list must match enum FdwScanPrivateIndex , above .
*/
*/
fdw_private = list_make1 ( makeString ( sql . data ) ) ;
fdw_private = list_make2 ( makeString ( sql . data ) ,
retrieved_attrs ) ;
/*
/*
* Create the ForeignScan node from target list , local filtering
* Create the ForeignScan node from target list , local filtering
@ -901,7 +915,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
fsstate - > cursor_exists = false ;
fsstate - > cursor_exists = false ;
/* Get private info created by planner functions. */
/* Get private info created by planner functions. */
fsstate - > fdw_private = fsplan - > fdw_private ;
fsstate - > query = strVal ( list_nth ( fsplan - > fdw_private ,
FdwScanPrivateSelectSql ) ) ;
fsstate - > retrieved_attrs = ( List * ) list_nth ( fsplan - > fdw_private ,
FdwScanPrivateRetrievedAttrs ) ;
/* Create contexts for batches of tuples and per-tuple temp workspace. */
/* Create contexts for batches of tuples and per-tuple temp workspace. */
fsstate - > batch_cxt = AllocSetContextCreate ( estate - > es_query_cxt ,
fsstate - > batch_cxt = AllocSetContextCreate ( estate - > es_query_cxt ,
@ -915,7 +932,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
ALLOCSET_SMALL_INITSIZE ,
ALLOCSET_SMALL_INITSIZE ,
ALLOCSET_SMALL_MAXSIZE ) ;
ALLOCSET_SMALL_MAXSIZE ) ;
/* Get info we'll need for data conversion. */
/* Get info we'll need for input data conversion. */
fsstate - > attinmeta = TupleDescGetAttInMetadata ( RelationGetDescr ( fsstate - > rel ) ) ;
fsstate - > attinmeta = TupleDescGetAttInMetadata ( RelationGetDescr ( fsstate - > rel ) ) ;
/* Prepare for output conversion of parameters used in remote query. */
/* Prepare for output conversion of parameters used in remote query. */
@ -1138,6 +1155,7 @@ postgresPlanForeignModify(PlannerInfo *root,
StringInfoData sql ;
StringInfoData sql ;
List * targetAttrs = NIL ;
List * targetAttrs = NIL ;
List * returningList = NIL ;
List * returningList = NIL ;
List * retrieved_attrs = NIL ;
initStringInfo ( & sql ) ;
initStringInfo ( & sql ) ;
@ -1194,15 +1212,18 @@ postgresPlanForeignModify(PlannerInfo *root,
{
{
case CMD_INSERT :
case CMD_INSERT :
deparseInsertSql ( & sql , root , resultRelation , rel ,
deparseInsertSql ( & sql , root , resultRelation , rel ,
targetAttrs , returningList ) ;
targetAttrs , returningList ,
& retrieved_attrs ) ;
break ;
break ;
case CMD_UPDATE :
case CMD_UPDATE :
deparseUpdateSql ( & sql , root , resultRelation , rel ,
deparseUpdateSql ( & sql , root , resultRelation , rel ,
targetAttrs , returningList ) ;
targetAttrs , returningList ,
& retrieved_attrs ) ;
break ;
break ;
case CMD_DELETE :
case CMD_DELETE :
deparseDeleteSql ( & sql , root , resultRelation , rel ,
deparseDeleteSql ( & sql , root , resultRelation , rel ,
returningList ) ;
returningList ,
& retrieved_attrs ) ;
break ;
break ;
default :
default :
elog ( ERROR , " unexpected operation: %d " , ( int ) operation ) ;
elog ( ERROR , " unexpected operation: %d " , ( int ) operation ) ;
@ -1215,9 +1236,10 @@ postgresPlanForeignModify(PlannerInfo *root,
* Build the fdw_private list that will be available to the executor .
* Build the fdw_private list that will be available to the executor .
* Items in the list must match enum FdwModifyPrivateIndex , above .
* Items in the list must match enum FdwModifyPrivateIndex , above .
*/
*/
return list_make3 ( makeString ( sql . data ) ,
return list_make4 ( makeString ( sql . data ) ,
targetAttrs ,
targetAttrs ,
makeInteger ( ( returningList ! = NIL ) ) ) ;
makeInteger ( ( returningList ! = NIL ) ) ,
retrieved_attrs ) ;
}
}
/*
/*
@ -1279,6 +1301,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
FdwModifyPrivateTargetAttnums ) ;
FdwModifyPrivateTargetAttnums ) ;
fmstate - > has_returning = intVal ( list_nth ( fdw_private ,
fmstate - > has_returning = intVal ( list_nth ( fdw_private ,
FdwModifyPrivateHasReturning ) ) ;
FdwModifyPrivateHasReturning ) ) ;
fmstate - > retrieved_attrs = ( List * ) list_nth ( fdw_private ,
FdwModifyPrivateRetrievedAttrs ) ;
/* Create context for per-tuple temp workspace. */
/* Create context for per-tuple temp workspace. */
fmstate - > temp_cxt = AllocSetContextCreate ( estate - > es_query_cxt ,
fmstate - > temp_cxt = AllocSetContextCreate ( estate - > es_query_cxt ,
@ -1641,6 +1665,7 @@ estimate_path_cost_size(PlannerInfo *root,
if ( fpinfo - > use_remote_estimate )
if ( fpinfo - > use_remote_estimate )
{
{
StringInfoData sql ;
StringInfoData sql ;
List * retrieved_attrs ;
PGconn * conn ;
PGconn * conn ;
/*
/*
@ -1650,7 +1675,8 @@ estimate_path_cost_size(PlannerInfo *root,
*/
*/
initStringInfo ( & sql ) ;
initStringInfo ( & sql ) ;
appendStringInfoString ( & sql , " EXPLAIN " ) ;
appendStringInfoString ( & sql , " EXPLAIN " ) ;
deparseSelectSql ( & sql , root , baserel , fpinfo - > attrs_used ) ;
deparseSelectSql ( & sql , root , baserel , fpinfo - > attrs_used ,
& retrieved_attrs ) ;
if ( fpinfo - > remote_conds )
if ( fpinfo - > remote_conds )
appendWhereClause ( & sql , root , baserel , fpinfo - > remote_conds ,
appendWhereClause ( & sql , root , baserel , fpinfo - > remote_conds ,
true , NULL ) ;
true , NULL ) ;
@ -1819,7 +1845,6 @@ create_cursor(ForeignScanState *node)
int numParams = fsstate - > numParams ;
int numParams = fsstate - > numParams ;
const char * * values = fsstate - > param_values ;
const char * * values = fsstate - > param_values ;
PGconn * conn = fsstate - > conn ;
PGconn * conn = fsstate - > conn ;
char * sql ;
StringInfoData buf ;
StringInfoData buf ;
PGresult * res ;
PGresult * res ;
@ -1867,10 +1892,9 @@ create_cursor(ForeignScanState *node)
}
}
/* Construct the DECLARE CURSOR command */
/* Construct the DECLARE CURSOR command */
sql = strVal ( list_nth ( fsstate - > fdw_private , FdwScanPrivateSelectSql ) ) ;
initStringInfo ( & buf ) ;
initStringInfo ( & buf ) ;
appendStringInfo ( & buf , " DECLARE c%u CURSOR FOR \n %s " ,
appendStringInfo ( & buf , " DECLARE c%u CURSOR FOR \n %s " ,
fsstate - > cursor_number , sql ) ;
fsstate - > cursor_number , fsstate - > query ) ;
/*
/*
* Notice that we pass NULL for paramTypes , thus forcing the remote server
* Notice that we pass NULL for paramTypes , thus forcing the remote server
@ -1885,7 +1909,7 @@ create_cursor(ForeignScanState *node)
res = PQexecParams ( conn , buf . data , numParams , NULL , values ,
res = PQexecParams ( conn , buf . data , numParams , NULL , values ,
NULL , NULL , 0 ) ;
NULL , NULL , 0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , true , sql ) ;
pgfdw_report_error ( ERROR , res , true , fsstate - > query ) ;
PQclear ( res ) ;
PQclear ( res ) ;
/* Mark the cursor as created, and show no tuples have been retrieved */
/* Mark the cursor as created, and show no tuples have been retrieved */
@ -1936,9 +1960,7 @@ fetch_more_data(ForeignScanState *node)
res = PQexec ( conn , sql ) ;
res = PQexec ( conn , sql ) ;
/* On error, report the original query, not the FETCH. */
/* On error, report the original query, not the FETCH. */
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
pgfdw_report_error ( ERROR , res , false ,
pgfdw_report_error ( ERROR , res , false , fsstate - > query ) ;
strVal ( list_nth ( fsstate - > fdw_private ,
FdwScanPrivateSelectSql ) ) ) ;
/* Convert the data into HeapTuples */
/* Convert the data into HeapTuples */
numrows = PQntuples ( res ) ;
numrows = PQntuples ( res ) ;
@ -1952,6 +1974,7 @@ fetch_more_data(ForeignScanState *node)
make_tuple_from_result_row ( res , i ,
make_tuple_from_result_row ( res , i ,
fsstate - > rel ,
fsstate - > rel ,
fsstate - > attinmeta ,
fsstate - > attinmeta ,
fsstate - > retrieved_attrs ,
fsstate - > temp_cxt ) ;
fsstate - > temp_cxt ) ;
}
}
@ -2170,6 +2193,7 @@ store_returning_result(PgFdwModifyState *fmstate,
newtup = make_tuple_from_result_row ( res , 0 ,
newtup = make_tuple_from_result_row ( res , 0 ,
fmstate - > rel ,
fmstate - > rel ,
fmstate - > attinmeta ,
fmstate - > attinmeta ,
fmstate - > retrieved_attrs ,
fmstate - > temp_cxt ) ;
fmstate - > temp_cxt ) ;
/* tuple will be deleted when it is cleared from the slot */
/* tuple will be deleted when it is cleared from the slot */
ExecStoreTuple ( newtup , slot , InvalidBuffer , true ) ;
ExecStoreTuple ( newtup , slot , InvalidBuffer , true ) ;
@ -2316,7 +2340,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
cursor_number = GetCursorNumber ( conn ) ;
cursor_number = GetCursorNumber ( conn ) ;
initStringInfo ( & sql ) ;
initStringInfo ( & sql ) ;
appendStringInfo ( & sql , " DECLARE c%u CURSOR FOR " , cursor_number ) ;
appendStringInfo ( & sql , " DECLARE c%u CURSOR FOR " , cursor_number ) ;
deparseAnalyzeSql ( & sql , relation ) ;
deparseAnalyzeSql ( & sql , relation , & astate . retrieved_attrs ) ;
/* In what follows, do not risk leaking any PGresults. */
/* In what follows, do not risk leaking any PGresults. */
PG_TRY ( ) ;
PG_TRY ( ) ;
@ -2461,6 +2485,7 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
astate - > rows [ pos ] = make_tuple_from_result_row ( res , row ,
astate - > rows [ pos ] = make_tuple_from_result_row ( res , row ,
astate - > rel ,
astate - > rel ,
astate - > attinmeta ,
astate - > attinmeta ,
astate - > retrieved_attrs ,
astate - > temp_cxt ) ;
astate - > temp_cxt ) ;
MemoryContextSwitchTo ( oldcontext ) ;
MemoryContextSwitchTo ( oldcontext ) ;
@ -2471,26 +2496,27 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
* Create a tuple from the specified row of the PGresult .
* Create a tuple from the specified row of the PGresult .
*
*
* rel is the local representation of the foreign table , attinmeta is
* rel is the local representation of the foreign table , attinmeta is
* conversion data for the rel ' s tupdesc , and temp_context is a working
* conversion data for the rel ' s tupdesc , and retrieved_attrs is an
* context that can be reset after each tuple .
* integer list of the table column numbers present in the PGresult .
* temp_context is a working context that can be reset after each tuple .
*/
*/
static HeapTuple
static HeapTuple
make_tuple_from_result_row ( PGresult * res ,
make_tuple_from_result_row ( PGresult * res ,
int row ,
int row ,
Relation rel ,
Relation rel ,
AttInMetadata * attinmeta ,
AttInMetadata * attinmeta ,
List * retrieved_attrs ,
MemoryContext temp_context )
MemoryContext temp_context )
{
{
HeapTuple tuple ;
HeapTuple tuple ;
TupleDesc tupdesc = RelationGetDescr ( rel ) ;
TupleDesc tupdesc = RelationGetDescr ( rel ) ;
Form_pg_attribute * attrs = tupdesc - > attrs ;
Datum * values ;
Datum * values ;
bool * nulls ;
bool * nulls ;
ItemPointer ctid = NULL ;
ItemPointer ctid = NULL ;
ConversionLocation errpos ;
ConversionLocation errpos ;
ErrorContextCallback errcallback ;
ErrorContextCallback errcallback ;
MemoryContext oldcontext ;
MemoryContext oldcontext ;
int i ;
ListCell * lc ;
int j ;
int j ;
Assert ( row < PQntuples ( res ) ) ;
Assert ( row < PQntuples ( res ) ) ;
@ -2502,8 +2528,10 @@ make_tuple_from_result_row(PGresult *res,
*/
*/
oldcontext = MemoryContextSwitchTo ( temp_context ) ;
oldcontext = MemoryContextSwitchTo ( temp_context ) ;
values = ( Datum * ) palloc ( tupdesc - > natts * sizeof ( Datum ) ) ;
values = ( Datum * ) palloc0 ( tupdesc - > natts * sizeof ( Datum ) ) ;
nulls = ( bool * ) palloc ( tupdesc - > natts * sizeof ( bool ) ) ;
nulls = ( bool * ) palloc ( tupdesc - > natts * sizeof ( bool ) ) ;
/* Initialize to nulls for any columns not present in result */
memset ( nulls , true , tupdesc - > natts * sizeof ( bool ) ) ;
/*
/*
* Set up and install callback to report where conversion error occurs .
* Set up and install callback to report where conversion error occurs .
@ -2517,63 +2545,56 @@ make_tuple_from_result_row(PGresult *res,
/*
/*
* i indexes columns in the relation , j indexes columns in the PGresult .
* i indexes columns in the relation , j indexes columns in the PGresult .
* We assume dropped columns are not represented in the PGresult .
*/
*/
for ( i = 0 , j = 0 ; i < tupdesc - > natts ; i + + )
j = 0 ;
foreach ( lc , retrieved_attrs )
{
{
int i = lfirst_int ( lc ) ;
char * valstr ;
char * valstr ;
/* skip dropped columns. */
/* fetch next column's textual value */
if ( attrs [ i ] - > attisdropped )
{
values [ i ] = ( Datum ) 0 ;
nulls [ i ] = true ;
continue ;
}
/* convert value to internal representation */
if ( PQgetisnull ( res , row , j ) )
if ( PQgetisnull ( res , row , j ) )
{
valstr = NULL ;
valstr = NULL ;
nulls [ i ] = true ;
}
else
else
{
valstr = PQgetvalue ( res , row , j ) ;
valstr = PQgetvalue ( res , row , j ) ;
nulls [ i ] = false ;
}
/* Note: apply the input function even to nulls, to support domains */
errpos . cur_attno = i + 1 ;
values [ i ] = InputFunctionCall ( & attinmeta - > attinfuncs [ i ] ,
valstr ,
attinmeta - > attioparams [ i ] ,
attinmeta - > atttypmods [ i ] ) ;
errpos . cur_attno = 0 ;
j + + ;
/* convert value to internal representation */
}
if ( i > 0 )
{
/* ordinary column */
Assert ( i < = tupdesc - > natts ) ;
nulls [ i - 1 ] = ( valstr = = NULL ) ;
/* Apply the input function even to nulls, to support domains */
errpos . cur_attno = i ;
values [ i - 1 ] = InputFunctionCall ( & attinmeta - > attinfuncs [ i - 1 ] ,
valstr ,
attinmeta - > attioparams [ i - 1 ] ,
attinmeta - > atttypmods [ i - 1 ] ) ;
errpos . cur_attno = 0 ;
}
else if ( i = = SelfItemPointerAttributeNumber )
{
/* ctid --- note we ignore any other system column in result */
if ( valstr ! = NULL )
{
Datum datum ;
/*
datum = DirectFunctionCall1 ( tidin , CStringGetDatum ( valstr ) ) ;
* Convert ctid if present . XXX we could stand to have a cleaner way of
ctid = ( ItemPointer ) DatumGetPointer ( datum ) ;
* detecting whether ctid is included in the result .
}
*/
}
if ( j < PQnfields ( res ) )
{
char * valstr ;
Datum datum ;
valstr = PQgetvalue ( res , row , j ) ;
datum = DirectFunctionCall1 ( tidin , CStringGetDatum ( valstr ) ) ;
ctid = ( ItemPointer ) DatumGetPointer ( datum ) ;
j + + ;
j + + ;
}
}
/* Uninstall error context callback. */
/* Uninstall error context callback. */
error_context_stack = errcallback . previous ;
error_context_stack = errcallback . previous ;
/* check result and tuple descriptor have the same number of columns */
/*
if ( j ! = PQnfields ( res ) )
* Check we got the expected number of columns . Note : j = = 0 and
* PQnfields = = 1 is expected , since deparse emits a NULL if no columns .
*/
if ( j > 0 & & j ! = PQnfields ( res ) )
elog ( ERROR , " remote query result does not match the foreign table " ) ;
elog ( ERROR , " remote query result does not match the foreign table " ) ;
/*
/*