@ -10,10 +10,14 @@
*/
# include "postgres_fe.h"
# include "catalog/pg_class_d.h"
# include "common.h"
# include "common/logging.h"
# include "fe_utils/connect.h"
# include "fe_utils/simple_list.h"
# include "fe_utils/string_utils.h"
# include "scripts_parallel.h"
typedef enum ReindexType
{
@ -25,16 +29,26 @@ typedef enum ReindexType
} ReindexType ;
static void reindex_one_database ( const char * name , const char * dbname ,
ReindexType type , const char * host ,
static SimpleStringList * get_parallel_object_list ( PGconn * conn ,
ReindexType type ,
SimpleStringList * user_list ,
bool echo ) ;
static void reindex_one_database ( const char * dbname , ReindexType type ,
SimpleStringList * user_list , const char * host ,
const char * port , const char * username ,
enum trivalue prompt_password , const char * progname ,
bool echo , bool verbose , bool concurrently ) ;
bool echo , bool verbose , bool concurrently ,
int concurrentCons ) ;
static void reindex_all_databases ( const char * maintenance_db ,
const char * host , const char * port ,
const char * username , enum trivalue prompt_password ,
const char * progname , bool echo ,
bool quiet , bool verbose , bool concurrently ) ;
bool quiet , bool verbose , bool concurrently ,
int concurrentCons ) ;
static void run_reindex_command ( PGconn * conn , ReindexType type ,
const char * name , bool echo , bool verbose ,
bool concurrently , bool async ) ;
static void help ( const char * progname ) ;
int
@ -54,6 +68,7 @@ main(int argc, char *argv[])
{ " system " , no_argument , NULL , ' s ' } ,
{ " table " , required_argument , NULL , ' t ' } ,
{ " index " , required_argument , NULL , ' i ' } ,
{ " jobs " , required_argument , NULL , ' j ' } ,
{ " verbose " , no_argument , NULL , ' v ' } ,
{ " concurrently " , no_argument , NULL , 1 } ,
{ " maintenance-db " , required_argument , NULL , 2 } ,
@ -79,6 +94,7 @@ main(int argc, char *argv[])
SimpleStringList indexes = { NULL , NULL } ;
SimpleStringList tables = { NULL , NULL } ;
SimpleStringList schemas = { NULL , NULL } ;
int concurrentCons = 1 ;
pg_logging_init ( argv [ 0 ] ) ;
progname = get_progname ( argv [ 0 ] ) ;
@ -87,7 +103,7 @@ main(int argc, char *argv[])
handle_help_version_opts ( argc , argv , " reindexdb " , help ) ;
/* process command-line options */
while ( ( c = getopt_long ( argc , argv , " h:p:U:wWeqS:d:ast:i:v " , long_options , & optindex ) ) ! = - 1 )
while ( ( c = getopt_long ( argc , argv , " h:p:U:wWeqS:d:ast:i:j: v " , long_options , & optindex ) ) ! = - 1 )
{
switch ( c )
{
@ -130,6 +146,20 @@ main(int argc, char *argv[])
case ' i ' :
simple_string_list_append ( & indexes , optarg ) ;
break ;
case ' j ' :
concurrentCons = atoi ( optarg ) ;
if ( concurrentCons < = 0 )
{
pg_log_error ( " number of parallel jobs must be at least 1 " ) ;
exit ( 1 ) ;
}
if ( concurrentCons > FD_SETSIZE - 1 )
{
pg_log_error ( " too many parallel jobs requested (maximum: %d) " ,
FD_SETSIZE - 1 ) ;
exit ( 1 ) ;
}
break ;
case ' v ' :
verbose = true ;
break ;
@ -194,7 +224,8 @@ main(int argc, char *argv[])
}
reindex_all_databases ( maintenance_db , host , port , username ,
prompt_password , progname , echo , quiet , verbose , concurrently ) ;
prompt_password , progname , echo , quiet , verbose ,
concurrently , concurrentCons ) ;
}
else if ( syscatalog )
{
@ -214,6 +245,12 @@ main(int argc, char *argv[])
exit ( 1 ) ;
}
if ( concurrentCons > 1 )
{
pg_log_error ( " cannot use multiple jobs to reindex system catalogs " ) ;
exit ( 1 ) ;
}
if ( dbname = = NULL )
{
if ( getenv ( " PGDATABASE " ) )
@ -224,12 +261,23 @@ main(int argc, char *argv[])
dbname = get_user_name_or_exit ( progname ) ;
}
reindex_one_database ( NULL , dbname , REINDEX_SYSTEM , host ,
reindex_one_database ( dbname , REINDEX_SYSTEM , NULL , host ,
port , username , prompt_password , progname ,
echo , verbose , concurrently ) ;
echo , verbose , concurrently , 1 ) ;
}
else
{
/*
* Index - level REINDEX is not supported with multiple jobs as we
* cannot control the concurrent processing of multiple indexes
* depending on the same relation .
*/
if ( concurrentCons > 1 & & indexes . head ! = NULL )
{
pg_log_error ( " cannot use multiple jobs to reindex indexes " ) ;
exit ( 1 ) ;
}
if ( dbname = = NULL )
{
if ( getenv ( " PGDATABASE " ) )
@ -241,61 +289,49 @@ main(int argc, char *argv[])
}
if ( schemas . head ! = NULL )
{
SimpleStringListCell * cell ;
for ( cell = schemas . head ; cell ; cell = cell - > next )
{
reindex_one_database ( cell - > val , dbname , REINDEX_SCHEMA , host ,
port , username , prompt_password , progname ,
echo , verbose , concurrently ) ;
}
}
reindex_one_database ( dbname , REINDEX_SCHEMA , & schemas , host ,
port , username , prompt_password , progname ,
echo , verbose , concurrently , concurrentCons ) ;
if ( indexes . head ! = NULL )
{
SimpleStringListCell * cell ;
reindex_one_database ( dbname , REINDEX_INDEX , & indexes , host ,
port , username , prompt_password , progname ,
echo , verbose , concurrently , 1 ) ;
for ( cell = indexes . head ; cell ; cell = cell - > next )
{
reindex_one_database ( cell - > val , dbname , REINDEX_INDEX , host ,
port , username , prompt_password , progname ,
echo , verbose , concurrently ) ;
}
}
if ( tables . head ! = NULL )
{
SimpleStringListCell * cell ;
for ( cell = tables . head ; cell ; cell = cell - > next )
{
reindex_one_database ( cell - > val , dbname , REINDEX_TABLE , host ,
port , username , prompt_password , progname ,
echo , verbose , concurrently ) ;
}
}
reindex_one_database ( dbname , REINDEX_TABLE , & tables , host ,
port , username , prompt_password , progname ,
echo , verbose , concurrently ,
concurrentCons ) ;
/*
* reindex database only if neither index nor table nor schema is
* specified
*/
if ( indexes . head = = NULL & & tables . head = = NULL & & schemas . head = = NULL )
reindex_one_database ( NULL , dbname , REINDEX_DATABASE , host ,
reindex_one_database ( dbname , REINDEX_DATABASE , NULL , host ,
port , username , prompt_password , progname ,
echo , verbose , concurrently ) ;
echo , verbose , concurrently , concurrentCons ) ;
}
exit ( 0 ) ;
}
static void
reindex_one_database ( const char * name , const char * dbname , ReindexType type ,
const char * host , const char * port , const char * username ,
reindex_one_database ( const char * dbname , ReindexType type ,
SimpleStringList * user_list , const char * host ,
const char * port , const char * username ,
enum trivalue prompt_password , const char * progname , bool echo ,
bool verbose , bool concurrently )
bool verbose , bool concurrently , int concurrentCons )
{
PQExpBufferData sql ;
PGconn * conn ;
SimpleStringListCell * cell ;
bool parallel = concurrentCons > 1 ;
SimpleStringList * process_list = user_list ;
ReindexType process_type = type ;
ParallelSlot * slots ;
bool failed = false ;
int items_count = 0 ;
conn = connectDatabase ( dbname , host , port , username , prompt_password ,
progname , echo , false , false ) ;
@ -308,6 +344,151 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
exit ( 1 ) ;
}
if ( ! parallel )
{
switch ( process_type )
{
case REINDEX_DATABASE :
case REINDEX_SYSTEM :
/*
* Database and system reindexes only need to work on the
* database itself , so build a list with a single entry .
*/
Assert ( user_list = = NULL ) ;
process_list = pg_malloc0 ( sizeof ( SimpleStringList ) ) ;
simple_string_list_append ( process_list , PQdb ( conn ) ) ;
break ;
case REINDEX_INDEX :
case REINDEX_SCHEMA :
case REINDEX_TABLE :
Assert ( user_list ! = NULL ) ;
break ;
}
}
else
{
switch ( process_type )
{
case REINDEX_DATABASE :
/*
* Database - wide parallel reindex requires special processing .
* If multiple jobs were asked , we have to reindex system
* catalogs first as they cannot be processed in parallel .
*/
if ( concurrently )
pg_log_warning ( " cannot reindex system catalogs concurrently, skipping all " ) ;
else
run_reindex_command ( conn , REINDEX_SYSTEM , PQdb ( conn ) , echo ,
verbose , concurrently , false ) ;
/* Build a list of relations from the database */
process_list = get_parallel_object_list ( conn , process_type ,
user_list , echo ) ;
process_type = REINDEX_TABLE ;
/* Bail out if nothing to process */
if ( process_list = = NULL )
return ;
break ;
case REINDEX_SCHEMA :
Assert ( user_list ! = NULL ) ;
/* Build a list of relations from all the schemas */
process_list = get_parallel_object_list ( conn , process_type ,
user_list , echo ) ;
process_type = REINDEX_TABLE ;
/* Bail out if nothing to process */
if ( process_list = = NULL )
return ;
break ;
case REINDEX_SYSTEM :
case REINDEX_INDEX :
/* not supported */
Assert ( false ) ;
break ;
case REINDEX_TABLE :
/*
* Fall through . The list of items for tables is already
* created .
*/
break ;
}
}
/*
* Adjust the number of concurrent connections depending on the items in
* the list . We choose the minimum between the number of concurrent
* connections and the number of items in the list .
*/
for ( cell = process_list - > head ; cell ; cell = cell - > next )
{
items_count + + ;
/* no need to continue if there are more elements than jobs */
if ( items_count > = concurrentCons )
break ;
}
concurrentCons = Min ( concurrentCons , items_count ) ;
Assert ( concurrentCons > 0 ) ;
Assert ( process_list ! = NULL ) ;
slots = ParallelSlotsSetup ( dbname , host , port , username , prompt_password ,
progname , echo , conn , concurrentCons ) ;
cell = process_list - > head ;
do
{
const char * objname = cell - > val ;
ParallelSlot * free_slot = NULL ;
if ( CancelRequested )
{
failed = true ;
goto finish ;
}
free_slot = ParallelSlotsGetIdle ( slots , concurrentCons ) ;
if ( ! free_slot )
{
failed = true ;
goto finish ;
}
run_reindex_command ( free_slot - > connection , process_type , objname ,
echo , verbose , concurrently , true ) ;
cell = cell - > next ;
} while ( cell ! = NULL ) ;
if ( ! ParallelSlotsWaitCompletion ( slots , concurrentCons ) )
failed = true ;
finish :
ParallelSlotsTerminate ( slots , concurrentCons ) ;
pfree ( slots ) ;
if ( failed )
exit ( 1 ) ;
}
static void
run_reindex_command ( PGconn * conn , ReindexType type , const char * name ,
bool echo , bool verbose , bool concurrently , bool async )
{
PQExpBufferData sql ;
bool status ;
Assert ( name ) ;
/* build the REINDEX query */
initPQExpBuffer ( & sql ) ;
@ -344,7 +525,7 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
{
case REINDEX_DATABASE :
case REINDEX_SYSTEM :
appendPQExpBufferStr ( & sql , fmtId ( PQdb ( conn ) ) ) ;
appendPQExpBufferStr ( & sql , fmtId ( name ) ) ;
break ;
case REINDEX_INDEX :
case REINDEX_TABLE :
@ -358,7 +539,17 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
/* finish the query */
appendPQExpBufferChar ( & sql , ' ; ' ) ;
if ( ! executeMaintenanceCommand ( conn , sql . data , echo ) )
if ( async )
{
if ( echo )
printf ( " %s \n " , sql . data ) ;
status = PQsendQuery ( conn , sql . data ) = = 1 ;
}
else
status = executeMaintenanceCommand ( conn , sql . data , echo ) ;
if ( ! status )
{
switch ( type )
{
@ -383,20 +574,141 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
name , PQdb ( conn ) , PQerrorMessage ( conn ) ) ;
break ;
}
PQfinish ( conn ) ;
exit ( 1 ) ;
if ( ! async )
{
PQfinish ( conn ) ;
exit ( 1 ) ;
}
}
PQfinish ( conn ) ;
termPQExpBuffer ( & sql ) ;
}
/*
* Prepare the list of objects to process by querying the catalogs .
*
* This function will return a SimpleStringList object containing the entire
* list of tables in the given database that should be processed by a parallel
* database - wide reindex ( excluding system tables ) , or NULL if there ' s no such
* table .
*/
static SimpleStringList *
get_parallel_object_list ( PGconn * conn , ReindexType type ,
SimpleStringList * user_list , bool echo )
{
PQExpBufferData catalog_query ;
PQExpBufferData buf ;
PGresult * res ;
SimpleStringList * tables ;
int ntups ,
i ;
initPQExpBuffer ( & catalog_query ) ;
/*
* The queries here are using a safe search_path , so there ' s no need to
* fully qualify everything .
*/
switch ( type )
{
case REINDEX_DATABASE :
Assert ( user_list = = NULL ) ;
appendPQExpBuffer ( & catalog_query ,
" SELECT c.relname, ns.nspname \n "
" FROM pg_catalog.pg_class c \n "
" JOIN pg_catalog.pg_namespace ns "
" ON c.relnamespace = ns.oid \n "
" WHERE ns.nspname != 'pg_catalog' \n "
" AND c.relkind IN ( "
CppAsString2 ( RELKIND_RELATION ) " , "
CppAsString2 ( RELKIND_MATVIEW ) " ) \n "
" ORDER BY c.relpages DESC; " ) ;
break ;
case REINDEX_SCHEMA :
{
SimpleStringListCell * cell ;
bool nsp_listed = false ;
Assert ( user_list ! = NULL ) ;
/*
* All the tables from all the listed schemas are grabbed at
* once .
*/
appendPQExpBuffer ( & catalog_query ,
" SELECT c.relname, ns.nspname \n "
" FROM pg_catalog.pg_class c \n "
" JOIN pg_catalog.pg_namespace ns "
" ON c.relnamespace = ns.oid \n "
" WHERE c.relkind IN ( "
CppAsString2 ( RELKIND_RELATION ) " , "
CppAsString2 ( RELKIND_MATVIEW ) " ) \n "
" AND ns.nspname IN ( " ) ;
for ( cell = user_list - > head ; cell ; cell = cell - > next )
{
const char * nspname = cell - > val ;
if ( nsp_listed )
appendPQExpBuffer ( & catalog_query , " , " ) ;
else
nsp_listed = true ;
appendStringLiteralConn ( & catalog_query , nspname , conn ) ;
}
appendPQExpBuffer ( & catalog_query , " ) \n "
" ORDER BY c.relpages DESC; " ) ;
}
break ;
case REINDEX_SYSTEM :
case REINDEX_INDEX :
case REINDEX_TABLE :
Assert ( false ) ;
break ;
}
res = executeQuery ( conn , catalog_query . data , echo ) ;
termPQExpBuffer ( & catalog_query ) ;
/*
* If no rows are returned , there are no matching tables , so we are done .
*/
ntups = PQntuples ( res ) ;
if ( ntups = = 0 )
{
PQclear ( res ) ;
PQfinish ( conn ) ;
return NULL ;
}
tables = pg_malloc0 ( sizeof ( SimpleStringList ) ) ;
/* Build qualified identifiers for each table */
initPQExpBuffer ( & buf ) ;
for ( i = 0 ; i < ntups ; i + + )
{
appendPQExpBufferStr ( & buf ,
fmtQualifiedId ( PQgetvalue ( res , i , 1 ) ,
PQgetvalue ( res , i , 0 ) ) ) ;
simple_string_list_append ( tables , buf . data ) ;
resetPQExpBuffer ( & buf ) ;
}
termPQExpBuffer ( & buf ) ;
PQclear ( res ) ;
return tables ;
}
static void
reindex_all_databases ( const char * maintenance_db ,
const char * host , const char * port ,
const char * username , enum trivalue prompt_password ,
const char * progname , bool echo , bool quiet , bool verbose ,
bool concurrently )
bool concurrently , int concurrentCons )
{
PGconn * conn ;
PGresult * result ;
@ -423,9 +735,10 @@ reindex_all_databases(const char *maintenance_db,
appendPQExpBufferStr ( & connstr , " dbname= " ) ;
appendConnStrVal ( & connstr , dbname ) ;
reindex_one_database ( NULL , connstr . data , REINDEX_DATABASE , host ,
reindex_one_database ( connstr . data , REINDEX_DATABASE , NULL , host ,
port , username , prompt_password ,
progname , echo , verbose , concurrently ) ;
progname , echo , verbose , concurrently ,
concurrentCons ) ;
}
termPQExpBuffer ( & connstr ) ;
@ -444,6 +757,7 @@ help(const char *progname)
printf ( _ ( " -d, --dbname=DBNAME database to reindex \n " ) ) ;
printf ( _ ( " -e, --echo show the commands being sent to the server \n " ) ) ;
printf ( _ ( " -i, --index=INDEX recreate specific index(es) only \n " ) ) ;
printf ( _ ( " -j, --jobs=NUM use this many concurrent connections to reindex \n " ) ) ;
printf ( _ ( " -q, --quiet don't write any messages \n " ) ) ;
printf ( _ ( " -s, --system reindex system catalogs \n " ) ) ;
printf ( _ ( " -S, --schema=SCHEMA reindex specific schema(s) only \n " ) ) ;