@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $ Header : / cvsroot / pgsql / src / backend / commands / copy . c , v 1.158 2002 / 06 / 20 20 : 29 : 27 momjian Exp $
* $ Header : / cvsroot / pgsql / src / backend / commands / copy . c , v 1.159 2002 / 07 / 18 04 : 43 : 50 momjian Exp $
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
@ -28,6 +28,7 @@
# include "commands/copy.h"
# include "commands/trigger.h"
# include "executor/executor.h"
# include "rewrite/rewriteHandler.h"
# include "libpq/libpq.h"
# include "miscadmin.h"
# include "tcop/pquery.h"
@ -46,13 +47,14 @@
/* non-export function prototypes */
static void CopyTo ( Relation rel , bool binary , bool oids , FILE * fp , char * delim , char * null_print ) ;
static void CopyFrom ( Relation rel , bool binary , bool oids , FILE * fp , char * delim , char * null_print ) ;
static void CopyTo ( Relation rel , List * attlist , bool binary , bool oids , FILE * fp , char * delim , char * null_print ) ;
static void CopyFrom ( Relation rel , List * attlist , bool binary , bool oids , FILE * fp , char * delim , char * null_print ) ;
static Oid GetInputFunction ( Oid type ) ;
static Oid GetTypeElement ( Oid type ) ;
static void CopyReadNewline ( FILE * fp , int * newline ) ;
static char * CopyReadAttribute ( FILE * fp , bool * isnull , char * delim , int * newline , char * null_print ) ;
static void CopyAttributeOut ( FILE * fp , char * string , char * delim ) ;
static void CopyAssertAttlist ( Relation rel , List * attlist , bool from ) ;
static const char BinarySignature [ 12 ] = " PGBCOPY \n \377 \r \n \0 " ;
@ -267,7 +269,8 @@ DoCopy(const CopyStmt *stmt)
char * filename = stmt - > filename ;
bool is_from = stmt - > is_from ;
bool pipe = ( stmt - > filename = = NULL ) ;
List * option ;
List * option ;
List * attlist = stmt - > attlist ;
DefElem * dbinary = NULL ;
DefElem * doids = NULL ;
DefElem * ddelim = NULL ;
@ -289,25 +292,27 @@ DoCopy(const CopyStmt *stmt)
if ( strcmp ( defel - > defname , " binary " ) = = 0 )
{
if ( dbinary )
elog ( ERROR , " COPY: conflicting options " ) ;
/* should this really be an error? */
elog ( ERROR , " COPY: BINARY option appears more than once " ) ;
dbinary = defel ;
}
else if ( strcmp ( defel - > defname , " oids " ) = = 0 )
{
if ( doids )
elog ( ERROR , " COPY: conflicting options " ) ;
/* should this really be an error? */
elog ( ERROR , " COPY: OIDS option appears more than once " ) ;
doids = defel ;
}
else if ( strcmp ( defel - > defname , " delimiter " ) = = 0 )
{
if ( ddelim )
elog ( ERROR , " COPY: conflicting options " ) ;
elog ( ERROR , " COPY: DELIMITER string may only be defined once in query " ) ;
ddelim = defel ;
}
else if ( strcmp ( defel - > defname , " null " ) = = 0 )
{
if ( dnull )
elog ( ERROR , " COPY: conflicting options " ) ;
elog ( ERROR , " COPY: NULL representation may only be defined once in query " ) ;
dnull = defel ;
}
else
@ -367,6 +372,24 @@ DoCopy(const CopyStmt *stmt)
server_encoding = GetDatabaseEncoding ( ) ;
# endif
if ( attlist = = NIL ) {
/* get list of attributes in the relation */
TupleDesc desc = RelationGetDescr ( rel ) ;
int i ;
for ( i = 0 ; i < desc - > natts ; + + i ) {
Ident * id = makeNode ( Ident ) ;
id - > name = NameStr ( desc - > attrs [ i ] - > attname ) ;
attlist = lappend ( attlist , id ) ;
}
}
else {
if ( binary ) {
elog ( ERROR , " COPY: BINARY format cannot be used with specific column list " ) ;
}
/* verify that any user-specified attributes exist in the relation */
CopyAssertAttlist ( rel , attlist , is_from ) ;
}
if ( is_from )
{ /* copy from file to database */
if ( rel - > rd_rel - > relkind ! = RELKIND_RELATION )
@ -410,7 +433,7 @@ DoCopy(const CopyStmt *stmt)
elog ( ERROR , " COPY: %s is a directory. " , filename ) ;
}
}
CopyFrom ( rel , binary , oids , fp , delim , null_print ) ;
CopyFrom ( rel , attlist , binary , oids , fp , delim , null_print ) ;
}
else
{ /* copy from database to file */
@ -466,7 +489,7 @@ DoCopy(const CopyStmt *stmt)
elog ( ERROR , " COPY: %s is a directory. " , filename ) ;
}
}
CopyTo ( rel , binary , oids , fp , delim , null_print ) ;
CopyTo ( rel , attlist , binary , oids , fp , delim , null_print ) ;
}
if ( ! pipe )
@ -494,8 +517,8 @@ DoCopy(const CopyStmt *stmt)
* Copy from relation TO file .
*/
static void
CopyTo ( Relation rel , bool binary , bool oids , FILE * fp ,
char * delim , char * null_print )
CopyTo ( Relation rel , List * attlist , bool binary , bool oids ,
FILE * fp , char * delim , char * null_print )
{
HeapTuple tuple ;
TupleDesc tupDesc ;
@ -509,6 +532,10 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp,
int16 fld_size ;
char * string ;
Snapshot mySnapshot ;
int copy_attr_count ;
int * attmap ;
int p = 0 ;
List * cur ;
if ( oids & & ! rel - > rd_rel - > relhasoids )
elog ( ERROR , " COPY: table %s does not have OIDs " ,
@ -517,6 +544,18 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp,
tupDesc = rel - > rd_att ;
attr_count = rel - > rd_att - > natts ;
attr = rel - > rd_att - > attrs ;
copy_attr_count = length ( attlist ) ;
{
attmap = ( int * ) palloc ( copy_attr_count * sizeof ( int ) ) ;
foreach ( cur , attlist ) {
for ( i = 0 ; i < attr_count ; i + + ) {
if ( strcmp ( strVal ( lfirst ( cur ) ) , NameStr ( attr [ i ] - > attname ) ) = = 0 ) {
attmap [ p + + ] = i ;
continue ;
}
}
}
}
/*
* For binary copy we really only need isvarlena , but compute it
@ -593,13 +632,14 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp,
}
}
for ( i = 0 ; i < attr_count ; i + + )
for ( i = 0 ; i < copy_ attr_count; i + + )
{
Datum origvalue ,
value ;
bool isnull ;
int mi = attmap [ i ] ;
origvalue = heap_getattr ( tuple , i + 1 , tupDesc , & isnull ) ;
origvalue = heap_getattr ( tuple , m i + 1 , tupDesc , & isnull ) ;
if ( ! binary )
{
@ -628,25 +668,25 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp,
* ( or for binary case , becase we must output untoasted
* value ) .
*/
if ( isvarlena [ i ] )
if ( isvarlena [ m i] )
value = PointerGetDatum ( PG_DETOAST_DATUM ( origvalue ) ) ;
else
value = origvalue ;
if ( ! binary )
{
string = DatumGetCString ( FunctionCall3 ( & out_functions [ i ] ,
string = DatumGetCString ( FunctionCall3 ( & out_functions [ m i] ,
value ,
ObjectIdGetDatum ( elements [ i ] ) ,
Int32GetDatum ( attr [ i ] - > atttypmod ) ) ) ;
ObjectIdGetDatum ( elements [ m i] ) ,
Int32GetDatum ( attr [ m i] - > atttypmod ) ) ) ;
CopyAttributeOut ( fp , string , delim ) ;
pfree ( string ) ;
}
else
{
fld_size = attr [ i ] - > attlen ;
fld_size = attr [ m i] - > attlen ;
CopySendData ( & fld_size , sizeof ( int16 ) , fp ) ;
if ( isvarlena [ i ] )
if ( isvarlena [ m i] )
{
/* varlena */
Assert ( fld_size = = - 1 ) ;
@ -654,7 +694,7 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp,
VARSIZE ( value ) ,
fp ) ;
}
else if ( ! attr [ i ] - > attbyval )
else if ( ! attr [ m i] - > attbyval )
{
/* fixed-length pass-by-reference */
Assert ( fld_size > 0 ) ;
@ -709,13 +749,13 @@ CopyTo(Relation rel, bool binary, bool oids, FILE *fp,
* Copy FROM file to relation .
*/
static void
CopyFrom ( Relation rel , bool binary , bool oids , FILE * fp ,
char * delim , char * null_print )
CopyFrom ( Relation rel , List * attlist , bool binary , bool oids ,
FILE * fp , char * delim , char * null_print )
{
HeapTuple tuple ;
TupleDesc tupDesc ;
Form_pg_attribute * attr ;
AttrNumber attr_count ;
AttrNumber attr_count , copy_attr_count , def_attr_count ;
FmgrInfo * in_functions ;
Oid * elements ;
int i ;
@ -732,10 +772,17 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp,
Oid loaded_oid = InvalidOid ;
bool skip_tuple = false ;
bool file_has_oids ;
int * attmap = NULL ;
int * defmap = NULL ;
Node * * defexprs = NULL ; /* array of default att expressions */
ExprContext * econtext ; /* used for ExecEvalExpr for default atts */
ExprDoneCond isdone ;
tupDesc = RelationGetDescr ( rel ) ;
attr = tupDesc - > attrs ;
attr_count = tupDesc - > natts ;
copy_attr_count = length ( attlist ) ;
def_attr_count = 0 ;
/*
* We need a ResultRelInfo so we can use the regular executor ' s
@ -758,15 +805,42 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp,
slot = ExecAllocTableSlot ( tupleTable ) ;
ExecSetSlotDescriptor ( slot , tupDesc , false ) ;
if ( ! binary )
{
/*
* pick up the input function and default expression ( if any ) for
* each attribute in the relation .
*/
List * cur ;
attmap = ( int * ) palloc ( sizeof ( int ) * attr_count ) ;
defmap = ( int * ) palloc ( sizeof ( int ) * attr_count ) ;
defexprs = ( Node * * ) palloc ( sizeof ( Node * ) * attr_count ) ;
in_functions = ( FmgrInfo * ) palloc ( attr_count * sizeof ( FmgrInfo ) ) ;
elements = ( Oid * ) palloc ( attr_count * sizeof ( Oid ) ) ;
for ( i = 0 ; i < attr_count ; i + + )
{
int p = 0 ;
bool specified = false ;
in_func_oid = ( Oid ) GetInputFunction ( attr [ i ] - > atttypid ) ;
fmgr_info ( in_func_oid , & in_functions [ i ] ) ;
elements [ i ] = GetTypeElement ( attr [ i ] - > atttypid ) ;
foreach ( cur , attlist ) {
if ( strcmp ( strVal ( lfirst ( cur ) ) , NameStr ( attr [ i ] - > attname ) ) = = 0 ) {
attmap [ p ] = i ;
specified = true ;
continue ;
}
+ + p ;
}
if ( ! specified ) {
/* column not specified, try to get a default */
defexprs [ def_attr_count ] = build_column_default ( rel , i + 1 ) ;
if ( defexprs [ def_attr_count ] ! = NULL ) {
defmap [ def_attr_count ] = i ;
+ + def_attr_count ;
}
}
}
file_has_oids = oids ; /* must rely on user to tell us this... */
}
@ -821,12 +895,14 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp,
copy_lineno = 0 ;
fe_eof = false ;
econtext = GetPerTupleExprContext ( estate ) ;
while ( ! done )
{
CHECK_FOR_INTERRUPTS ( ) ;
copy_lineno + + ;
/* Reset the per-output-tuple exprcontext */
ResetPerTupleExprContext ( estate ) ;
@ -854,26 +930,42 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp,
elog ( ERROR , " COPY TEXT: Invalid Oid " ) ;
}
}
for ( i = 0 ; i < attr_count & & ! done ; i + + )
/*
* here , we only try to read as many attributes as
* were specified .
*/
for ( i = 0 ; i < copy_attr_count & & ! done ; i + + )
{
int m = attmap [ i ] ;
string = CopyReadAttribute ( fp , & isnull , delim ,
& newline , null_print ) ;
if ( isnull )
{
/* already set values[i] and nulls[i] */
if ( isnull ) {
/* nothing */
}
else if ( string = = NULL )
done = 1 ; /* end of file */
else
{
values [ i ] = FunctionCall3 ( & in_functions [ i ] ,
CStringGetDatum ( string ) ,
ObjectIdGetDatum ( elements [ i ] ) ,
Int32GetDatum ( attr [ i ] - > atttypmod ) ) ;
nulls [ i ] = ' ' ;
values [ m ] = FunctionCall3 ( & in_functions [ m ] ,
CStringGetDatum ( string ) ,
ObjectIdGetDatum ( elements [ m ] ) ,
Int32GetDatum ( attr [ m ] - > atttypmod ) ) ;
nulls [ m ] = ' ' ;
}
}
/*
* as above , we only try a default lookup if one is
* known to be available
*/
for ( i = 0 ; i < def_attr_count & & ! done ; i + + ) {
bool isnull ;
values [ defmap [ i ] ] = ExecEvalExpr ( defexprs [ i ] , econtext , & isnull , & isdone ) ;
if ( ! isnull )
nulls [ defmap [ i ] ] = ' ' ;
}
if ( ! done )
CopyReadNewline ( fp , & newline ) ;
}
@ -975,7 +1067,7 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp,
break ;
tuple = heap_formtuple ( tupDesc , values , nulls ) ;
if ( oids & & file_has_oids )
tuple - > t_data - > t_oid = loaded_oid ;
@ -1021,12 +1113,6 @@ CopyFrom(Relation rel, bool binary, bool oids, FILE *fp,
ExecARInsertTriggers ( estate , resultRelInfo , tuple ) ;
}
for ( i = 0 ; i < attr_count ; i + + )
{
if ( ! attr [ i ] - > attbyval & & nulls [ i ] ! = ' n ' )
pfree ( DatumGetPointer ( values [ i ] ) ) ;
}
heap_freetuple ( tuple ) ;
}
@ -1361,3 +1447,51 @@ CopyAttributeOut(FILE *fp, char *server_string, char *delim)
pfree ( string_start ) ; /* pfree pg_server_to_client result */
# endif
}
/*
* CopyAssertAttlist : elog ( ERROR , . . . ) if the specified attlist
* is not valid for the Relation
*/
static void
CopyAssertAttlist ( Relation rel , List * attlist , bool from )
{
TupleDesc tupDesc ;
List * cur ;
char * illegalattname = NULL ;
int attr_count ;
const char * to_or_from ;
if ( attlist = = NIL )
return ;
to_or_from = ( from = = true ? " FROM " : " TO " ) ;
tupDesc = RelationGetDescr ( rel ) ;
Assert ( tupDesc ! = NULL ) ;
/*
* make sure there aren ' t more columns specified than are in the table
*/
attr_count = tupDesc - > natts ;
if ( attr_count < length ( attlist ) )
elog ( ERROR , " More columns specified in COPY %s command than in target relation " , to_or_from ) ;
/*
* make sure no columns are specified that don ' t exist in the table
*/
foreach ( cur , attlist )
{
int found = 0 ;
int i = 0 ;
for ( ; i < attr_count ; + + i )
{
if ( strcmp ( strVal ( lfirst ( cur ) ) , NameStr ( tupDesc - > attrs [ i ] - > attname ) ) = = 0 )
+ + found ;
}
if ( ! found )
illegalattname = strVal ( lfirst ( cur ) ) ;
}
if ( illegalattname )
elog ( ERROR , " Attribute referenced in COPY %s command does not exist: \" %s.%s \" " , to_or_from , RelationGetRelationName ( rel ) , illegalattname ) ;
}