@ -87,8 +87,10 @@ enum FdwScanPrivateIndex
* 1 ) INSERT / UPDATE / DELETE statement text to be sent to the remote server
* 2 ) Integer list of target attribute numbers for INSERT / UPDATE
* ( NIL for a DELETE )
* 3 ) Boolean flag showing if the remote query has a RETURNING clause
* 4 ) Integer list of attribute numbers retrieved by RETURNING , if any
* 3 ) Length till the end of VALUES clause for INSERT
* ( - 1 for a DELETE / UPDATE )
* 4 ) Boolean flag showing if the remote query has a RETURNING clause
* 5 ) Integer list of attribute numbers retrieved by RETURNING , if any
*/
enum FdwModifyPrivateIndex
{
@ -96,6 +98,8 @@ enum FdwModifyPrivateIndex
FdwModifyPrivateUpdateSql ,
/* Integer list of target attribute numbers for INSERT/UPDATE */
FdwModifyPrivateTargetAttnums ,
/* Length till the end of VALUES clause (as an integer Value node) */
FdwModifyPrivateLen ,
/* has-returning flag (as an integer Value node) */
FdwModifyPrivateHasReturning ,
/* Integer list of attribute numbers retrieved by RETURNING */
@ -176,7 +180,10 @@ typedef struct PgFdwModifyState
/* extracted fdw_private data */
char * query ; /* text of INSERT/UPDATE/DELETE command */
char * orig_query ; /* original text of INSERT command */
List * target_attrs ; /* list of target attribute numbers */
int values_end ; /* length up to the end of VALUES */
int batch_size ; /* value of FDW option "batch_size" */
bool has_returning ; /* is there a RETURNING clause? */
List * retrieved_attrs ; /* attr numbers retrieved by RETURNING */
@ -185,6 +192,9 @@ typedef struct PgFdwModifyState
int p_nums ; /* number of parameters to transmit */
FmgrInfo * p_flinfo ; /* output conversion functions for them */
/* batch operation stuff */
int num_slots ; /* number of slots to insert */
/* working memory context */
MemoryContext temp_cxt ; /* context for per-tuple temporary data */
@ -343,6 +353,12 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate,
ResultRelInfo * resultRelInfo ,
TupleTableSlot * slot ,
TupleTableSlot * planSlot ) ;
static TupleTableSlot * * postgresExecForeignBatchInsert ( EState * estate ,
ResultRelInfo * resultRelInfo ,
TupleTableSlot * * slots ,
TupleTableSlot * * planSlots ,
int * numSlots ) ;
static int postgresGetForeignModifyBatchSize ( ResultRelInfo * resultRelInfo ) ;
static TupleTableSlot * postgresExecForeignUpdate ( EState * estate ,
ResultRelInfo * resultRelInfo ,
TupleTableSlot * slot ,
@ -429,20 +445,24 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
Plan * subplan ,
char * query ,
List * target_attrs ,
int len ,
bool has_returning ,
List * retrieved_attrs ) ;
static TupleTableSlot * execute_foreign_modify ( EState * estate ,
static TupleTableSlot * * execute_foreign_modify ( EState * estate ,
ResultRelInfo * resultRelInfo ,
CmdType operation ,
TupleTableSlot * slot ,
TupleTableSlot * planSlot ) ;
TupleTableSlot * * slots ,
TupleTableSlot * * planSlots ,
int * numSlots ) ;
static void prepare_foreign_modify ( PgFdwModifyState * fmstate ) ;
static const char * * convert_prep_stmt_params ( PgFdwModifyState * fmstate ,
ItemPointer tupleid ,
TupleTableSlot * slot ) ;
TupleTableSlot * * slots ,
int numSlots ) ;
static void store_returning_result ( PgFdwModifyState * fmstate ,
TupleTableSlot * slot , PGresult * res ) ;
static void finish_foreign_modify ( PgFdwModifyState * fmstate ) ;
static void deallocate_query ( PgFdwModifyState * fmstate ) ;
static List * build_remote_returning ( Index rtindex , Relation rel ,
List * returningList ) ;
static void rebuild_fdw_scan_tlist ( ForeignScan * fscan , List * tlist ) ;
@ -505,6 +525,7 @@ static void apply_table_options(PgFdwRelationInfo *fpinfo);
static void merge_fdw_options ( PgFdwRelationInfo * fpinfo ,
const PgFdwRelationInfo * fpinfo_o ,
const PgFdwRelationInfo * fpinfo_i ) ;
static int get_batch_size_option ( Relation rel ) ;
/*
@ -530,6 +551,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine - > PlanForeignModify = postgresPlanForeignModify ;
routine - > BeginForeignModify = postgresBeginForeignModify ;
routine - > ExecForeignInsert = postgresExecForeignInsert ;
routine - > ExecForeignBatchInsert = postgresExecForeignBatchInsert ;
routine - > GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize ;
routine - > ExecForeignUpdate = postgresExecForeignUpdate ;
routine - > ExecForeignDelete = postgresExecForeignDelete ;
routine - > EndForeignModify = postgresEndForeignModify ;
@ -1665,6 +1688,7 @@ postgresPlanForeignModify(PlannerInfo *root,
List * returningList = NIL ;
List * retrieved_attrs = NIL ;
bool doNothing = false ;
int values_end_len = - 1 ;
initStringInfo ( & sql ) ;
@ -1752,7 +1776,7 @@ postgresPlanForeignModify(PlannerInfo *root,
deparseInsertSql ( & sql , rte , resultRelation , rel ,
targetAttrs , doNothing ,
withCheckOptionList , returningList ,
& retrieved_attrs ) ;
& retrieved_attrs , & values_end_len ) ;
break ;
case CMD_UPDATE :
deparseUpdateSql ( & sql , rte , resultRelation , rel ,
@ -1776,8 +1800,9 @@ postgresPlanForeignModify(PlannerInfo *root,
* Build the fdw_private list that will be available to the executor .
* Items in the list must match enum FdwModifyPrivateIndex , above .
*/
return list_make4 ( makeString ( sql . data ) ,
return list_make5 ( makeString ( sql . data ) ,
targetAttrs ,
makeInteger ( values_end_len ) ,
makeInteger ( ( retrieved_attrs ! = NIL ) ) ,
retrieved_attrs ) ;
}
@ -1797,6 +1822,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
char * query ;
List * target_attrs ;
bool has_returning ;
int values_end_len ;
List * retrieved_attrs ;
RangeTblEntry * rte ;
@ -1812,6 +1838,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
FdwModifyPrivateUpdateSql ) ) ;
target_attrs = ( List * ) list_nth ( fdw_private ,
FdwModifyPrivateTargetAttnums ) ;
values_end_len = intVal ( list_nth ( fdw_private ,
FdwModifyPrivateLen ) ) ;
has_returning = intVal ( list_nth ( fdw_private ,
FdwModifyPrivateHasReturning ) ) ;
retrieved_attrs = ( List * ) list_nth ( fdw_private ,
@ -1829,6 +1857,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
mtstate - > mt_plans [ subplan_index ] - > plan ,
query ,
target_attrs ,
values_end_len ,
has_returning ,
retrieved_attrs ) ;
@ -1846,7 +1875,37 @@ postgresExecForeignInsert(EState *estate,
TupleTableSlot * planSlot )
{
PgFdwModifyState * fmstate = ( PgFdwModifyState * ) resultRelInfo - > ri_FdwState ;
TupleTableSlot * rslot ;
TupleTableSlot * * rslot ;
int numSlots = 1 ;
/*
* If the fmstate has aux_fmstate set , use the aux_fmstate ( see
* postgresBeginForeignInsert ( ) )
*/
if ( fmstate - > aux_fmstate )
resultRelInfo - > ri_FdwState = fmstate - > aux_fmstate ;
rslot = execute_foreign_modify ( estate , resultRelInfo , CMD_INSERT ,
& slot , & planSlot , & numSlots ) ;
/* Revert that change */
if ( fmstate - > aux_fmstate )
resultRelInfo - > ri_FdwState = fmstate ;
return rslot ? * rslot : NULL ;
}
/*
* postgresExecForeignBatchInsert
* Insert multiple rows into a foreign table
*/
static TupleTableSlot * *
postgresExecForeignBatchInsert ( EState * estate ,
ResultRelInfo * resultRelInfo ,
TupleTableSlot * * slots ,
TupleTableSlot * * planSlots ,
int * numSlots )
{
PgFdwModifyState * fmstate = ( PgFdwModifyState * ) resultRelInfo - > ri_FdwState ;
TupleTableSlot * * rslot ;
/*
* If the fmstate has aux_fmstate set , use the aux_fmstate ( see
@ -1855,7 +1914,7 @@ postgresExecForeignInsert(EState *estate,
if ( fmstate - > aux_fmstate )
resultRelInfo - > ri_FdwState = fmstate - > aux_fmstate ;
rslot = execute_foreign_modify ( estate , resultRelInfo , CMD_INSERT ,
slot , planSlot ) ;
slots , planSlots , numSlots ) ;
/* Revert that change */
if ( fmstate - > aux_fmstate )
resultRelInfo - > ri_FdwState = fmstate ;
@ -1863,6 +1922,42 @@ postgresExecForeignInsert(EState *estate,
return rslot ;
}
/*
* postgresGetForeignModifyBatchSize
* Determine the maximum number of tuples that can be inserted in bulk
*
* Returns the batch size specified for server or table . When batching is not
* allowed ( e . g . for tables with AFTER ROW triggers or with RETURNING clause ) ,
* returns 1.
*/
static int
postgresGetForeignModifyBatchSize ( ResultRelInfo * resultRelInfo )
{
int batch_size ;
/* should be called only once */
Assert ( resultRelInfo - > ri_BatchSize = = 0 ) ;
/*
* In EXPLAIN without ANALYZE , ri_fdwstate is NULL , so we have to lookup
* the option directly in server / table options . Otherwise just use the
* value we determined earlier .
*/
if ( resultRelInfo - > ri_FdwState )
batch_size = ( ( PgFdwModifyState * ) resultRelInfo - > ri_FdwState ) - > batch_size ;
else
batch_size = get_batch_size_option ( resultRelInfo - > ri_RelationDesc ) ;
/* Disable batching when we have to use RETURNING. */
if ( resultRelInfo - > ri_projectReturning ! = NULL | |
( resultRelInfo - > ri_TrigDesc & &
resultRelInfo - > ri_TrigDesc - > trig_insert_after_row ) )
return 1 ;
/* Otherwise use the batch size specified for server/table. */
return batch_size ;
}
/*
* postgresExecForeignUpdate
* Update one row in a foreign table
@ -1873,8 +1968,13 @@ postgresExecForeignUpdate(EState *estate,
TupleTableSlot * slot ,
TupleTableSlot * planSlot )
{
return execute_foreign_modify ( estate , resultRelInfo , CMD_UPDATE ,
slot , planSlot ) ;
TupleTableSlot * * rslot ;
int numSlots = 1 ;
rslot = execute_foreign_modify ( estate , resultRelInfo , CMD_UPDATE ,
& slot , & planSlot , & numSlots ) ;
return rslot ? rslot [ 0 ] : NULL ;
}
/*
@ -1887,8 +1987,13 @@ postgresExecForeignDelete(EState *estate,
TupleTableSlot * slot ,
TupleTableSlot * planSlot )
{
return execute_foreign_modify ( estate , resultRelInfo , CMD_DELETE ,
slot , planSlot ) ;
TupleTableSlot * * rslot ;
int numSlots = 1 ;
rslot = execute_foreign_modify ( estate , resultRelInfo , CMD_DELETE ,
& slot , & planSlot , & numSlots ) ;
return rslot ? rslot [ 0 ] : NULL ;
}
/*
@ -1925,6 +2030,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
RangeTblEntry * rte ;
TupleDesc tupdesc = RelationGetDescr ( rel ) ;
int attnum ;
int values_end_len ;
StringInfoData sql ;
List * targetAttrs = NIL ;
List * retrieved_attrs = NIL ;
@ -2001,7 +2107,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
deparseInsertSql ( & sql , rte , resultRelation , rel , targetAttrs , doNothing ,
resultRelInfo - > ri_WithCheckOptions ,
resultRelInfo - > ri_returningList ,
& retrieved_attrs ) ;
& retrieved_attrs , & values_end_len ) ;
/* Construct an execution state. */
fmstate = create_foreign_modify ( mtstate - > ps . state ,
@ -2011,6 +2117,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
NULL ,
sql . data ,
targetAttrs ,
values_end_len ,
retrieved_attrs ! = NIL ,
retrieved_attrs ) ;
@ -2636,6 +2743,13 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
FdwModifyPrivateUpdateSql ) ) ;
ExplainPropertyText ( " Remote SQL " , sql , es ) ;
/*
* For INSERT we should always have batch size > = 1 , but UPDATE
* and DELETE don ' t support batching so don ' t show the property .
*/
if ( rinfo - > ri_BatchSize > 0 )
ExplainPropertyInteger ( " Batch Size " , NULL , rinfo - > ri_BatchSize , es ) ;
}
}
@ -3530,6 +3644,7 @@ create_foreign_modify(EState *estate,
Plan * subplan ,
char * query ,
List * target_attrs ,
int values_end ,
bool has_returning ,
List * retrieved_attrs )
{
@ -3564,7 +3679,10 @@ create_foreign_modify(EState *estate,
/* Set up remote query information. */
fmstate - > query = query ;
if ( operation = = CMD_INSERT )
fmstate - > orig_query = pstrdup ( fmstate - > query ) ;
fmstate - > target_attrs = target_attrs ;
fmstate - > values_end = values_end ;
fmstate - > has_returning = has_returning ;
fmstate - > retrieved_attrs = retrieved_attrs ;
@ -3616,6 +3734,12 @@ create_foreign_modify(EState *estate,
Assert ( fmstate - > p_nums < = n_params ) ;
/* Set batch_size from foreign server/table options. */
if ( operation = = CMD_INSERT )
fmstate - > batch_size = get_batch_size_option ( rel ) ;
fmstate - > num_slots = 1 ;
/* Initialize auxiliary state */
fmstate - > aux_fmstate = NULL ;
@ -3626,26 +3750,48 @@ create_foreign_modify(EState *estate,
* execute_foreign_modify
* Perform foreign - table modification as required , and fetch RETURNING
* result if any . ( This is the shared guts of postgresExecForeignInsert ,
* postgresExecForeignUpdate , and postgresExecForeignDelete . )
* postgresExecForeignBatchInsert , postgresExecForeignUpdate , and
* postgresExecForeignDelete . )
*/
static TupleTableSlot *
static TupleTableSlot * *
execute_foreign_modify ( EState * estate ,
ResultRelInfo * resultRelInfo ,
CmdType operation ,
TupleTableSlot * slot ,
TupleTableSlot * planSlot )
TupleTableSlot * * slots ,
TupleTableSlot * * planSlots ,
int * numSlots )
{
PgFdwModifyState * fmstate = ( PgFdwModifyState * ) resultRelInfo - > ri_FdwState ;
ItemPointer ctid = NULL ;
const char * * p_values ;
PGresult * res ;
int n_rows ;
StringInfoData sql ;
/* The operation should be INSERT, UPDATE, or DELETE */
Assert ( operation = = CMD_INSERT | |
operation = = CMD_UPDATE | |
operation = = CMD_DELETE ) ;
/*
* If the existing query was deparsed and prepared for a different number
* of rows , rebuild it for the proper number .
*/
if ( operation = = CMD_INSERT & & fmstate - > num_slots ! = * numSlots )
{
/* Destroy the prepared statement created previously */
if ( fmstate - > p_name )
deallocate_query ( fmstate ) ;
/* Build INSERT string with numSlots records in its VALUES clause. */
initStringInfo ( & sql ) ;
rebuildInsertSql ( & sql , fmstate - > orig_query , fmstate - > values_end ,
fmstate - > p_nums , * numSlots - 1 ) ;
pfree ( fmstate - > query ) ;
fmstate - > query = sql . data ;
fmstate - > num_slots = * numSlots ;
}
/* Set up the prepared statement on the remote server, if we didn't yet */
if ( ! fmstate - > p_name )
prepare_foreign_modify ( fmstate ) ;
@ -3658,7 +3804,7 @@ execute_foreign_modify(EState *estate,
Datum datum ;
bool isNull ;
datum = ExecGetJunkAttribute ( planSlot ,
datum = ExecGetJunkAttribute ( planSlots [ 0 ] ,
fmstate - > ctidAttno ,
& isNull ) ;
/* shouldn't ever get a null result... */
@ -3668,14 +3814,14 @@ execute_foreign_modify(EState *estate,
}
/* Convert parameters needed by prepared statement to text form */
p_values = convert_prep_stmt_params ( fmstate , ctid , slot ) ;
p_values = convert_prep_stmt_params ( fmstate , ctid , slots , * numSlots ) ;
/*
* Execute the prepared statement .
*/
if ( ! PQsendQueryPrepared ( fmstate - > conn ,
fmstate - > p_name ,
fmstate - > p_nums ,
fmstate - > p_nums * ( * numSlots ) ,
p_values ,
NULL ,
NULL ,
@ -3696,9 +3842,10 @@ execute_foreign_modify(EState *estate,
/* Check number of rows affected, and fetch RETURNING tuple if any */
if ( fmstate - > has_returning )
{
Assert ( * numSlots = = 1 ) ;
n_rows = PQntuples ( res ) ;
if ( n_rows > 0 )
store_returning_result ( fmstate , slot , res ) ;
store_returning_result ( fmstate , slots [ 0 ] , res ) ;
}
else
n_rows = atoi ( PQcmdTuples ( res ) ) ;
@ -3708,10 +3855,12 @@ execute_foreign_modify(EState *estate,
MemoryContextReset ( fmstate - > temp_cxt ) ;
* numSlots = n_rows ;
/*
* Return NULL if nothing was inserted / updated / deleted on the remote end
*/
return ( n_rows > 0 ) ? slot : NULL ;
return ( n_rows > 0 ) ? slots : NULL ;
}
/*
@ -3771,52 +3920,64 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
static const char * *
convert_prep_stmt_params ( PgFdwModifyState * fmstate ,
ItemPointer tupleid ,
TupleTableSlot * slot )
TupleTableSlot * * slots ,
int numSlots )
{
const char * * p_values ;
int i ;
int j ;
int pindex = 0 ;
MemoryContext oldcontext ;
oldcontext = MemoryContextSwitchTo ( fmstate - > temp_cxt ) ;
p_values = ( const char * * ) palloc ( sizeof ( char * ) * fmstate - > p_nums ) ;
p_values = ( const char * * ) palloc ( sizeof ( char * ) * fmstate - > p_nums * numSlots ) ;
/* ctid is provided only for UPDATE/DELETE, which don't allow batching */
Assert ( ! ( tupleid ! = NULL & & numSlots > 1 ) ) ;
/* 1st parameter should be ctid, if it's in use */
if ( tupleid ! = NULL )
{
Assert ( numSlots = = 1 ) ;
/* don't need set_transmission_modes for TID output */
p_values [ pindex ] = OutputFunctionCall ( & fmstate - > p_flinfo [ pindex ] ,
PointerGetDatum ( tupleid ) ) ;
pindex + + ;
}
/* get following parameters from slot */
if ( slot ! = NULL & & fmstate - > target_attrs ! = NIL )
/* get following parameters from slots */
if ( slots ! = NULL & & fmstate - > target_attrs ! = NIL )
{
int nestlevel ;
ListCell * lc ;
nestlevel = set_transmission_modes ( ) ;
foreach ( lc , fmstate - > target_attrs )
for ( i = 0 ; i < numSlots ; i + + )
{
int attnum = lfirst_int ( lc ) ;
Datum value ;
bool isnull ;
j = ( tupleid ! = NULL ) ? 1 : 0 ;
foreach ( lc , fmstate - > target_attrs )
{
int attnum = lfirst_int ( lc ) ;
Datum value ;
bool isnull ;
value = slot_getattr ( slot , attnum , & isnull ) ;
if ( isnull )
p_values [ pindex ] = NULL ;
else
p_values [ pindex ] = OutputFunctionCall ( & fmstate - > p_flinfo [ pindex ] ,
value ) ;
pindex + + ;
value = slot_getattr ( slots [ i ] , attnum , & isnull ) ;
if ( isnull )
p_values [ pindex ] = NULL ;
else
p_values [ pindex ] = OutputFunctionCall ( & fmstate - > p_flinfo [ j ] ,
value ) ;
pindex + + ;
j + + ;
}
}
reset_transmission_modes ( nestlevel ) ;
}
Assert ( pindex = = fmstate - > p_nums ) ;
Assert ( pindex = = fmstate - > p_nums * numSlots ) ;
MemoryContextSwitchTo ( oldcontext ) ;
@ -3870,29 +4031,41 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
Assert ( fmstate ! = NULL ) ;
/* If we created a prepared statement, destroy it */
if ( fmstate - > p_name )
{
char sql [ 64 ] ;
PGresult * res ;
snprintf ( sql , sizeof ( sql ) , " DEALLOCATE %s " , fmstate - > p_name ) ;
/*
* We don ' t use a PG_TRY block here , so be careful not to throw error
* without releasing the PGresult .
*/
res = pgfdw_exec_query ( fmstate - > conn , sql ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , fmstate - > conn , true , sql ) ;
PQclear ( res ) ;
fmstate - > p_name = NULL ;
}
deallocate_query ( fmstate ) ;
/* Release remote connection */
ReleaseConnection ( fmstate - > conn ) ;
fmstate - > conn = NULL ;
}
/*
* deallocate_query
* Deallocate a prepared statement for a foreign insert / update / delete
* operation
*/
static void
deallocate_query ( PgFdwModifyState * fmstate )
{
char sql [ 64 ] ;
PGresult * res ;
/* do nothing if the query is not allocated */
if ( ! fmstate - > p_name )
return ;
snprintf ( sql , sizeof ( sql ) , " DEALLOCATE %s " , fmstate - > p_name ) ;
/*
* We don ' t use a PG_TRY block here , so be careful not to throw error
* without releasing the PGresult .
*/
res = pgfdw_exec_query ( fmstate - > conn , sql ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pgfdw_report_error ( ERROR , res , fmstate - > conn , true , sql ) ;
PQclear ( res ) ;
fmstate - > p_name = NULL ;
}
/*
* build_remote_returning
* Build a RETURNING targetlist of a remote query for performing an
@ -6577,3 +6750,45 @@ find_em_expr_for_input_target(PlannerInfo *root,
elog ( ERROR , " could not find pathkey item to sort " ) ;
return NULL ; /* keep compiler quiet */
}
/*
* Determine batch size for a given foreign table . The option specified for
* a table has precedence .
*/
static int
get_batch_size_option ( Relation rel )
{
Oid foreigntableid = RelationGetRelid ( rel ) ;
ForeignTable * table ;
ForeignServer * server ;
List * options ;
ListCell * lc ;
/* we use 1 by default, which means "no batching" */
int batch_size = 1 ;
/*
* Load options for table and server . We append server options after
* table options , because table options take precedence .
*/
table = GetForeignTable ( foreigntableid ) ;
server = GetForeignServer ( table - > serverid ) ;
options = NIL ;
options = list_concat ( options , table - > options ) ;
options = list_concat ( options , server - > options ) ;
/* See if either table or server specifies batch_size. */
foreach ( lc , options )
{
DefElem * def = ( DefElem * ) lfirst ( lc ) ;
if ( strcmp ( def - > defname , " batch_size " ) = = 0 )
{
batch_size = strtol ( defGetString ( def ) , NULL , 10 ) ;
break ;
}
}
return batch_size ;
}