@ -78,13 +78,12 @@ int sighup = 0;
static struct cl_stat * dbstat = NULL ;
typedef struct client_conn_tag {
const char * cmd ;
int sd ;
unsigned int options ;
const struct optstruct * opts ;
struct cl_engine * engine ;
time_t engine_timestamp ;
int * socketds ;
int nsockets ;
} client_conn_t ;
static void scanner_thread ( void * arg )
@ -281,6 +280,151 @@ static struct cl_engine *reload_db(struct cl_engine *engine, unsigned int dbopti
return engine ;
}
struct rcvloop {
threadpool_t * thr_pool ;
struct fd_data * fds ;
} ;
static const char * get_cmd ( struct fd_buf * buf )
{
unsigned char * pos ;
if ( ! buf - > off )
return NULL ;
switch ( buf - > buffer [ 0 ] ) {
/* commands terminated by delimiters */
case ' n ' :
case ' z ' :
pos = memchr ( buf - > buffer , buf - > buffer [ 0 ] , buf - > off ) ;
if ( ! pos )
/* we don't have another full command yet */
return NULL ;
* pos = ' \0 ' ;
return buf - > buffer + 1 ;
default :
/* one packet = one command */
return buf - > buffer ;
}
}
void * recvloop_th ( void * arg )
{
struct rcvloop * rcvloop = ( struct rcvloop * ) arg ;
struct fd_data * fds = rcvloop - > fds ;
pthread_mutex_lock ( fds - > buf_mutex ) ;
for ( ; ; ) {
size_t i ;
int status = fds_poll_recv ( fds , - 1 , 1 ) ;
if ( status = = - 1 ) {
/* very bad, already logged */
continue ;
}
/* Loop over all sockets, checking which have a new command
* available */
for ( i = 0 ; i < fds - > nfds ; i + + ) {
client_conn_t * client_conn ;
struct fd_buf * buf = & fds - > buf [ i ] ;
const char * cmd ;
if ( buf - > fd = = - 1 | | ! buf - > got_newdata | |
! has_fullcmd ( buf ) )
continue ;
/* Parse commands */
while ( ( cmd = get_cmd ( buf ) ! = NULL ) ) {
client_conn = ( client_conn_t * ) malloc ( sizeof ( struct client_conn_tag ) ) ;
if ( client_conn ) {
client_conn - > sd = fds - > buf [ i ] . fd ;
client_conn - > fds = fds ;
client_conn - > cmd = cmd ;
client_conn - > options = options ;
client_conn - > opts = opts ;
if ( cl_engine_addref ( engine ) ) {
closesocket ( client_conn - > sd ) ;
free ( client_conn ) ;
logg ( " !cl_engine_addref() failed \n " ) ;
pthread_mutex_lock ( & exit_mutex ) ;
progexit = 1 ;
pthread_mutex_unlock ( & exit_mutex ) ;
} else {
client_conn - > engine = engine ;
client_conn - > engine_timestamp = reloaded_time ;
client_conn - > socketds = socketds ;
client_conn - > nsockets = nsockets ;
if ( ! thrmgr_dispatch ( thr_pool , client_conn ) ) {
closesocket ( client_conn - > sd ) ;
free ( client_conn ) ;
logg ( " !thread dispatch failed \n " ) ;
}
}
} else {
logg ( " !Can't allocate memory for client_conn \n " ) ;
closesocket ( new_sd ) ;
if ( optget ( opts , " ExitOnOOM " ) - > enabled ) {
pthread_mutex_lock ( & exit_mutex ) ;
progexit = 1 ;
pthread_mutex_unlock ( & exit_mutex ) ;
}
}
}
/* SIGHUP */
if ( sighup ) {
logg ( " SIGHUP caught: re-opening log file. \n " ) ;
logg_close ( ) ;
sighup = 0 ;
if ( ! logg_file & & ( opt = optget ( opts , " LogFile " ) ) - > enabled )
logg_file = opt - > strarg ;
}
/* SelfCheck */
if ( selfchk ) {
time ( & current_time ) ;
if ( ( current_time - start_time ) > ( time_t ) selfchk ) {
if ( reload_db ( engine , dboptions , opts , TRUE , & ret ) ) {
pthread_mutex_lock ( & reload_mutex ) ;
reload = 1 ;
pthread_mutex_unlock ( & reload_mutex ) ;
}
time ( & start_time ) ;
}
}
/* DB reload */
pthread_mutex_lock ( & reload_mutex ) ;
if ( reload ) {
pthread_mutex_unlock ( & reload_mutex ) ;
engine = reload_db ( engine , dboptions , opts , FALSE , & ret ) ;
if ( ret ) {
logg ( " Terminating because of a fatal error. \n " ) ;
if ( new_sd > = 0 )
closesocket ( new_sd ) ;
break ;
}
pthread_mutex_lock ( & reload_mutex ) ;
reload = 0 ;
time ( & reloaded_time ) ;
pthread_mutex_unlock ( & reload_mutex ) ;
# ifdef CLAMUKO
if ( optget ( opts , " ClamukoScanOnAccess " ) - > enabled & & tharg ) {
logg ( " Stopping and restarting Clamuko. \n " ) ;
pthread_kill ( clamuko_pid , SIGUSR1 ) ;
pthread_join ( clamuko_pid , NULL ) ;
tharg - > engine = engine ;
pthread_create ( & clamuko_pid , & clamuko_attr , clamukoth , tharg ) ;
}
# endif
} else {
pthread_mutex_unlock ( & reload_mutex ) ;
}
}
}
pthread_mutex_unlock ( fds - > buf_mutex ) ;
return NULL
}
int acceptloop_th ( int * socketds , int nsockets , struct cl_engine * engine , unsigned int dboptions , const struct optstruct * opts )
{
int max_threads , i , ret = 0 ;
@ -305,6 +449,9 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne
uint32_t val32 ;
uint64_t val64 ;
struct fd_data fds ;
pthread_t rcvth ;
struct rcvloop rcvloop ;
int notifypip [ 2 ] ; /* notify recvloop that it has a new fd */
# ifdef CLAMUKO
pthread_t clamuko_pid ;
@ -600,6 +747,18 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne
idletimeout = optget ( opts , " IdleTimeout " ) - > numarg ;
memset ( & fds , 0 , sizeof ( fds ) ) ;
if ( pipe ( notifypip , 0 ) = = - 1 ) {
logg ( " !pipe failed \n " ) ;
cl_engine_free ( engine ) ;
return 1 ;
}
if ( add_fd_to_poll ( & fds , notifypip [ 0 ] ) = = - 1 ) {
logg ( " !add_fd_to_poll failed failed \n " ) ;
cl_engine_free ( engine ) ;
return 1 ;
}
for ( i = 0 ; i < nsockets ; i + + )
if ( add_fd_to_poll ( & fds , socketds [ i ] , 1 ) = = - 1 ) {
logg ( " !add_fd_to_poll failed failed \n " ) ;
@ -607,15 +766,31 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne
return 1 ;
}
if ( ( thr_pool = thrmgr_new ( max_threads , idletimeout , scanner_thread ) ) = = NULL ) {
if ( ( thr_pool = thrmgr_new ( max_threads , idletimeout , scanner_thread ) ) = = NULL ) {
logg ( " !thrmgr_new failed \n " ) ;
exit ( - 1 ) ;
}
rcvloop . thr_pool = & thr_pool ;
rcvloop . fds = & fds ;
if ( pthread_create ( & rcvth , NULL , recvloop_th , & rcvloop ) ) {
logg ( " !pthread_create failed \n " ) ;
exit ( - 1 ) ;
}
if ( pthread_detach ( & rcvth ) ) {
logg ( " !pthread_detach failed \n " ) ;
exit ( - 1 ) ;
}
time ( & start_time ) ;
pthread_mutex_lock ( & fds . buf_mutex ) ;
for ( ; ; ) {
int new_sd = fds_poll_recv ( & fds , - 1 , 1 ) ;
/* Block waiting for connection on any of the sockets,
* doesn ' t wake on signals , that is what recvloop does ! */
int new_sd = fds_poll_recv ( & fds , - 1 , 0 ) ;
/* TODO: what about sockets that get rm-ed? */
if ( ! fds . nfds ) {
/* no more sockets to poll, all gave an error */
@ -628,48 +803,18 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne
pthread_mutex_lock ( & exit_mutex ) ;
if ( prog_exit ) {
pthread_mutex_unlock ( & exit_mutex ) ;
# ifdef C_WINDOWS
closesocket ( new_sd ) ;
# else
if ( new_sd > = 0 )
close ( new_sd ) ;
# endif
closesocket ( new_sd ) ;
break ;
}
pthread_mutex_unlock ( & exit_mutex ) ;
if ( new_sd > = 0 ) {
client_conn = ( client_conn_t * ) malloc ( sizeof ( struct client_conn_tag ) ) ;
if ( client_conn ) {
client_conn - > sd = new_sd ;
client_conn - > options = options ;
client_conn - > opts = opts ;
if ( cl_engine_addref ( engine ) ) {
closesocket ( client_conn - > sd ) ;
free ( client_conn ) ;
logg ( " !cl_engine_addref() failed \n " ) ;
pthread_mutex_lock ( & exit_mutex ) ;
progexit = 1 ;
pthread_mutex_unlock ( & exit_mutex ) ;
} else {
client_conn - > engine = engine ;
client_conn - > engine_timestamp = reloaded_time ;
client_conn - > socketds = socketds ;
client_conn - > nsockets = nsockets ;
if ( ! thrmgr_dispatch ( thr_pool , client_conn ) ) {
closesocket ( client_conn - > sd ) ;
free ( client_conn ) ;
logg ( " !thread dispatch failed \n " ) ;
}
}
} else {
logg ( " !Can't allocate memory for client_conn \n " ) ;
closesocket ( new_sd ) ;
if ( optget ( opts , " ExitOnOOM " ) - > enabled ) {
pthread_mutex_lock ( & exit_mutex ) ;
progexit = 1 ;
pthread_mutex_unlock ( & exit_mutex ) ;
}
char dummy = ' x ' ;
fds_add ( & fds , new_sd , 0 ) ;
if ( write ( notifypip [ 1 ] , & dummy , 1 ) = = - 1 ) {
/* failed to notify */
logg ( " !acceptloop_th: failed to notify recvloop \n " ) ;
}
}
}
@ -691,58 +836,8 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne
continue ;
}
if ( sighup ) {
logg ( " SIGHUP caught: re-opening log file. \n " ) ;
logg_close ( ) ;
sighup = 0 ;
if ( ! logg_file & & ( opt = optget ( opts , " LogFile " ) ) - > enabled )
logg_file = opt - > strarg ;
}
if ( selfchk ) {
time ( & current_time ) ;
if ( ( current_time - start_time ) > ( time_t ) selfchk ) {
if ( reload_db ( engine , dboptions , opts , TRUE , & ret ) ) {
pthread_mutex_lock ( & reload_mutex ) ;
reload = 1 ;
pthread_mutex_unlock ( & reload_mutex ) ;
}
time ( & start_time ) ;
}
}
pthread_mutex_lock ( & reload_mutex ) ;
if ( reload ) {
pthread_mutex_unlock ( & reload_mutex ) ;
engine = reload_db ( engine , dboptions , opts , FALSE , & ret ) ;
if ( ret ) {
logg ( " Terminating because of a fatal error. \n " ) ;
# ifdef C_WINDOWS
closesocket ( new_sd ) ;
# else
if ( new_sd > = 0 )
close ( new_sd ) ;
# endif
break ;
}
pthread_mutex_lock ( & reload_mutex ) ;
reload = 0 ;
time ( & reloaded_time ) ;
pthread_mutex_unlock ( & reload_mutex ) ;
# ifdef CLAMUKO
if ( optget ( opts , " ClamukoScanOnAccess " ) - > enabled & & tharg ) {
logg ( " Stopping and restarting Clamuko. \n " ) ;
pthread_kill ( clamuko_pid , SIGUSR1 ) ;
pthread_join ( clamuko_pid , NULL ) ;
tharg - > engine = engine ;
pthread_create ( & clamuko_pid , & clamuko_attr , clamukoth , tharg ) ;
}
# endif
} else {
pthread_mutex_unlock ( & reload_mutex ) ;
}
}
pthread_mutex_unlock ( & fds . buf_mutex ) ;
fds_free ( & fds ) ;
/* Destroy the thread manager.