@ -47,6 +47,7 @@
# include "utils/builtins.h"
# include "utils/lsyscache.h"
# include "utils/memutils.h"
# include "utils/partcache.h"
# include "utils/portal.h"
# include "utils/rel.h"
# include "utils/rls.h"
@ -79,6 +80,16 @@ typedef enum EolType
EOL_CRNL
} EolType ;
/*
 * CopyInsertMethod
 *		How COPY FROM hands tuples to the target relation.
 *
 * CopyFrom() stores its choice in a local variable of this type and consults
 * it when deciding whether a row goes into the multi-insert buffer.
 */
typedef enum CopyInsertMethod
{
	/* insert row by row: use heap_insert or fdw routine */
	CIM_SINGLE = 0,

	/* always use heap_multi_insert */
	CIM_MULTI = 1,

	/*
	 * Use heap_multi_insert only if valid.  CopyFrom() selects this for
	 * partitioned targets, where the check is made per partition.
	 */
	CIM_MULTI_CONDITIONAL = 2
} CopyInsertMethod;
/*
* This struct contains all the state variables used throughout a COPY
* operation . For simplicity , we use the same struct for all variants of COPY ,
@ -168,9 +179,6 @@ typedef struct CopyStateData
bool volatile_defexprs ; /* is any of defexprs volatile? */
List * range_table ;
/* Tuple-routing support info */
PartitionTupleRouting * partition_tuple_routing ;
TransitionCaptureState * transition_capture ;
/*
@ -2305,26 +2313,35 @@ CopyFrom(CopyState cstate)
Datum * values ;
bool * nulls ;
ResultRelInfo * resultRelInfo ;
ResultRelInfo * saved_resultRelInfo = NULL ;
ResultRelInfo * target_resultRelInfo ;
EState * estate = CreateExecutorState ( ) ; /* for ExecConstraints() */
ModifyTableState * mtstate ;
ExprContext * econtext ;
TupleTableSlot * myslot ;
MemoryContext oldcontext = CurrentMemoryContext ;
PartitionTupleRouting * proute = NULL ;
ExprContext * secondaryExprContext = NULL ;
ErrorContextCallback errcallback ;
CommandId mycid = GetCurrentCommandId ( true ) ;
int hi_options = 0 ; /* start with default heap_insert options */
BulkInsertState bistate ;
CopyInsertMethod insertMethod ;
uint64 processed = 0 ;
bool useHeapMultiInsert ;
int nBufferedTuples = 0 ;
int prev_leaf_part_index = - 1 ;
bool has_before_insert_row_trig ;
bool has_instead_insert_row_trig ;
bool leafpart_use_multi_insert = false ;
# define MAX_BUFFERED_TUPLES 1000
# define RECHECK_MULTI_INSERT_THRESHOLD 1000
HeapTuple * bufferedTuples = NULL ; /* initialize to silence warning */
Size bufferedTuplesSize = 0 ;
uint64 firstBufferedLineNo = 0 ;
uint64 lastPartitionSampleLineNo = 0 ;
uint64 nPartitionChanges = 0 ;
double avgTuplesPerPartChange = 0 ;
Assert ( cstate - > rel ) ;
@ -2455,6 +2472,7 @@ CopyFrom(CopyState cstate)
1 , /* dummy rangetable index */
NULL ,
0 ) ;
target_resultRelInfo = resultRelInfo ;
/* Verify the named relation is a valid target for INSERT */
CheckValidResultRel ( resultRelInfo , CMD_INSERT ) ;
@ -2504,10 +2522,7 @@ CopyFrom(CopyState cstate)
*/
if ( cstate - > rel - > rd_rel - > relkind = = RELKIND_PARTITIONED_TABLE )
{
PartitionTupleRouting * proute ;
proute = cstate - > partition_tuple_routing =
ExecSetupPartitionTupleRouting ( NULL , cstate - > rel ) ;
proute = ExecSetupPartitionTupleRouting ( NULL , cstate - > rel ) ;
/*
* If we are capturing transition tuples , they may need to be
@ -2522,28 +2537,92 @@ CopyFrom(CopyState cstate)
/*
* It ' s more efficient to prepare a bunch of tuples for insertion , and
* insert them in one heap_multi_insert ( ) call , than call heap_insert ( )
* separately for every tuple . However , we can ' t do that if there are
* BEFORE / INSTEAD OF triggers , or we need to evaluate volatile default
* expressions . Such triggers or expressions might query the table we ' re
* inserting to , and act differently if the tuples that have already been
* processed and prepared for insertion are not there . We also can ' t do
* it if the table is foreign or partitioned .
* separately for every tuple . However , there are a number of reasons why
* we might not be able to do this . These are explained below .
*/
if ( ( resultRelInfo - > ri_TrigDesc ! = NULL & &
( resultRelInfo - > ri_TrigDesc - > trig_insert_before_row | |
resultRelInfo - > ri_TrigDesc - > trig_insert_instead_row ) ) | |
resultRelInfo - > ri_FdwRoutine ! = NULL | |
cstate - > partition_tuple_routing ! = NULL | |
cstate - > volatile_defexprs )
if ( resultRelInfo - > ri_TrigDesc ! = NULL & &
( resultRelInfo - > ri_TrigDesc - > trig_insert_before_row | |
resultRelInfo - > ri_TrigDesc - > trig_insert_instead_row ) )
{
/*
* Can ' t support multi - inserts when there are any BEFORE / INSTEAD OF
* triggers on the table . Such triggers might query the table we ' re
* inserting into and act differently if the tuples that have already
* been processed and prepared for insertion are not there .
*/
insertMethod = CIM_SINGLE ;
}
else if ( proute ! = NULL & & resultRelInfo - > ri_TrigDesc ! = NULL & &
resultRelInfo - > ri_TrigDesc - > trig_insert_new_table )
{
/*
* For partitioned tables we can ' t support multi - inserts when there
* are any statement level insert triggers . It might be possible to
* allow partitioned tables with such triggers in the future , but for
* now , CopyFromInsertBatch expects that any before row insert and
* statement level insert triggers are on the same relation .
*/
insertMethod = CIM_SINGLE ;
}
else if ( resultRelInfo - > ri_FdwRoutine ! = NULL | |
cstate - > volatile_defexprs )
{
useHeapMultiInsert = false ;
/*
* Can ' t support multi - inserts to foreign tables or if there are any
* volatile default expressions in the table . Similarly to the
* trigger case above , such expressions may query the table we ' re
* inserting into .
*
* Note : It does not matter if any partitions have any volatile
* default expressions as we use the defaults from the target of the
* COPY command .
*/
insertMethod = CIM_SINGLE ;
}
else
{
useHeapMultiInsert = true ;
/*
* For partitioned tables , we may still be able to perform bulk
* inserts for sets of consecutive tuples which belong to the same
* partition . However , the possibility of this depends on which types
* of triggers exist on the partition . We must disable bulk inserts
* if the partition is a foreign table or it has any before row insert
* or insert instead triggers ( same as we checked above for the parent
* table ) . Since the partition ' s resultRelInfos are initialized only
* when we actually need to insert the first tuple into them , we must
* have the intermediate insert method of CIM_MULTI_CONDITIONAL to
* flag that we must later determine if we can use bulk - inserts for
* the partition being inserted into .
*
* Normally , when performing bulk inserts we just flush the insert
* buffer whenever it becomes full , but for the partitioned table
* case , we flush it whenever the current tuple does not belong to the
* same partition as the previous tuple , and since we flush the
* previous partition ' s buffer once the new tuple has already been
* built , we ' re unable to reset the estate since we ' d free the memory
* in which the new tuple is stored . To work around this we maintain
* a secondary expression context and alternate between these when the
* partition changes . This does mean we do store the first new tuple
* in a different context than subsequent tuples , but that does not
* matter , providing we don ' t free anything while it ' s still needed .
*/
if ( proute )
{
insertMethod = CIM_MULTI_CONDITIONAL ;
secondaryExprContext = CreateExprContext ( estate ) ;
}
else
insertMethod = CIM_MULTI ;
bufferedTuples = palloc ( MAX_BUFFERED_TUPLES * sizeof ( HeapTuple ) ) ;
}
has_before_insert_row_trig = ( resultRelInfo - > ri_TrigDesc & &
resultRelInfo - > ri_TrigDesc - > trig_insert_before_row ) ;
has_instead_insert_row_trig = ( resultRelInfo - > ri_TrigDesc & &
resultRelInfo - > ri_TrigDesc - > trig_insert_instead_row ) ;
/*
* Check BEFORE STATEMENT insertion triggers . It ' s debatable whether we
* should do this for COPY , since it ' s not really an " INSERT " statement as
@ -2598,7 +2677,7 @@ CopyFrom(CopyState cstate)
* Constraints might reference the tableoid column , so initialize
* t_tableOid before evaluating them .
*/
tuple - > t_tableOid = RelationGetRelid ( resultRelInfo - > ri_RelationDesc ) ;
tuple - > t_tableOid = RelationGetRelid ( target_ resultRelInfo- > ri_RelationDesc ) ;
/* Triggers and stuff need to be invoked in query context. */
MemoryContextSwitchTo ( oldcontext ) ;
@ -2608,10 +2687,9 @@ CopyFrom(CopyState cstate)
ExecStoreTuple ( tuple , slot , InvalidBuffer , false ) ;
/* Determine the partition to heap_insert the tuple into */
if ( cstate - > partition_tuple_routing )
if ( proute )
{
int leaf_part_index ;
PartitionTupleRouting * proute = cstate - > partition_tuple_routing ;
/*
* Away we go . . . If we end up not finding a partition after all ,
@ -2621,39 +2699,132 @@ CopyFrom(CopyState cstate)
* will get us the ResultRelInfo and TupleConversionMap for the
* partition , respectively .
*/
leaf_part_index = ExecFindPartition ( resultRelInfo ,
leaf_part_index = ExecFindPartition ( target_ resultRelInfo,
proute - > partition_dispatch_info ,
slot ,
estate ) ;
Assert ( leaf_part_index > = 0 & &
leaf_part_index < proute - > num_partitions ) ;
/*
* If this tuple is mapped to a partition that is not same as the
* previous one , we ' d better make the bulk insert mechanism gets a
* new buffer .
*/
if ( prev_leaf_part_index ! = leaf_part_index )
{
/* Check if we can multi-insert into this partition */
if ( insertMethod = = CIM_MULTI_CONDITIONAL )
{
/*
* When performing bulk - inserts into partitioned tables we
* must insert the tuples seen so far to the heap whenever
* the partition changes .
*/
if ( nBufferedTuples > 0 )
{
ExprContext * swapcontext ;
ResultRelInfo * presultRelInfo ;
presultRelInfo = proute - > partitions [ prev_leaf_part_index ] ;
CopyFromInsertBatch ( cstate , estate , mycid , hi_options ,
presultRelInfo , myslot , bistate ,
nBufferedTuples , bufferedTuples ,
firstBufferedLineNo ) ;
nBufferedTuples = 0 ;
bufferedTuplesSize = 0 ;
Assert ( secondaryExprContext ) ;
/*
* Normally we reset the per - tuple context whenever
* the bufferedTuples array is empty at the beginning
* of the loop , however , it is possible since we flush
* the buffer here that the buffer is never empty at
* the start of the loop . To prevent the per - tuple
* context from never being reset we maintain a second
* context and alternate between them when the
* partition changes . We can now reset
* secondaryExprContext as this is no longer needed ,
* since we just flushed any tuples stored in it . We
* also now switch over to the other context . This
* does mean that the first tuple in the buffer won ' t
* be in the same context as the others , but that does
* not matter since we only reset it after the flush .
*/
ReScanExprContext ( secondaryExprContext ) ;
swapcontext = secondaryExprContext ;
secondaryExprContext = estate - > es_per_tuple_exprcontext ;
estate - > es_per_tuple_exprcontext = swapcontext ;
}
nPartitionChanges + + ;
/*
* Here we adaptively enable multi - inserts based on the
* average number of tuples from recent multi - insert
* batches . We recalculate the average every
* RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
* the average over the whole copy . This allows us to
* enable multi - inserts when we get periods in the copy
* stream that have tuples commonly belonging to the same
* partition , and disable when the partition is changing
* too often .
*/
if ( unlikely ( lastPartitionSampleLineNo < = ( cstate - > cur_lineno -
RECHECK_MULTI_INSERT_THRESHOLD )
& & cstate - > cur_lineno > = RECHECK_MULTI_INSERT_THRESHOLD ) )
{
avgTuplesPerPartChange =
( cstate - > cur_lineno - lastPartitionSampleLineNo ) /
( double ) nPartitionChanges ;
lastPartitionSampleLineNo = cstate - > cur_lineno ;
nPartitionChanges = 0 ;
}
/*
* Tests have shown that using multi - inserts when the
* partition changes on every tuple slightly decreases the
* performance , however , there are benefits even when only
* some batches have just 2 tuples , so let ' s enable
* multi - inserts even when the average is quite low .
*/
leafpart_use_multi_insert = avgTuplesPerPartChange > = 1.3 & &
! has_before_insert_row_trig & &
! has_instead_insert_row_trig & &
resultRelInfo - > ri_FdwRoutine = = NULL ;
}
else
leafpart_use_multi_insert = false ;
/*
* Overwrite resultRelInfo with the corresponding partition ' s
* one .
*/
resultRelInfo = proute - > partitions [ leaf_part_index ] ;
if ( unlikely ( resultRelInfo = = NULL ) )
{
resultRelInfo = ExecInitPartitionInfo ( mtstate ,
target_resultRelInfo ,
proute , estate ,
leaf_part_index ) ;
proute - > partitions [ leaf_part_index ] = resultRelInfo ;
Assert ( resultRelInfo ! = NULL ) ;
}
/* Determine which triggers exist on this partition */
has_before_insert_row_trig = ( resultRelInfo - > ri_TrigDesc & &
resultRelInfo - > ri_TrigDesc - > trig_insert_before_row ) ;
has_instead_insert_row_trig = ( resultRelInfo - > ri_TrigDesc & &
resultRelInfo - > ri_TrigDesc - > trig_insert_instead_row ) ;
/*
* We ' d better make sure the bulk insert mechanism gets a new
* buffer when the partition being inserted into changes .
*/
ReleaseBulkInsertStatePin ( bistate ) ;
prev_leaf_part_index = leaf_part_index ;
}
/*
* Save the old ResultRelInfo and switch to the one corresponding
* to the selected partition .
*/
saved_resultRelInfo = resultRelInfo ;
resultRelInfo = proute - > partitions [ leaf_part_index ] ;
if ( resultRelInfo = = NULL )
{
resultRelInfo = ExecInitPartitionInfo ( mtstate ,
saved_resultRelInfo ,
proute , estate ,
leaf_part_index ) ;
Assert ( resultRelInfo ! = NULL ) ;
}
/*
* For ExecInsertIndexTuples ( ) to work on the partition ' s indexes
*/
@ -2665,8 +2836,7 @@ CopyFrom(CopyState cstate)
*/
if ( cstate - > transition_capture ! = NULL )
{
if ( resultRelInfo - > ri_TrigDesc & &
resultRelInfo - > ri_TrigDesc - > trig_insert_before_row )
if ( has_before_insert_row_trig )
{
/*
* If there are any BEFORE triggers on the partition ,
@ -2675,7 +2845,7 @@ CopyFrom(CopyState cstate)
*/
cstate - > transition_capture - > tcs_original_insert_tuple = NULL ;
cstate - > transition_capture - > tcs_map =
TupConvMapForLeaf ( proute , saved _resultRelInfo,
TupConvMapForLeaf ( proute , target _resultRelInfo,
leaf_part_index ) ;
}
else
@ -2691,12 +2861,14 @@ CopyFrom(CopyState cstate)
/*
* We might need to convert from the parent rowtype to the
* partition rowtype .
* partition rowtype . Don ' t free the already stored tuple as it
* may still be required for a multi - insert batch .
*/
tuple = ConvertPartitionTupleSlot ( proute - > parent_child_tupconv_maps [ leaf_part_index ] ,
tuple ,
proute - > partition_tuple_slot ,
& slot ) ;
& slot ,
false ) ;
tuple - > t_tableOid = RelationGetRelid ( resultRelInfo - > ri_RelationDesc ) ;
}
@ -2704,8 +2876,7 @@ CopyFrom(CopyState cstate)
skip_tuple = false ;
/* BEFORE ROW INSERT Triggers */
if ( resultRelInfo - > ri_TrigDesc & &
resultRelInfo - > ri_TrigDesc - > trig_insert_before_row )
if ( has_before_insert_row_trig )
{
slot = ExecBRInsertTriggers ( estate , resultRelInfo , slot ) ;
@ -2717,8 +2888,7 @@ CopyFrom(CopyState cstate)
if ( ! skip_tuple )
{
if ( resultRelInfo - > ri_TrigDesc & &
resultRelInfo - > ri_TrigDesc - > trig_insert_instead_row )
if ( has_instead_insert_row_trig )
{
/* Pass the data to the INSTEAD ROW INSERT trigger */
ExecIRInsertTriggers ( estate , resultRelInfo , slot ) ;
@ -2740,12 +2910,15 @@ CopyFrom(CopyState cstate)
* partition .
*/
if ( resultRelInfo - > ri_PartitionCheck & &
( saved_resultRelInfo = = NULL | |
( resultRelInfo - > ri_TrigDesc & &
resultRelInfo - > ri_TrigDesc - > trig_insert_before_row ) ) )
( proute = = NULL | | has_before_insert_row_trig ) )
ExecPartitionCheck ( resultRelInfo , slot , estate , true ) ;
if ( useHeapMultiInsert )
/*
* Perform multi - inserts when enabled , or when loading a
* partitioned table that can support multi - inserts as
* determined above .
*/
if ( insertMethod = = CIM_MULTI | | leafpart_use_multi_insert )
{
/* Add this tuple to the tuple buffer */
if ( nBufferedTuples = = 0 )
@ -2783,7 +2956,7 @@ CopyFrom(CopyState cstate)
NULL ) ;
if ( slot = = NULL ) /* "do nothing" */
goto next_tuple ;
continue ; /* next tuple please */
/* FDW might have changed tuple */
tuple = ExecMaterializeSlot ( slot ) ;
@ -2823,22 +2996,28 @@ CopyFrom(CopyState cstate)
*/
processed + + ;
}
next_tuple :
/* Restore the saved ResultRelInfo */
if ( saved_resultRelInfo )
{
resultRelInfo = saved_resultRelInfo ;
estate - > es_result_relation_info = resultRelInfo ;
}
}
/* Flush any remaining buffered tuples */
if ( nBufferedTuples > 0 )
CopyFromInsertBatch ( cstate , estate , mycid , hi_options ,
resultRelInfo , myslot , bistate ,
nBufferedTuples , bufferedTuples ,
firstBufferedLineNo ) ;
{
if ( insertMethod = = CIM_MULTI_CONDITIONAL )
{
ResultRelInfo * presultRelInfo ;
presultRelInfo = proute - > partitions [ prev_leaf_part_index ] ;
CopyFromInsertBatch ( cstate , estate , mycid , hi_options ,
presultRelInfo , myslot , bistate ,
nBufferedTuples , bufferedTuples ,
firstBufferedLineNo ) ;
}
else
CopyFromInsertBatch ( cstate , estate , mycid , hi_options ,
resultRelInfo , myslot , bistate ,
nBufferedTuples , bufferedTuples ,
firstBufferedLineNo ) ;
}
/* Done, clean up */
error_context_stack = errcallback . previous ;
@ -2855,7 +3034,7 @@ next_tuple:
pq_endmsgread ( ) ;
/* Execute AFTER STATEMENT insertion triggers */
ExecASInsertTriggers ( estate , resultRelInfo , cstate - > transition_capture ) ;
ExecASInsertTriggers ( estate , target_ resultRelInfo, cstate - > transition_capture ) ;
/* Handle queued AFTER triggers */
AfterTriggerEndQuery ( estate ) ;
@ -2866,16 +3045,16 @@ next_tuple:
ExecResetTupleTable ( estate - > es_tupleTable , false ) ;
/* Allow the FDW to shut down */
if ( resultRelInfo - > ri_FdwRoutine ! = NULL & &
resultRelInfo - > ri_FdwRoutine - > EndForeignInsert ! = NULL )
resultRelInfo - > ri_FdwRoutine - > EndForeignInsert ( estate ,
resultRelInfo ) ;
if ( target_ resultRelInfo- > ri_FdwRoutine ! = NULL & &
target_ resultRelInfo- > ri_FdwRoutine - > EndForeignInsert ! = NULL )
target_ resultRelInfo- > ri_FdwRoutine - > EndForeignInsert ( estate ,
target_ resultRelInfo ) ;
ExecCloseIndices ( resultRelInfo ) ;
ExecCloseIndices ( target_ resultRelInfo) ;
/* Close all the partitioned tables, leaf partitions, and their indices */
if ( cstate - > partition_tuple_routing )
ExecCleanupTupleRouting ( mtstate , cstate - > partition_tuple_routing ) ;
if ( proute )
ExecCleanupTupleRouting ( mtstate , proute ) ;
/* Close any trigger target relations */
ExecCleanUpTriggerState ( estate ) ;
@ -2907,6 +3086,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
MemoryContext oldcontext ;
int i ;
uint64 save_cur_lineno ;
bool line_buf_valid = cstate - > line_buf_valid ;
/*
* Print error context information correctly , if one of the operations
@ -2920,7 +3100,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
* before calling it .
*/
oldcontext = MemoryContextSwitchTo ( GetPerTupleMemoryContext ( estate ) ) ;
heap_multi_insert ( cstate - > rel ,
heap_multi_insert ( resultRelInfo - > ri_R elationDesc ,
bufferedTuples ,
nBufferedTuples ,
mycid ,
@ -2967,7 +3147,8 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
}
}
/* reset cur_lineno to where we were */
/* reset cur_lineno and line_buf_valid to what they were */
cstate - > line_buf_valid = line_buf_valid ;
cstate - > cur_lineno = save_cur_lineno ;
}