@ -78,7 +78,8 @@ typedef struct CopyMultiInsertBuffer
{
{
TupleTableSlot * slots [ MAX_BUFFERED_TUPLES ] ; /* Array to store tuples */
TupleTableSlot * slots [ MAX_BUFFERED_TUPLES ] ; /* Array to store tuples */
ResultRelInfo * resultRelInfo ; /* ResultRelInfo for 'relid' */
ResultRelInfo * resultRelInfo ; /* ResultRelInfo for 'relid' */
BulkInsertState bistate ; /* BulkInsertState for this rel */
BulkInsertState bistate ; /* BulkInsertState for this rel if plain
* table ; NULL if foreign table */
int nused ; /* number of 'slots' containing tuples */
int nused ; /* number of 'slots' containing tuples */
uint64 linenos [ MAX_BUFFERED_TUPLES ] ; /* Line # of tuple in copy
uint64 linenos [ MAX_BUFFERED_TUPLES ] ; /* Line # of tuple in copy
* stream */
* stream */
@ -116,6 +117,12 @@ CopyFromErrorCallback(void *arg)
{
{
CopyFromState cstate = ( CopyFromState ) arg ;
CopyFromState cstate = ( CopyFromState ) arg ;
if ( cstate - > relname_only )
{
errcontext ( " COPY %s " ,
cstate - > cur_relname ) ;
return ;
}
if ( cstate - > opts . binary )
if ( cstate - > opts . binary )
{
{
/* can't usefully display the data */
/* can't usefully display the data */
@ -222,7 +229,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
buffer = ( CopyMultiInsertBuffer * ) palloc ( sizeof ( CopyMultiInsertBuffer ) ) ;
buffer = ( CopyMultiInsertBuffer * ) palloc ( sizeof ( CopyMultiInsertBuffer ) ) ;
memset ( buffer - > slots , 0 , sizeof ( TupleTableSlot * ) * MAX_BUFFERED_TUPLES ) ;
memset ( buffer - > slots , 0 , sizeof ( TupleTableSlot * ) * MAX_BUFFERED_TUPLES ) ;
buffer - > resultRelInfo = rri ;
buffer - > resultRelInfo = rri ;
buffer - > bistate = GetBulkInsertState ( ) ;
buffer - > bistate = ( rri - > ri_FdwRoutine = = NULL ) ? GetBulkInsertState ( ) : NULL ;
buffer - > nused = 0 ;
buffer - > nused = 0 ;
return buffer ;
return buffer ;
@ -299,26 +306,107 @@ CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
*/
*/
static inline void
static inline void
CopyMultiInsertBufferFlush ( CopyMultiInsertInfo * miinfo ,
CopyMultiInsertBufferFlush ( CopyMultiInsertInfo * miinfo ,
CopyMultiInsertBuffer * buffer )
CopyMultiInsertBuffer * buffer ,
int64 * processed )
{
{
MemoryContext oldcontext ;
int i ;
uint64 save_cur_lineno ;
CopyFromState cstate = miinfo - > cstate ;
CopyFromState cstate = miinfo - > cstate ;
EState * estate = miinfo - > estate ;
EState * estate = miinfo - > estate ;
CommandId mycid = miinfo - > mycid ;
int ti_options = miinfo - > ti_options ;
bool line_buf_valid = cstate - > line_buf_valid ;
int nused = buffer - > nused ;
int nused = buffer - > nused ;
ResultRelInfo * resultRelInfo = buffer - > resultRelInfo ;
ResultRelInfo * resultRelInfo = buffer - > resultRelInfo ;
TupleTableSlot * * slots = buffer - > slots ;
TupleTableSlot * * slots = buffer - > slots ;
int i ;
if ( resultRelInfo - > ri_FdwRoutine )
{
int batch_size = resultRelInfo - > ri_BatchSize ;
int sent = 0 ;
Assert ( buffer - > bistate = = NULL ) ;
/* Ensure that the FDW supports batching and it's enabled */
Assert ( resultRelInfo - > ri_FdwRoutine - > ExecForeignBatchInsert ) ;
Assert ( batch_size > 1 ) ;
/*
* We suppress error context information other than the relation name ,
* if one of the operations below fails .
*/
Assert ( ! cstate - > relname_only ) ;
cstate - > relname_only = true ;
while ( sent < nused )
{
int size = ( batch_size < nused - sent ) ? batch_size : ( nused - sent ) ;
int inserted = size ;
TupleTableSlot * * rslots ;
/* insert into foreign table: let the FDW do it */
rslots =
resultRelInfo - > ri_FdwRoutine - > ExecForeignBatchInsert ( estate ,
resultRelInfo ,
& slots [ sent ] ,
NULL ,
& inserted ) ;
sent + = size ;
/* No need to do anything if there are no inserted rows */
if ( inserted < = 0 )
continue ;
/* Triggers on foreign tables should not have transition tables */
Assert ( resultRelInfo - > ri_TrigDesc = = NULL | |
resultRelInfo - > ri_TrigDesc - > trig_insert_new_table = = false ) ;
/* Run AFTER ROW INSERT triggers */
if ( resultRelInfo - > ri_TrigDesc ! = NULL & &
resultRelInfo - > ri_TrigDesc - > trig_insert_after_row )
{
Oid relid = RelationGetRelid ( resultRelInfo - > ri_RelationDesc ) ;
for ( i = 0 ; i < inserted ; i + + )
{
TupleTableSlot * slot = rslots [ i ] ;
/*
* AFTER ROW Triggers might reference the tableoid column ,
* so ( re - ) initialize tts_tableOid before evaluating them .
*/
slot - > tts_tableOid = relid ;
ExecARInsertTriggers ( estate , resultRelInfo ,
slot , NIL ,
cstate - > transition_capture ) ;
}
}
/* Update the row counter and progress of the COPY command */
* processed + = inserted ;
pgstat_progress_update_param ( PROGRESS_COPY_TUPLES_PROCESSED ,
* processed ) ;
}
for ( i = 0 ; i < nused ; i + + )
ExecClearTuple ( slots [ i ] ) ;
/* reset relname_only */
cstate - > relname_only = false ;
}
else
{
CommandId mycid = miinfo - > mycid ;
int ti_options = miinfo - > ti_options ;
bool line_buf_valid = cstate - > line_buf_valid ;
uint64 save_cur_lineno = cstate - > cur_lineno ;
MemoryContext oldcontext ;
Assert ( buffer - > bistate ! = NULL ) ;
/*
/*
* Print error context information correctly , if one of the operations
* Print error context information correctly , if one of the operations
* below fails .
* below fails .
*/
*/
cstate - > line_buf_valid = false ;
cstate - > line_buf_valid = false ;
save_cur_lineno = cstate - > cur_lineno ;
/*
/*
* table_multi_insert may leak memory , so switch to short - lived memory
* table_multi_insert may leak memory , so switch to short - lived memory
@ -336,8 +424,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
for ( i = 0 ; i < nused ; i + + )
for ( i = 0 ; i < nused ; i + + )
{
{
/*
/*
* If there are any indexes , update them for all the inserted tuples ,
* If there are any indexes , update them for all the inserted
* and run AFTER ROW INSERT triggers .
* tuples , and run AFTER ROW INSERT triggers .
*/
*/
if ( resultRelInfo - > ri_NumIndices > 0 )
if ( resultRelInfo - > ri_NumIndices > 0 )
{
{
@ -346,8 +434,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
cstate - > cur_lineno = buffer - > linenos [ i ] ;
cstate - > cur_lineno = buffer - > linenos [ i ] ;
recheckIndexes =
recheckIndexes =
ExecInsertIndexTuples ( resultRelInfo ,
ExecInsertIndexTuples ( resultRelInfo ,
buffer - > slots [ i ] , estate , fals e , false ,
buffer - > slots [ i ] , estate , false ,
NULL , NIL ) ;
false , NULL , NIL ) ;
ExecARInsertTriggers ( estate , resultRelInfo ,
ExecARInsertTriggers ( estate , resultRelInfo ,
slots [ i ] , recheckIndexes ,
slots [ i ] , recheckIndexes ,
cstate - > transition_capture ) ;
cstate - > transition_capture ) ;
@ -364,18 +452,25 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
{
{
cstate - > cur_lineno = buffer - > linenos [ i ] ;
cstate - > cur_lineno = buffer - > linenos [ i ] ;
ExecARInsertTriggers ( estate , resultRelInfo ,
ExecARInsertTriggers ( estate , resultRelInfo ,
slots [ i ] , NIL , cstate - > transition_capture ) ;
slots [ i ] , NIL ,
cstate - > transition_capture ) ;
}
}
ExecClearTuple ( slots [ i ] ) ;
ExecClearTuple ( slots [ i ] ) ;
}
}
/* Mark that all slots are free */
/* Update the row counter and progress of the COPY command */
buffer - > nused = 0 ;
* processed + = nused ;
pgstat_progress_update_param ( PROGRESS_COPY_TUPLES_PROCESSED ,
* processed ) ;
/* reset cur_lineno and line_buf_valid to what they were */
/* reset cur_lineno and line_buf_valid to what they were */
cstate - > line_buf_valid = line_buf_valid ;
cstate - > line_buf_valid = line_buf_valid ;
cstate - > cur_lineno = save_cur_lineno ;
cstate - > cur_lineno = save_cur_lineno ;
}
/* Mark that all slots are free */
buffer - > nused = 0 ;
}
}
/*
/*
@ -387,21 +482,29 @@ static inline void
CopyMultiInsertBufferCleanup ( CopyMultiInsertInfo * miinfo ,
CopyMultiInsertBufferCleanup ( CopyMultiInsertInfo * miinfo ,
CopyMultiInsertBuffer * buffer )
CopyMultiInsertBuffer * buffer )
{
{
ResultRelInfo * resultRelInfo = buffer - > resultRelInfo ;
int i ;
int i ;
/* Ensure buffer was flushed */
/* Ensure buffer was flushed */
Assert ( buffer - > nused = = 0 ) ;
Assert ( buffer - > nused = = 0 ) ;
/* Remove back-link to ourself */
/* Remove back-link to ourself */
buffer - > resultRelInfo - > ri_CopyMultiInsertBuffer = NULL ;
resultRelInfo - > ri_CopyMultiInsertBuffer = NULL ;
if ( resultRelInfo - > ri_FdwRoutine = = NULL )
{
Assert ( buffer - > bistate ! = NULL ) ;
FreeBulkInsertState ( buffer - > bistate ) ;
FreeBulkInsertState ( buffer - > bistate ) ;
}
else
Assert ( buffer - > bistate = = NULL ) ;
/* Since we only create slots on demand, just drop the non-null ones. */
/* Since we only create slots on demand, just drop the non-null ones. */
for ( i = 0 ; i < MAX_BUFFERED_TUPLES & & buffer - > slots [ i ] ! = NULL ; i + + )
for ( i = 0 ; i < MAX_BUFFERED_TUPLES & & buffer - > slots [ i ] ! = NULL ; i + + )
ExecDropSingleTupleTableSlot ( buffer - > slots [ i ] ) ;
ExecDropSingleTupleTableSlot ( buffer - > slots [ i ] ) ;
table_finish_bulk_insert ( buffer - > resultRelInfo - > ri_RelationDesc ,
if ( resultRelInfo - > ri_FdwRoutine = = NULL )
table_finish_bulk_insert ( resultRelInfo - > ri_RelationDesc ,
miinfo - > ti_options ) ;
miinfo - > ti_options ) ;
pfree ( buffer ) ;
pfree ( buffer ) ;
@ -418,7 +521,8 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
* ' curr_rri ' .
* ' curr_rri ' .
*/
*/
static inline void
static inline void
CopyMultiInsertInfoFlush ( CopyMultiInsertInfo * miinfo , ResultRelInfo * curr_rri )
CopyMultiInsertInfoFlush ( CopyMultiInsertInfo * miinfo , ResultRelInfo * curr_rri ,
int64 * processed )
{
{
ListCell * lc ;
ListCell * lc ;
@ -426,7 +530,7 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
{
{
CopyMultiInsertBuffer * buffer = ( CopyMultiInsertBuffer * ) lfirst ( lc ) ;
CopyMultiInsertBuffer * buffer = ( CopyMultiInsertBuffer * ) lfirst ( lc ) ;
CopyMultiInsertBufferFlush ( miinfo , buffer ) ;
CopyMultiInsertBufferFlush ( miinfo , buffer , processed ) ;
}
}
miinfo - > bufferedTuples = 0 ;
miinfo - > bufferedTuples = 0 ;
@ -679,6 +783,23 @@ CopyFrom(CopyFromState cstate)
resultRelInfo - > ri_FdwRoutine - > BeginForeignInsert ( mtstate ,
resultRelInfo - > ri_FdwRoutine - > BeginForeignInsert ( mtstate ,
resultRelInfo ) ;
resultRelInfo ) ;
/*
* Also , if the named relation is a foreign table , determine if the FDW
* supports batch insert and determine the batch size ( a FDW may support
* batching , but it may be disabled for the server / table ) .
*
* If the FDW does not support batching , we set the batch size to 1.
*/
if ( resultRelInfo - > ri_FdwRoutine ! = NULL & &
resultRelInfo - > ri_FdwRoutine - > GetForeignModifyBatchSize & &
resultRelInfo - > ri_FdwRoutine - > ExecForeignBatchInsert )
resultRelInfo - > ri_BatchSize =
resultRelInfo - > ri_FdwRoutine - > GetForeignModifyBatchSize ( resultRelInfo ) ;
else
resultRelInfo - > ri_BatchSize = 1 ;
Assert ( resultRelInfo - > ri_BatchSize > = 1 ) ;
/* Prepare to catch AFTER triggers. */
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery ( ) ;
AfterTriggerBeginQuery ( ) ;
@ -708,10 +829,11 @@ CopyFrom(CopyFromState cstate)
/*
/*
* It ' s generally more efficient to prepare a bunch of tuples for
* It ' s generally more efficient to prepare a bunch of tuples for
* insertion , and insert them in one table_multi_insert ( ) call , than call
* insertion , and insert them in one
* table_tuple_insert ( ) separately for every tuple . However , there are a
* table_multi_insert ( ) / ExecForeignBatchInsert ( ) call , than call
* number of reasons why we might not be able to do this . These are
* table_tuple_insert ( ) / ExecForeignInsert ( ) separately for every tuple .
* explained below .
* However , there are a number of reasons why we might not be able to do
* this . These are explained below .
*/
*/
if ( resultRelInfo - > ri_TrigDesc ! = NULL & &
if ( resultRelInfo - > ri_TrigDesc ! = NULL & &
( resultRelInfo - > ri_TrigDesc - > trig_insert_before_row | |
( resultRelInfo - > ri_TrigDesc - > trig_insert_before_row | |
@ -725,6 +847,15 @@ CopyFrom(CopyFromState cstate)
*/
*/
insertMethod = CIM_SINGLE ;
insertMethod = CIM_SINGLE ;
}
}
else if ( resultRelInfo - > ri_FdwRoutine ! = NULL & &
resultRelInfo - > ri_BatchSize = = 1 )
{
/*
* Can ' t support multi - inserts to a foreign table if the FDW does not
* support batching , or it ' s disabled for the server or foreign table .
*/
insertMethod = CIM_SINGLE ;
}
else if ( proute ! = NULL & & resultRelInfo - > ri_TrigDesc ! = NULL & &
else if ( proute ! = NULL & & resultRelInfo - > ri_TrigDesc ! = NULL & &
resultRelInfo - > ri_TrigDesc - > trig_insert_new_table )
resultRelInfo - > ri_TrigDesc - > trig_insert_new_table )
{
{
@ -737,14 +868,12 @@ CopyFrom(CopyFromState cstate)
*/
*/
insertMethod = CIM_SINGLE ;
insertMethod = CIM_SINGLE ;
}
}
else if ( resultRelInfo - > ri_FdwRoutine ! = NULL | |
else if ( cstate - > volatile_defexprs )
cstate - > volatile_defexprs )
{
{
/*
/*
* Can ' t support multi - inserts to foreign tables or if there are any
* Can ' t support multi - inserts if there are any volatile default
* volatile default expressions in the table . Similarly to the
* expressions in the table . Similarly to the trigger case above ,
* trigger case above , such expressions may query the table we ' re
* such expressions may query the table we ' re inserting into .
* inserting into .
*
*
* Note : It does not matter if any partitions have any volatile
* Note : It does not matter if any partitions have any volatile
* default expressions as we use the defaults from the target of the
* default expressions as we use the defaults from the target of the
@ -767,13 +896,14 @@ CopyFrom(CopyFromState cstate)
* For partitioned tables , we may still be able to perform bulk
* For partitioned tables , we may still be able to perform bulk
* inserts . However , the possibility of this depends on which types
* inserts . However , the possibility of this depends on which types
* of triggers exist on the partition . We must disable bulk inserts
* of triggers exist on the partition . We must disable bulk inserts
* if the partition is a foreign table or it has any before row insert
* if the partition is a foreign table that can ' t use batching or it
* or insert instead triggers ( same as we checked above for the parent
* has any before row insert or insert instead triggers ( same as we
* table ) . Since the partition ' s resultRelInfos are initialized only
* checked above for the parent table ) . Since the partition ' s
* when we actually need to insert the first tuple into them , we must
* resultRelInfos are initialized only when we actually need to insert
* have the intermediate insert method of CIM_MULTI_CONDITIONAL to
* the first tuple into them , we must have the intermediate insert
* flag that we must later determine if we can use bulk - inserts for
* method of CIM_MULTI_CONDITIONAL to flag that we must later
* the partition being inserted into .
* determine if we can use bulk - inserts for the partition being
* inserted into .
*/
*/
if ( proute )
if ( proute )
insertMethod = CIM_MULTI_CONDITIONAL ;
insertMethod = CIM_MULTI_CONDITIONAL ;
@ -910,12 +1040,14 @@ CopyFrom(CopyFromState cstate)
/*
/*
* Disable multi - inserts when the partition has BEFORE / INSTEAD
* Disable multi - inserts when the partition has BEFORE / INSTEAD
* OF triggers , or if the partition is a foreign partition .
* OF triggers , or if the partition is a foreign table that
* can ' t use batching .
*/
*/
leafpart_use_multi_insert = insertMethod = = CIM_MULTI_CONDITIONAL & &
leafpart_use_multi_insert = insertMethod = = CIM_MULTI_CONDITIONAL & &
! has_before_insert_row_trig & &
! has_before_insert_row_trig & &
! has_instead_insert_row_trig & &
! has_instead_insert_row_trig & &
resultRelInfo - > ri_FdwRoutine = = NULL ;
( resultRelInfo - > ri_FdwRoutine = = NULL | |
resultRelInfo - > ri_BatchSize > 1 ) ;
/* Set the multi-insert buffer to use for this partition. */
/* Set the multi-insert buffer to use for this partition. */
if ( leafpart_use_multi_insert )
if ( leafpart_use_multi_insert )
@ -931,7 +1063,9 @@ CopyFrom(CopyFromState cstate)
* Flush pending inserts if this partition can ' t use
* Flush pending inserts if this partition can ' t use
* batching , so rows are visible to triggers etc .
* batching , so rows are visible to triggers etc .
*/
*/
CopyMultiInsertInfoFlush ( & multiInsertInfo , resultRelInfo ) ;
CopyMultiInsertInfoFlush ( & multiInsertInfo ,
resultRelInfo ,
& processed ) ;
}
}
if ( bistate ! = NULL )
if ( bistate ! = NULL )
@ -1067,7 +1201,17 @@ CopyFrom(CopyFromState cstate)
* buffers out to their tables .
* buffers out to their tables .
*/
*/
if ( CopyMultiInsertInfoIsFull ( & multiInsertInfo ) )
if ( CopyMultiInsertInfoIsFull ( & multiInsertInfo ) )
CopyMultiInsertInfoFlush ( & multiInsertInfo , resultRelInfo ) ;
CopyMultiInsertInfoFlush ( & multiInsertInfo ,
resultRelInfo ,
& processed ) ;
/*
* We delay updating the row counter and progress of the
* COPY command until after writing the tuples stored in
* the buffer out to the table , as in single insert mode .
* See CopyMultiInsertBufferFlush ( ) .
*/
continue ; /* next tuple please */
}
}
else
else
{
{
@ -1130,7 +1274,7 @@ CopyFrom(CopyFromState cstate)
if ( insertMethod ! = CIM_SINGLE )
if ( insertMethod ! = CIM_SINGLE )
{
{
if ( ! CopyMultiInsertInfoIsEmpty ( & multiInsertInfo ) )
if ( ! CopyMultiInsertInfoIsEmpty ( & multiInsertInfo ) )
CopyMultiInsertInfoFlush ( & multiInsertInfo , NULL ) ;
CopyMultiInsertInfoFlush ( & multiInsertInfo , NULL , & processed ) ;
}
}
/* Done, clean up */
/* Done, clean up */
@ -1348,6 +1492,7 @@ BeginCopyFrom(ParseState *pstate,
cstate - > cur_lineno = 0 ;
cstate - > cur_lineno = 0 ;
cstate - > cur_attname = NULL ;
cstate - > cur_attname = NULL ;
cstate - > cur_attval = NULL ;
cstate - > cur_attval = NULL ;
cstate - > relname_only = false ;
/*
/*
* Allocate buffers for the input pipeline .
* Allocate buffers for the input pipeline .