@ -25,25 +25,16 @@
# include "common/logging.h"
# include "fe_utils/cancel.h"
# include "fe_utils/parallel_slot.h"
# include "fe_utils/query_utils.h"
# define ERRCODE_UNDEFINED_TABLE "42P01"
static void init_slot ( ParallelSlot * slot , PGconn * conn ) ;
static int select_loop ( int maxFd , fd_set * workerset ) ;
static bool processQueryResult ( ParallelSlot * slot , PGresult * result ) ;
static void
init_slot ( ParallelSlot * slot , PGconn * conn )
{
slot - > connection = conn ;
/* Initially assume connection is idle */
slot - > isFree = true ;
ParallelSlotClearHandler ( slot ) ;
}
/*
* Process ( and delete ) a query result . Returns true if there ' s no problem ,
* false otherwise . It ' s up to the handler to decide what cos ntitutes a
* false otherwise . It ' s up to the handler to decide what constitutes a
* problem .
*/
static bool
@ -137,151 +128,316 @@ select_loop(int maxFd, fd_set *workerset)
}
/*
* ParallelSlotsGetIdle
* Return a connection slot that is ready to execute a command .
*
* This returns the first slot we find that is marked isFree , if one is ;
* otherwise , we loop on select ( ) until one socket becomes available . When
* this happens , we read the whole set and mark as free all sockets that
* become available . If an error occurs , NULL is returned .
* Return the offset of a suitable idle slot , or - 1 if none are available . If
* the given dbname is not null , only idle slots connected to the given
* database are considered suitable , otherwise all idle connected slots are
* considered suitable .
*/
ParallelSlot *
ParallelSlotsGetIdle ( ParallelSlot * slots , int numslots )
static int
find_matching_idle_slot ( const ParallelSlotArray * sa , const char * dbname )
{
int i ;
int firstFree = - 1 ;
/*
* Look for any connection currently free . If there is one , mark it as
* taken and let the caller know the slot to use .
*/
for ( i = 0 ; i < numslots ; i + + )
for ( i = 0 ; i < sa - > numslots ; i + + )
{
if ( slots [ i ] . isFree )
{
slots [ i ] . isFree = false ;
return slots + i ;
}
if ( sa - > slots [ i ] . inUse )
continue ;
if ( sa - > slots [ i ] . connection = = NULL )
continue ;
if ( dbname = = NULL | |
strcmp ( PQdb ( sa - > slots [ i ] . connection ) , dbname ) = = 0 )
return i ;
}
return - 1 ;
}
/*
* Return the offset of the first slot without a database connection , or - 1 if
* all slots are connected .
*/
static int
find_unconnected_slot ( const ParallelSlotArray * sa )
{
int i ;
for ( i = 0 ; i < sa - > numslots ; i + + )
{
if ( sa - > slots [ i ] . inUse )
continue ;
if ( sa - > slots [ i ] . connection = = NULL )
return i ;
}
return - 1 ;
}
/*
* Return the offset of the first idle slot , or - 1 if all slots are busy .
*/
static int
find_any_idle_slot ( const ParallelSlotArray * sa )
{
int i ;
for ( i = 0 ; i < sa - > numslots ; i + + )
if ( ! sa - > slots [ i ] . inUse )
return i ;
return - 1 ;
}
/*
* Wait for any slot ' s connection to have query results , consume the results ,
* and update the slot ' s status as appropriate . Returns true on success ,
* false on cancellation , on error , or if no slots are connected .
*/
static bool
wait_on_slots ( ParallelSlotArray * sa )
{
int i ;
fd_set slotset ;
int maxFd = 0 ;
PGconn * cancelconn = NULL ;
/* We must reconstruct the fd_set for each call to select_loop */
FD_ZERO ( & slotset ) ;
for ( i = 0 ; i < sa - > numslots ; i + + )
{
int sock ;
/* We shouldn't get here if we still have slots without connections */
Assert ( sa - > slots [ i ] . connection ! = NULL ) ;
sock = PQsocket ( sa - > slots [ i ] . connection ) ;
/*
* We don ' t really expect any connections to lose their sockets after
* startup , but just in case , cope by ignoring them .
*/
if ( sock < 0 )
continue ;
/* Keep track of the first valid connection we see. */
if ( cancelconn = = NULL )
cancelconn = sa - > slots [ i ] . connection ;
FD_SET ( sock , & slotset ) ;
if ( sock > maxFd )
maxFd = sock ;
}
/*
* No free slot found , so wait until one of the connections has finished
* its task and return the available slot .
* If we get this far with no valid connections , processing cannot
* continue .
*/
while ( firstFree < 0 )
if ( cancelconn = = NULL )
return false ;
SetCancelConn ( sa - > slots - > connection ) ;
i = select_loop ( maxFd , & slotset ) ;
ResetCancelConn ( ) ;
/* failure? */
if ( i < 0 )
return false ;
for ( i = 0 ; i < sa - > numslots ; i + + )
{
fd_set slotset ;
int maxFd = 0 ;
int sock ;
/* We must reconstruct the fd_set for each call to select_loop */
FD_ZERO ( & slotset ) ;
sock = PQsocket ( sa - > slots [ i ] . connection ) ;
for ( i = 0 ; i < numslots ; i + + )
if ( sock > = 0 & & FD_ISSET ( sock , & slotset ) )
{
int sock = PQsocket ( slots [ i ] . connection ) ;
/*
* We don ' t really expect any connections to lose their sockets
* after startup , but just in case , cope by ignoring them .
*/
if ( sock < 0 )
continue ;
FD_SET ( sock , & slotset ) ;
if ( sock > maxFd )
maxFd = sock ;
/* select() says input is available, so consume it */
PQconsumeInput ( sa - > slots [ i ] . connection ) ;
}
SetCancelConn ( slots - > connection ) ;
i = select_loop ( maxFd , & slotset ) ;
ResetCancelConn ( ) ;
/* failure? */
if ( i < 0 )
return NULL ;
for ( i = 0 ; i < numslots ; i + + )
/* Collect result(s) as long as any are available */
while ( ! PQisBusy ( sa - > slots [ i ] . connection ) )
{
int sock = PQsocket ( slots [ i ] . connection ) ;
PGresult * result = PQgetResult ( sa - > slots [ i ] . connection ) ;
if ( sock > = 0 & & FD_ISSET ( sock , & slotset ) )
if ( result ! = NULL )
{
/* select() says input is available, so consume it */
PQconsumeInput ( slots [ i ] . connection ) ;
/* Handle and discard the command result */
if ( ! processQueryResult ( & sa - > slots [ i ] , result ) )
return false ;
}
/* Collect result(s) as long as any are available */
while ( ! PQisBusy ( slots [ i ] . connection ) )
else
{
PGresult * result = PQgetResult ( slots [ i ] . connection ) ;
if ( result ! = NULL )
{
/* Handle and discard the command result */
if ( ! processQueryResult ( slots + i , result ) )
return NULL ;
}
else
{
/* This connection has become idle */
slots [ i ] . isFree = true ;
ParallelSlotClearHandler ( slots + i ) ;
if ( firstFree < 0 )
firstFree = i ;
break ;
}
/* This connection has become idle */
sa - > slots [ i ] . inUse = false ;
ParallelSlotClearHandler ( & sa - > slots [ i ] ) ;
break ;
}
}
}
return true ;
}
slots [ firstFree ] . isFree = false ;
return slots + firstFree ;
/*
* Open a new database connection using the stored connection parameters and
* optionally a given dbname if not null , execute the stored initial command if
* any , and associate the new connection with the given slot .
*/
static void
connect_slot ( ParallelSlotArray * sa , int slotno , const char * dbname )
{
const char * old_override ;
ParallelSlot * slot = & sa - > slots [ slotno ] ;
old_override = sa - > cparams - > override_dbname ;
if ( dbname )
sa - > cparams - > override_dbname = dbname ;
slot - > connection = connectDatabase ( sa - > cparams , sa - > progname , sa - > echo , false , true ) ;
sa - > cparams - > override_dbname = old_override ;
if ( PQsocket ( slot - > connection ) > = FD_SETSIZE )
{
pg_log_fatal ( " too many jobs for this platform " ) ;
exit ( 1 ) ;
}
/* Setup the connection using the supplied command, if any. */
if ( sa - > initcmd )
executeCommand ( slot - > connection , sa - > initcmd , sa - > echo ) ;
}
/*
* ParallelSlotsSetup
* Prepare a set of parallel slots to use on a given database .
* ParallelSlotsGetIdle
* Return a connection slot that is ready to execute a command .
*
* The slot returned is chosen as follows :
*
* If any idle slot already has an open connection , and if either dbname is
* null or the existing connection is to the given database , that slot will be
* returned allowing the connection to be reused .
*
* Otherwise , if any idle slot is not yet connected to any database , the slot
* will be returned with it ' s connection opened using the stored cparams and
* optionally the given dbname if not null .
*
* Otherwise , if any idle slot exists , an idle slot will be chosen and returned
* after having it ' s connection disconnected and reconnected using the stored
* cparams and optionally the given dbname if not null .
*
* This creates and initializes a set of connections to the database
* using the information given by the caller , marking all parallel slots
* as free and ready to use . " conn " is an initial connection set up
* by the caller and is associated with the first slot in the parallel
* set .
* Otherwise , if any slots have connections that are busy , we loop on select ( )
* until one socket becomes available . When this happens , we read the whole
* set and mark as free all sockets that become available . We then select a
* slot using the same rules as above .
*
* Otherwise , we cannot return a slot , which is an error , and NULL is returned .
*
* For any connection created , if the stored initcmd is not null , it will be
* executed as a command on the newly formed connection before the slot is
* returned .
*
* If an error occurs , NULL is returned .
*/
ParallelSlot *
ParallelSlotsSetup ( const ConnParams * cparams ,
const char * progname , bool echo ,
PGconn * conn , int numslots )
ParallelSlotsGetIdle ( ParallelSlotArray * sa , const char * dbname )
{
ParallelSlot * slots ;
int i ;
int offset ;
Assert ( conn ! = NULL ) ;
Assert ( sa ) ;
Assert ( sa - > numslots > 0 ) ;
slots = ( ParallelSlot * ) pg_malloc ( sizeof ( ParallelSlot ) * numslots ) ;
init_slot ( slots , conn ) ;
if ( numslots > 1 )
while ( 1 )
{
for ( i = 1 ; i < numslots ; i + + )
/* First choice: a slot already connected to the desired database. */
offset = find_matching_idle_slot ( sa , dbname ) ;
if ( offset > = 0 )
{
conn = connectDatabase ( cparams , progname , echo , false , true ) ;
/*
* Fail and exit immediately if trying to use a socket in an
* unsupported range . POSIX requires open ( 2 ) to use the lowest
* unused file descriptor and the hint given relies on that .
*/
if ( PQsocket ( conn ) > = FD_SETSIZE )
{
pg_log_fatal ( " too many jobs for this platform -- try %d " , i ) ;
exit ( 1 ) ;
}
sa - > slots [ offset ] . inUse = true ;
return & sa - > slots [ offset ] ;
}
/* Second choice: a slot not connected to any database. */
offset = find_unconnected_slot ( sa ) ;
if ( offset > = 0 )
{
connect_slot ( sa , offset , dbname ) ;
sa - > slots [ offset ] . inUse = true ;
return & sa - > slots [ offset ] ;
}
init_slot ( slots + i , conn ) ;
/* Third choice: a slot connected to the wrong database. */
offset = find_any_idle_slot ( sa ) ;
if ( offset > = 0 )
{
disconnectDatabase ( sa - > slots [ offset ] . connection ) ;
sa - > slots [ offset ] . connection = NULL ;
connect_slot ( sa , offset , dbname ) ;
sa - > slots [ offset ] . inUse = true ;
return & sa - > slots [ offset ] ;
}
/*
* Fourth choice : block until one or more slots become available . If
* any slots hit a fatal error , we ' ll find out about that here and
* return NULL .
*/
if ( ! wait_on_slots ( sa ) )
return NULL ;
}
}
/*
* ParallelSlotsSetup
* Prepare a set of parallel slots but do not connect to any database .
*
* This creates and initializes a set of slots , marking all parallel slots as
* free and ready to use . Establishing connections is delayed until requesting
* a free slot . The cparams , progname , echo , and initcmd are stored for later
* use and must remain valid for the lifetime of the returned array .
*/
ParallelSlotArray *
ParallelSlotsSetup ( int numslots , ConnParams * cparams , const char * progname ,
bool echo , const char * initcmd )
{
ParallelSlotArray * sa ;
return slots ;
Assert ( numslots > 0 ) ;
Assert ( cparams ! = NULL ) ;
Assert ( progname ! = NULL ) ;
sa = ( ParallelSlotArray * ) palloc0 ( offsetof ( ParallelSlotArray , slots ) +
numslots * sizeof ( ParallelSlot ) ) ;
sa - > numslots = numslots ;
sa - > cparams = cparams ;
sa - > progname = progname ;
sa - > echo = echo ;
sa - > initcmd = initcmd ;
return sa ;
}
/*
* ParallelSlotsAdoptConn
* Assign an open connection to the slots array for reuse .
*
* This turns over ownership of an open connection to a slots array . The
* caller should not further use or close the connection . All the connection ' s
* parameters ( user , host , port , etc . ) except possibly dbname should match
* those of the slots array ' s cparams , as given in ParallelSlotsSetup . If
* these parameters differ , subsequent behavior is undefined .
*/
void
ParallelSlotsAdoptConn ( ParallelSlotArray * sa , PGconn * conn )
{
int offset ;
offset = find_unconnected_slot ( sa ) ;
if ( offset > = 0 )
sa - > slots [ offset ] . connection = conn ;
else
disconnectDatabase ( conn ) ;
}
/*
@ -292,13 +448,13 @@ ParallelSlotsSetup(const ConnParams *cparams,
* terminate all connections .
*/
void
ParallelSlotsTerminate ( ParallelSlot * slots , int numslots )
ParallelSlotsTerminate ( ParallelSlotArray * sa )
{
int i ;
for ( i = 0 ; i < numslots ; i + + )
for ( i = 0 ; i < sa - > numslots ; i + + )
{
PGconn * conn = slots [ i ] . connection ;
PGconn * conn = sa - > s lots [ i ] . connection ;
if ( conn = = NULL )
continue ;
@ -314,13 +470,15 @@ ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
* error has been found on the way .
*/
bool
ParallelSlotsWaitCompletion ( ParallelSlot * slots , int numslots )
ParallelSlotsWaitCompletion ( ParallelSlotArray * sa )
{
int i ;
for ( i = 0 ; i < numslots ; i + + )
for ( i = 0 ; i < sa - > numslots ; i + + )
{
if ( ! consumeQueryResult ( slots + i ) )
if ( sa - > slots [ i ] . connection = = NULL )
continue ;
if ( ! consumeQueryResult ( & sa - > slots [ i ] ) )
return false ;
}
@ -350,6 +508,9 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
bool
TableCommandResultHandler ( PGresult * res , PGconn * conn , void * context )
{
Assert ( res ! = NULL ) ;
Assert ( conn ! = NULL ) ;
/*
* If it ' s an error , report it . Errors about a missing table are harmless
* so we continue processing ; but die for other errors .