@ -539,7 +539,8 @@ CopyFrom(CopyFromState cstate)
BulkInsertState bistate = NULL ;
CopyInsertMethod insertMethod ;
CopyMultiInsertInfo multiInsertInfo = { 0 } ; /* pacify compiler */
uint64 processed = 0 ;
int64 processed = 0 ;
int64 excluded = 0 ;
bool has_before_insert_row_trig ;
bool has_instead_insert_row_trig ;
bool leafpart_use_multi_insert = false ;
@ -869,7 +870,15 @@ CopyFrom(CopyFromState cstate)
econtext - > ecxt_scantuple = myslot ;
/* Skip items that don't match COPY's WHERE clause */
if ( ! ExecQual ( cstate - > qualexpr , econtext ) )
{
/*
* Report that this tuple was filtered out by the WHERE
* clause .
*/
pgstat_progress_update_param ( PROGRESS_COPY_TUPLES_EXCLUDED ,
+ + excluded ) ;
continue ;
}
}
/* Determine the partition to insert the tuple into */
@ -1104,10 +1113,11 @@ CopyFrom(CopyFromState cstate)
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW ; this is the same definition used by nodeModifyTable . c
* for counting tuples inserted by an INSERT command . Update
* for counting tuples inserted by an INSERT command . Update
* progress of the COPY command as well .
*/
pgstat_progress_update_param ( PROGRESS_COPY_LINES_PROCESSED , + + processed ) ;
pgstat_progress_update_param ( PROGRESS_COPY_TUPLES_PROCESSED ,
+ + processed ) ;
}
}
@ -1193,6 +1203,16 @@ BeginCopyFrom(ParseState *pstate,
ExprState * * defexprs ;
MemoryContext oldcontext ;
bool volatile_defexprs ;
const int progress_cols [ ] = {
PROGRESS_COPY_COMMAND ,
PROGRESS_COPY_TYPE ,
PROGRESS_COPY_BYTES_TOTAL
} ;
int64 progress_vals [ ] = {
PROGRESS_COPY_COMMAND_FROM ,
0 ,
0
} ;
/* Allocate workspace and zero all fields */
cstate = ( CopyFromStateData * ) palloc0 ( sizeof ( CopyFromStateData ) ) ;
@ -1430,11 +1450,13 @@ BeginCopyFrom(ParseState *pstate,
if ( data_source_cb )
{
progress_vals [ 1 ] = PROGRESS_COPY_TYPE_CALLBACK ;
cstate - > copy_src = COPY_CALLBACK ;
cstate - > data_source_cb = data_source_cb ;
}
else if ( pipe )
{
progress_vals [ 1 ] = PROGRESS_COPY_TYPE_PIPE ;
Assert ( ! is_program ) ; /* the grammar does not allow this */
if ( whereToSendOutput = = DestRemote )
ReceiveCopyBegin ( cstate ) ;
@ -1447,6 +1469,7 @@ BeginCopyFrom(ParseState *pstate,
if ( cstate - > is_program )
{
progress_vals [ 1 ] = PROGRESS_COPY_TYPE_PROGRAM ;
cstate - > copy_file = OpenPipeStream ( cstate - > filename , PG_BINARY_R ) ;
if ( cstate - > copy_file = = NULL )
ereport ( ERROR ,
@ -1458,6 +1481,7 @@ BeginCopyFrom(ParseState *pstate,
{
struct stat st ;
progress_vals [ 1 ] = PROGRESS_COPY_TYPE_FILE ;
cstate - > copy_file = AllocateFile ( cstate - > filename , PG_BINARY_R ) ;
if ( cstate - > copy_file = = NULL )
{
@ -1484,10 +1508,12 @@ BeginCopyFrom(ParseState *pstate,
( errcode ( ERRCODE_WRONG_OBJECT_TYPE ) ,
errmsg ( " \" %s \" is a directory " , cstate - > filename ) ) ) ;
pgstat_progress_update_param ( PROGRESS_COPY_BYTES_TOTAL , st . st_size ) ;
progress_vals [ 2 ] = st . st_size ;
}
}
pgstat_progress_update_multi_param ( 3 , progress_cols , progress_vals ) ;
if ( cstate - > opts . binary )
{
/* Read and verify binary header */