@ -61,11 +61,28 @@
# define thandle HANDLE
# endif
typedef struct ParallelStateEntry
{
# ifdef WIN32
unsigned int threadId ;
# else
pid_t pid ;
# endif
ArchiveHandle * AH ;
} ParallelStateEntry ;
typedef struct ParallelState
{
int numWorkers ;
ParallelStateEntry * pse ;
} ParallelState ;
/* Arguments needed for a worker child */
typedef struct _restore_args
{
ArchiveHandle * AH ;
TocEntry * te ;
ParallelStateEntry * pse ;
} RestoreArgs ;
/* State for each parallel activity slot */
@ -75,6 +92,14 @@ typedef struct _parallel_slot
RestoreArgs * args ;
} ParallelSlot ;
typedef struct ShutdownInformation
{
ParallelState * pstate ;
Archive * AHX ;
} ShutdownInformation ;
static ShutdownInformation shutdown_info ;
# define NO_SLOT (-1)
# define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
@ -122,10 +147,6 @@ static int _discoverArchiveFormat(ArchiveHandle *AH);
static int RestoringToDB ( ArchiveHandle * AH ) ;
static void dump_lo_buf ( ArchiveHandle * AH ) ;
static void vdie_horribly ( ArchiveHandle * AH , const char * modulename ,
const char * fmt , va_list ap )
__attribute__ ( ( format ( PG_PRINTF_ATTRIBUTE , 3 , 0 ) , noreturn ) ) ;
static void dumpTimestamp ( ArchiveHandle * AH , const char * msg , time_t tim ) ;
static void SetOutput ( ArchiveHandle * AH , const char * filename , int compression ) ;
static OutputContext SaveOutput ( ArchiveHandle * AH ) ;
@ -160,6 +181,11 @@ static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
static ArchiveHandle * CloneArchive ( ArchiveHandle * AH ) ;
static void DeCloneArchive ( ArchiveHandle * AH ) ;
static void setProcessIdentifier ( ParallelStateEntry * pse , ArchiveHandle * AH ) ;
static void unsetProcessIdentifier ( ParallelStateEntry * pse ) ;
static ParallelStateEntry * GetMyPSEntry ( ParallelState * pstate ) ;
static void archive_close_connection ( int code , void * arg ) ;
/*
* Wrapper functions .
@ -208,8 +234,8 @@ CloseArchive(Archive *AHX)
res = fclose ( AH - > OF ) ;
if ( res ! = 0 )
die_horribly ( AH , modulename , " could not close output file: %s \n " ,
strerror ( errno ) ) ;
exit_horribly ( modulename , " could not close output file: %s \n " ,
strerror ( errno ) ) ;
}
/* Public */
@ -234,14 +260,14 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
* connected to , not the one we will create , which is very bad . . .
*/
if ( ropt - > createDB & & ropt - > dropSchema )
die_horribly ( AH , modulename , " -C and -c are incompatible options \n " ) ;
exit_horribly ( modulename , " -C and -c are incompatible options \n " ) ;
/*
* - C is not compatible with - 1 , because we can ' t create a database inside
* a transaction block .
*/
if ( ropt - > createDB & & ropt - > single_txn )
die_horribly ( AH , modulename , " -C and -1 are incompatible options \n " ) ;
exit_horribly ( modulename , " -C and -1 are incompatible options \n " ) ;
/*
* If we ' re going to do parallel restore , there are some restrictions .
@ -251,11 +277,11 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
{
/* We haven't got round to making this work for all archive formats */
if ( AH - > ClonePtr = = NULL | | AH - > ReopenPtr = = NULL )
die_horribly ( AH , modulename , " parallel restore is not supported with this archive file format \n " ) ;
exit_horribly ( modulename , " parallel restore is not supported with this archive file format \n " ) ;
/* Doesn't work if the archive represents dependencies as OIDs */
if ( AH - > version < K_VERS_1_8 )
die_horribly ( AH , modulename , " parallel restore is not supported with archives made by pre-8.0 pg_dump \n " ) ;
exit_horribly ( modulename , " parallel restore is not supported with archives made by pre-8.0 pg_dump \n " ) ;
/*
* It ' s also not gonna work if we can ' t reopen the input file , so
@ -274,7 +300,7 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
{
reqs = _tocEntryRequired ( te , ropt , false ) ;
if ( te - > hadDumper & & ( reqs & REQ_DATA ) ! = 0 )
die_horribly ( AH , modulename , " cannot restore from compressed archive (compression not supported in this installation) \n " ) ;
exit_horribly ( modulename , " cannot restore from compressed archive (compression not supported in this installation) \n " ) ;
}
}
# endif
@ -286,7 +312,7 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
{
ahlog ( AH , 1 , " connecting to database for restore \n " ) ;
if ( AH - > version < K_VERS_1_3 )
die_horribly ( AH , modulename , " direct database connections are not supported in pre-1.3 archives \n " ) ;
exit_horribly ( modulename , " direct database connections are not supported in pre-1.3 archives \n " ) ;
/* XXX Should get this from the archive */
AHX - > minRemoteVersion = 070100 ;
@ -734,7 +760,7 @@ WriteData(Archive *AHX, const void *data, size_t dLen)
ArchiveHandle * AH = ( ArchiveHandle * ) AHX ;
if ( ! AH - > currToc )
die_horribly ( AH , modulename , " internal error -- WriteData cannot be called outside the context of a DataDumper routine \n " ) ;
exit_horribly ( modulename , " internal error -- WriteData cannot be called outside the context of a DataDumper routine \n " ) ;
return ( * AH - > WriteDataPtr ) ( AH , data , dLen ) ;
}
@ -886,7 +912,7 @@ StartBlob(Archive *AHX, Oid oid)
ArchiveHandle * AH = ( ArchiveHandle * ) AHX ;
if ( ! AH - > StartBlobPtr )
die_horribly ( AH , modulename , " large-object output not supported in chosen format \n " ) ;
exit_horribly ( modulename , " large-object output not supported in chosen format \n " ) ;
( * AH - > StartBlobPtr ) ( AH , AH - > currToc , oid ) ;
@ -973,13 +999,13 @@ StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
{
loOid = lo_create ( AH - > connection , oid ) ;
if ( loOid = = 0 | | loOid ! = oid )
die_horribly ( AH , modulename , " could not create large object %u: %s " ,
oid , PQerrorMessage ( AH - > connection ) ) ;
exit_horribly ( modulename , " could not create large object %u: %s " ,
oid , PQerrorMessage ( AH - > connection ) ) ;
}
AH - > loFd = lo_open ( AH - > connection , oid , INV_WRITE ) ;
if ( AH - > loFd = = - 1 )
die_horribly ( AH , modulename , " could not open large object %u: %s " ,
oid , PQerrorMessage ( AH - > connection ) ) ;
exit_horribly ( modulename , " could not open large object %u: %s " ,
oid , PQerrorMessage ( AH - > connection ) ) ;
}
else
{
@ -1035,8 +1061,8 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
/* Setup the file */
fh = fopen ( ropt - > tocFile , PG_BINARY_R ) ;
if ( ! fh )
die_horribly ( AH , modulename , " could not open TOC file \" %s \" : %s \n " ,
ropt - > tocFile , strerror ( errno ) ) ;
exit_horribly ( modulename , " could not open TOC file \" %s \" : %s \n " ,
ropt - > tocFile , strerror ( errno ) ) ;
incomplete_line = false ;
while ( fgets ( buf , sizeof ( buf ) , fh ) ! = NULL )
@ -1083,8 +1109,8 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
/* Find TOC entry */
te = getTocEntryByDumpId ( AH , id ) ;
if ( ! te )
die_horribly ( AH , modulename , " could not find entry for ID %d \n " ,
id ) ;
exit_horribly ( modulename , " could not find entry for ID %d \n " ,
id ) ;
/* Mark it wanted */
ropt - > idWanted [ id - 1 ] = true ;
@ -1104,8 +1130,8 @@ SortTocFromFile(Archive *AHX, RestoreOptions *ropt)
}
if ( fclose ( fh ) ! = 0 )
die_horribly ( AH , modulename , " could not close TOC file: %s \n " ,
strerror ( errno ) ) ;
exit_horribly ( modulename , " could not close TOC file: %s \n " ,
strerror ( errno ) ) ;
}
/*
@ -1221,11 +1247,11 @@ SetOutput(ArchiveHandle *AH, const char *filename, int compression)
if ( ! AH - > OF )
{
if ( filename )
die_horribly ( AH , modulename , " could not open output file \" %s \" : %s \n " ,
filename , strerror ( errno ) ) ;
exit_horribly ( modulename , " could not open output file \" %s \" : %s \n " ,
filename , strerror ( errno ) ) ;
else
die_horribly ( AH , modulename , " could not open output file: %s \n " ,
strerror ( errno ) ) ;
exit_horribly ( modulename , " could not open output file: %s \n " ,
strerror ( errno ) ) ;
}
}
@ -1251,7 +1277,7 @@ RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
res = fclose ( AH - > OF ) ;
if ( res ! = 0 )
die_horribly ( AH , modulename , " could not close output file: %s \n " ,
exit_horribly ( modulename , " could not close output file: %s \n " ,
strerror ( errno ) ) ;
AH - > gzOut = savedContext . gzOut ;
@ -1329,7 +1355,7 @@ dump_lo_buf(ArchiveHandle *AH)
AH - > lo_buf_used ) ,
( unsigned long ) AH - > lo_buf_used , ( unsigned long ) res ) ;
if ( res ! = AH - > lo_buf_used )
die_horribly ( AH , modulename ,
exit_horribly ( modulename ,
" could not write to large object (result: %lu, expected: %lu) \n " ,
( unsigned long ) res , ( unsigned long ) AH - > lo_buf_used ) ;
}
@ -1388,7 +1414,7 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
{
res = GZWRITE ( ptr , size , nmemb , AH - > OF ) ;
if ( res ! = ( nmemb * size ) )
die_horribly ( AH , modulename , " could not write to output file: %s \n " , strerror ( errno ) ) ;
exit_horribly ( modulename , " could not write to output file: %s \n " , strerror ( errno ) ) ;
return res ;
}
else if ( AH - > CustomOutPtr )
@ -1396,7 +1422,7 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
res = AH - > CustomOutPtr ( AH , ptr , size * nmemb ) ;
if ( res ! = ( nmemb * size ) )
die_horribly ( AH , modulename , " could not write to custom output routine \n " ) ;
exit_horribly ( modulename , " could not write to custom output routine \n " ) ;
return res ;
}
else
@ -1411,56 +1437,17 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
{
res = fwrite ( ptr , size , nmemb , AH - > OF ) ;
if ( res ! = nmemb )
die_horribly ( AH , modulename , " could not write to output file: %s \n " ,
exit_horribly ( modulename , " could not write to output file: %s \n " ,
strerror ( errno ) ) ;
return res ;
}
}
}
/* Report a fatal error and exit(1) */
static void
vdie_horribly ( ArchiveHandle * AH , const char * modulename ,
const char * fmt , va_list ap )
{
vwrite_msg ( modulename , fmt , ap ) ;
if ( AH )
{
if ( AH - > public . verbose )
write_msg ( NULL , " *** aborted because of error \n " ) ;
DisconnectDatabase ( & AH - > public ) ;
}
exit_nicely ( 1 ) ;
}
/* As above, but with variable arg list */
void
die_horribly ( ArchiveHandle * AH , const char * modulename , const char * fmt , . . . )
{
va_list ap ;
va_start ( ap , fmt ) ;
vdie_horribly ( AH , modulename , fmt , ap ) ;
va_end ( ap ) ;
}
/* As above, but with a complaint about a particular query. */
void
die_on_query_failure ( ArchiveHandle * AH , const char * modulename ,
const char * query )
{
write_msg ( modulename , " query failed: %s " ,
PQerrorMessage ( AH - > connection ) ) ;
die_horribly ( AH , modulename , " query was: %s \n " , query ) ;
}
/* on some error, we may decide to go on... */
void
warn_or_die _horribly ( ArchiveHandle * AH ,
const char * modulename , const char * fmt , . . . )
warn_or_exit_horribly ( ArchiveHandle * AH ,
const char * modulename , const char * fmt , . . . )
{
va_list ap ;
@ -1497,14 +1484,13 @@ warn_or_die_horribly(ArchiveHandle *AH,
AH - > lastErrorTE = AH - > currentTE ;
va_start ( ap , fmt ) ;
vwrite_msg ( modulename , fmt , ap ) ;
va_end ( ap ) ;
if ( AH - > public . exit_on_error )
vdie_horribly ( AH , modulename , fmt , ap ) ;
exit_nicely ( 1 ) ;
else
{
vwrite_msg ( modulename , fmt , ap ) ;
AH - > public . n_errors + + ;
}
va_end ( ap ) ;
}
# ifdef NOT_USED
@ -1623,7 +1609,7 @@ ReadOffset(ArchiveHandle *AH, pgoff_t * o)
break ;
default :
die_horribly ( AH , modulename , " unexpected data offset flag %d \n " , offsetFlg ) ;
exit_horribly ( modulename , " unexpected data offset flag %d \n " , offsetFlg ) ;
}
/*
@ -1636,7 +1622,7 @@ ReadOffset(ArchiveHandle *AH, pgoff_t * o)
else
{
if ( ( * AH - > ReadBytePtr ) ( AH ) ! = 0 )
die_horribly ( AH , modulename , " file offset in dump file is too large \n " ) ;
exit_horribly ( modulename , " file offset in dump file is too large \n " ) ;
}
}
@ -1730,7 +1716,7 @@ ReadStr(ArchiveHandle *AH)
{
buf = ( char * ) pg_malloc ( l + 1 ) ;
if ( ( * AH - > ReadBufPtr ) ( AH , ( void * ) buf , l ) ! = l )
die_horribly ( AH , modulename , " unexpected end of file \n " ) ;
exit_horribly ( modulename , " unexpected end of file \n " ) ;
buf [ l ] = ' \0 ' ;
}
@ -1773,8 +1759,8 @@ _discoverArchiveFormat(ArchiveHandle *AH)
char buf [ MAXPGPATH ] ;
if ( snprintf ( buf , MAXPGPATH , " %s/toc.dat " , AH - > fSpec ) > = MAXPGPATH )
die_horribly ( AH , modulename , " directory name too long: \" %s \" \n " ,
AH - > fSpec ) ;
exit_horribly ( modulename , " directory name too long: \" %s \" \n " ,
AH - > fSpec ) ;
if ( stat ( buf , & st ) = = 0 & & S_ISREG ( st . st_mode ) )
{
AH - > format = archDirectory ;
@ -1783,32 +1769,32 @@ _discoverArchiveFormat(ArchiveHandle *AH)
# ifdef HAVE_LIBZ
if ( snprintf ( buf , MAXPGPATH , " %s/toc.dat.gz " , AH - > fSpec ) > = MAXPGPATH )
die_horribly ( AH , modulename , " directory name too long: \" %s \" \n " ,
AH - > fSpec ) ;
exit_horribly ( modulename , " directory name too long: \" %s \" \n " ,
AH - > fSpec ) ;
if ( stat ( buf , & st ) = = 0 & & S_ISREG ( st . st_mode ) )
{
AH - > format = archDirectory ;
return AH - > format ;
}
# endif
die_horribly ( AH , modulename , " directory \" %s \" does not appear to be a valid archive ( \" toc.dat \" does not exist) \n " ,
AH - > fSpec ) ;
exit_horribly ( modulename , " directory \" %s \" does not appear to be a valid archive ( \" toc.dat \" does not exist) \n " ,
AH - > fSpec ) ;
fh = NULL ; /* keep compiler quiet */
}
else
{
fh = fopen ( AH - > fSpec , PG_BINARY_R ) ;
if ( ! fh )
die_horribly ( AH , modulename , " could not open input file \" %s \" : %s \n " ,
AH - > fSpec , strerror ( errno ) ) ;
exit_horribly ( modulename , " could not open input file \" %s \" : %s \n " ,
AH - > fSpec , strerror ( errno ) ) ;
}
}
else
{
fh = stdin ;
if ( ! fh )
die_horribly ( AH , modulename , " could not open input file: %s \n " ,
strerror ( errno ) ) ;
exit_horribly ( modulename , " could not open input file: %s \n " ,
strerror ( errno ) ) ;
}
cnt = fread ( sig , 1 , 5 , fh ) ;
@ -1816,10 +1802,10 @@ _discoverArchiveFormat(ArchiveHandle *AH)
if ( cnt ! = 5 )
{
if ( ferror ( fh ) )
die_horribly ( AH , modulename , " could not read input file: %s \n " , strerror ( errno ) ) ;
exit_horribly ( modulename , " could not read input file: %s \n " , strerror ( errno ) ) ;
else
die_horribly ( AH , modulename , " input file is too short (read %lu, expected 5) \n " ,
( unsigned long ) cnt ) ;
exit_horribly ( modulename , " input file is too short (read %lu, expected 5) \n " ,
( unsigned long ) cnt ) ;
}
/* Save it, just in case we need it later */
@ -1880,14 +1866,14 @@ _discoverArchiveFormat(ArchiveHandle *AH)
strncmp ( AH - > lookahead , TEXT_DUMPALL_HEADER , strlen ( TEXT_DUMPALL_HEADER ) ) = = 0 ) )
{
/* looks like it's probably a text format dump. so suggest they try psql */
die_horribly ( AH , modulename , " input file appears to be a text format dump. Please use psql. \n " ) ;
exit_horribly ( modulename , " input file appears to be a text format dump. Please use psql. \n " ) ;
}
if ( AH - > lookaheadLen ! = 512 )
die_horribly ( AH , modulename , " input file does not appear to be a valid archive (too short?) \n " ) ;
exit_horribly ( modulename , " input file does not appear to be a valid archive (too short?) \n " ) ;
if ( ! isValidTarHeader ( AH - > lookahead ) )
die_horribly ( AH , modulename , " input file does not appear to be a valid archive \n " ) ;
exit_horribly ( modulename , " input file does not appear to be a valid archive \n " ) ;
AH - > format = archTar ;
}
@ -1907,8 +1893,8 @@ _discoverArchiveFormat(ArchiveHandle *AH)
/* Close the file */
if ( wantClose )
if ( fclose ( fh ) ! = 0 )
die_horribly ( AH , modulename , " could not close input file: %s \n " ,
strerror ( errno ) ) ;
exit_horribly ( modulename , " could not close input file: %s \n " ,
strerror ( errno ) ) ;
return AH - > format ;
}
@ -2027,7 +2013,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
break ;
default :
die_horribly ( AH , modulename , " unrecognized file format \" %d \" \n " , fmt ) ;
exit_horribly ( modulename , " unrecognized file format \" %d \" \n " , fmt ) ;
}
return AH ;
@ -2149,9 +2135,9 @@ ReadToc(ArchiveHandle *AH)
/* Sanity check */
if ( te - > dumpId < = 0 )
die_horribly ( AH , modulename ,
" entry ID %d out of range -- perhaps a corrupt TOC \n " ,
te - > dumpId ) ;
exit_horribly ( modulename ,
" entry ID %d out of range -- perhaps a corrupt TOC \n " ,
te - > dumpId ) ;
te - > hadDumper = ReadInt ( AH ) ;
@ -2306,13 +2292,13 @@ processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
* ptr2 = ' \0 ' ;
encoding = pg_char_to_encoding ( ptr1 ) ;
if ( encoding < 0 )
die_horribly ( AH , modulename , " unrecognized encoding \" %s \" \n " ,
ptr1 ) ;
exit_horribly ( modulename , " unrecognized encoding \" %s \" \n " ,
ptr1 ) ;
AH - > public . encoding = encoding ;
}
else
die_horribly ( AH , modulename , " invalid ENCODING item: %s \n " ,
te - > defn ) ;
exit_horribly ( modulename , " invalid ENCODING item: %s \n " ,
te - > defn ) ;
free ( defn ) ;
}
@ -2329,8 +2315,8 @@ processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
else if ( ptr1 & & strncmp ( ptr1 , " 'off' " , 5 ) = = 0 )
AH - > public . std_strings = false ;
else
die_horribly ( AH , modulename , " invalid STDSTRINGS item: %s \n " ,
te - > defn ) ;
exit_horribly ( modulename , " invalid STDSTRINGS item: %s \n " ,
te - > defn ) ;
}
static teReqs
@ -2537,9 +2523,9 @@ _doSetSessionAuth(ArchiveHandle *AH, const char *user)
res = PQexec ( AH - > connection , cmd - > data ) ;
if ( ! res | | PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
/* NOT warn_or_die _horribly... use -O instead to skip this. */
die_horribly ( AH , modulename , " could not set session user to \" %s \" : %s " ,
user , PQerrorMessage ( AH - > connection ) ) ;
/* NOT warn_or_exit _horribly... use -O instead to skip this. */
exit_horribly ( modulename , " could not set session user to \" %s \" : %s " ,
user , PQerrorMessage ( AH - > connection ) ) ;
PQclear ( res ) ;
}
@ -2569,9 +2555,9 @@ _doSetWithOids(ArchiveHandle *AH, const bool withOids)
res = PQexec ( AH - > connection , cmd - > data ) ;
if ( ! res | | PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
warn_or_die _horribly ( AH , modulename ,
" could not set default_with_oids: %s " ,
PQerrorMessage ( AH - > connection ) ) ;
warn_or_exit _horribly ( AH , modulename ,
" could not set default_with_oids: %s " ,
PQerrorMessage ( AH - > connection ) ) ;
PQclear ( res ) ;
}
@ -2707,9 +2693,9 @@ _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
res = PQexec ( AH - > connection , qry - > data ) ;
if ( ! res | | PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
warn_or_die _horribly ( AH , modulename ,
" could not set search_path to \" %s \" : %s " ,
schemaName , PQerrorMessage ( AH - > connection ) ) ;
warn_or_exit _horribly ( AH , modulename ,
" could not set search_path to \" %s \" : %s " ,
schemaName , PQerrorMessage ( AH - > connection ) ) ;
PQclear ( res ) ;
}
@ -2768,9 +2754,9 @@ _selectTablespace(ArchiveHandle *AH, const char *tablespace)
res = PQexec ( AH - > connection , qry - > data ) ;
if ( ! res | | PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
warn_or_die _horribly ( AH , modulename ,
" could not set default_tablespace to %s: %s " ,
fmtId ( want ) , PQerrorMessage ( AH - > connection ) ) ;
warn_or_exit _horribly ( AH , modulename ,
" could not set default_tablespace to %s: %s " ,
fmtId ( want ) , PQerrorMessage ( AH - > connection ) ) ;
PQclear ( res ) ;
}
@ -3150,10 +3136,10 @@ ReadHead(ArchiveHandle *AH)
if ( ! AH - > readHeader )
{
if ( ( * AH - > ReadBufPtr ) ( AH , tmpMag , 5 ) ! = 5 )
die_horribly ( AH , modulename , " unexpected end of file \n " ) ;
exit_horribly ( modulename , " unexpected end of file \n " ) ;
if ( strncmp ( tmpMag , " PGDMP " , 5 ) ! = 0 )
die_horribly ( AH , modulename , " did not find magic string in file header \n " ) ;
exit_horribly ( modulename , " did not find magic string in file header \n " ) ;
AH - > vmaj = ( * AH - > ReadBytePtr ) ( AH ) ;
AH - > vmin = ( * AH - > ReadBytePtr ) ( AH ) ;
@ -3166,13 +3152,13 @@ ReadHead(ArchiveHandle *AH)
AH - > version = ( ( AH - > vmaj * 256 + AH - > vmin ) * 256 + AH - > vrev ) * 256 + 0 ;
if ( AH - > version < K_VERS_1_0 | | AH - > version > K_VERS_MAX )
die_horribly ( AH , modulename , " unsupported version (%d.%d) in file header \n " ,
AH - > vmaj , AH - > vmin ) ;
exit_horribly ( modulename , " unsupported version (%d.%d) in file header \n " ,
AH - > vmaj , AH - > vmin ) ;
AH - > intSize = ( * AH - > ReadBytePtr ) ( AH ) ;
if ( AH - > intSize > 32 )
die_horribly ( AH , modulename , " sanity check on integer size (%lu) failed \n " ,
( unsigned long ) AH - > intSize ) ;
exit_horribly ( modulename , " sanity check on integer size (%lu) failed \n " ,
( unsigned long ) AH - > intSize ) ;
if ( AH - > intSize > sizeof ( int ) )
write_msg ( modulename , " WARNING: archive was made on a machine with larger integers, some operations might fail \n " ) ;
@ -3185,8 +3171,8 @@ ReadHead(ArchiveHandle *AH)
fmt = ( * AH - > ReadBytePtr ) ( AH ) ;
if ( AH - > format ! = fmt )
die_horribly ( AH , modulename , " expected format (%d) differs from format found in file (%d) \n " ,
AH - > format , fmt ) ;
exit_horribly ( modulename , " expected format (%d) differs from format found in file (%d) \n " ,
AH - > format , fmt ) ;
}
if ( AH - > version > = K_VERS_1_2 )
@ -3290,6 +3276,66 @@ dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
ahprintf ( AH , " -- %s %s \n \n " , msg , buf ) ;
}
static void
setProcessIdentifier ( ParallelStateEntry * pse , ArchiveHandle * AH )
{
# ifdef WIN32
pse - > threadId = GetCurrentThreadId ( ) ;
# else
pse - > pid = getpid ( ) ;
# endif
pse - > AH = AH ;
}
static void
unsetProcessIdentifier ( ParallelStateEntry * pse )
{
# ifdef WIN32
pse - > threadId = 0 ;
# else
pse - > pid = 0 ;
# endif
pse - > AH = NULL ;
}
static ParallelStateEntry *
GetMyPSEntry ( ParallelState * pstate )
{
int i ;
for ( i = 0 ; i < pstate - > numWorkers ; i + + )
# ifdef WIN32
if ( pstate - > pse [ i ] . threadId = = GetCurrentThreadId ( ) )
# else
if ( pstate - > pse [ i ] . pid = = getpid ( ) )
# endif
return & ( pstate - > pse [ i ] ) ;
return NULL ;
}
static void
archive_close_connection ( int code , void * arg )
{
ShutdownInformation * si = ( ShutdownInformation * ) arg ;
if ( si - > pstate )
{
ParallelStateEntry * entry = GetMyPSEntry ( si - > pstate ) ;
if ( entry ! = NULL & & entry - > AH )
DisconnectDatabase ( & ( entry - > AH - > public ) ) ;
}
else if ( si - > AHX )
DisconnectDatabase ( si - > AHX ) ;
}
void
on_exit_close_archive ( Archive * AHX )
{
shutdown_info . AHX = AHX ;
on_exit_nicely ( archive_close_connection , & shutdown_info ) ;
}
/*
* Main engine for parallel restore .
@ -3316,10 +3362,17 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
TocEntry * next_work_item ;
thandle ret_child ;
TocEntry * te ;
ParallelState * pstate ;
int i ;
ahlog ( AH , 2 , " entering restore_toc_entries_parallel \n " ) ;
slots = ( ParallelSlot * ) pg_calloc ( sizeof ( ParallelSlot ) , n_slots ) ;
slots = ( ParallelSlot * ) pg_calloc ( n_slots , sizeof ( ParallelSlot ) ) ;
pstate = ( ParallelState * ) pg_malloc ( sizeof ( ParallelState ) ) ;
pstate - > pse = ( ParallelStateEntry * ) pg_calloc ( n_slots , sizeof ( ParallelStateEntry ) ) ;
pstate - > numWorkers = ropt - > number_of_jobs ;
for ( i = 0 ; i < pstate - > numWorkers ; i + + )
unsetProcessIdentifier ( & ( pstate - > pse [ i ] ) ) ;
/* Adjust dependency information */
fix_dependencies ( AH ) ;
@ -3375,6 +3428,12 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
*/
DisconnectDatabase ( & AH - > public ) ;
/*
* Set the pstate in the shutdown_info . The exit handler uses pstate if set
* and falls back to AHX otherwise .
*/
shutdown_info . pstate = pstate ;
/* blow away any transient state from the old connection */
if ( AH - > currUser )
free ( AH - > currUser ) ;
@ -3473,6 +3532,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
args = pg_malloc ( sizeof ( RestoreArgs ) ) ;
args - > AH = CloneArchive ( AH ) ;
args - > te = next_work_item ;
args - > pse = & pstate - > pse [ next_slot ] ;
/* run the step in a worker child */
child = spawn_restore ( args ) ;
@ -3500,13 +3560,19 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
}
else
{
die_horribly ( AH , modulename , " worker process crashed: status %d \n " ,
work_status ) ;
exit_horribly ( modulename , " worker process crashed: status %d \n " ,
work_status ) ;
}
}
ahlog ( AH , 1 , " finished main parallel loop \n " ) ;
/*
* Remove the pstate again , so the exit handler will now fall back to
* closing AH - > connection again .
*/
shutdown_info . pstate = NULL ;
/*
* Now reconnect the single parent connection .
*/
@ -3548,23 +3614,23 @@ spawn_restore(RestoreArgs *args)
{
/* in child process */
parallel_restore ( args ) ;
die_horribly ( args - > AH , modulename ,
" parallel_restore should not return \n " ) ;
exit_horribly ( modulename ,
" parallel_restore should not return \n " ) ;
}
else if ( child < 0 )
{
/* fork failed */
die_horribly ( args - > AH , modulename ,
" could not create worker process: %s \n " ,
strerror ( errno ) ) ;
exit_horribly ( modulename ,
" could not create worker process: %s \n " ,
strerror ( errno ) ) ;
}
# else
child = ( HANDLE ) _beginthreadex ( NULL , 0 , ( void * ) parallel_restore ,
args , 0 , NULL ) ;
if ( child = = 0 )
die_horribly ( args - > AH , modulename ,
" could not create worker thread: %s \n " ,
strerror ( errno ) ) ;
exit_horribly ( modulename ,
" could not create worker thread: %s \n " ,
strerror ( errno ) ) ;
# endif
return child ;
@ -3806,6 +3872,8 @@ parallel_restore(RestoreArgs *args)
RestoreOptions * ropt = AH - > ropt ;
int retval ;
setProcessIdentifier ( args - > pse , AH ) ;
/*
* Close and reopen the input file so we have a private file pointer that
* doesn ' t stomp on anyone else ' s file pointer , if we ' re actually going to
@ -3836,6 +3904,7 @@ parallel_restore(RestoreArgs *args)
/* And clean up */
DisconnectDatabase ( ( Archive * ) AH ) ;
unsetProcessIdentifier ( args - > pse ) ;
/* If we reopened the file, we are done with it, so close it now */
if ( te - > section = = SECTION_DATA )
@ -3881,7 +3950,7 @@ mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
}
if ( te = = NULL )
die_horribly ( AH , modulename , " could not find slot of finished worker \n " ) ;
exit_horribly ( modulename , " could not find slot of finished worker \n " ) ;
ahlog ( AH , 1 , " finished item %d %s %s \n " ,
te - > dumpId , te - > desc , te - > tag ) ;
@ -3896,8 +3965,8 @@ mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
else if ( status = = WORKER_IGNORED_ERRORS )
AH - > public . n_errors + + ;
else if ( status ! = 0 )
die_horribly ( AH , modulename , " worker process failed: exit code %d \n " ,
status ) ;
exit_horribly ( modulename , " worker process failed: exit code %d \n " ,
status ) ;
reduce_dependencies ( AH , te , ready_list ) ;
}