From b080923b5a6a820a7fe4a4d1096e920d410436a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?T=C3=B6r=C3=B6k=20Edvin?= Date: Thu, 8 Jan 2009 20:12:04 +0000 Subject: [PATCH] new poll implementation start git-svn-id: file:///var/lib/svn/clamav-devel/branches/clamd-proto@4596 77e5149b-7576-45b1-b177-96237e5ba77b --- ChangeLog | 5 ++++ clamd/others.c | 58 +++++++++++++++++++++++++++-------------------- clamd/others.h | 20 ++++++++++++++++ clamd/server-th.c | 45 ++++++++++++++++-------------------- 4 files changed, 78 insertions(+), 50 deletions(-) diff --git a/ChangeLog b/ChangeLog index 463fc8aed..d79abcda4 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ +Thu Jan 8 22:13:25 EET 2009 (edwin) +------------------------------------ + * clamd/others.c, clamd/others.h, clamd/server-th.c: new poll + implementation start + Wed Jan 7 19:40:11 EET 2009 (edwin) ------------------------------------ * clamd/others.c: first version of poll_recv_fds diff --git a/clamd/others.c b/clamd/others.c index 1cef902ed..6f4bda185 100644 --- a/clamd/others.c +++ b/clamd/others.c @@ -441,21 +441,6 @@ int readsock(int sockfd, char *buf, size_t size, unsigned char delim, int timeou return n; } -struct fd_buf { - unsigned char *buffer; - size_t bufsize; - size_t off; - int fd; - int got_newdata; -}; - -struct fd_data { - struct fd_buf *buf; - size_t nfds; -#ifdef HAVE_POLL - struct pollfd *poll_data; -#endif -}; static int realloc_polldata(struct fd_data *data) { @@ -464,7 +449,7 @@ static int realloc_polldata(struct fd_data *data) free(data->poll_data); data->poll_data = malloc(data->nfds*sizeof(*data->poll_data)); if (!data->poll_data) { - logg("!add_fd: Memory allocation failed for poll_data\n"); + logg("!realloc_polldata: Memory allocation failed for poll_data\n"); return -1; } #endif @@ -492,6 +477,8 @@ static void cleanup_fds(struct fd_data *data) static int read_fd_data(struct fd_buf *buf) { + if (!buf->buffer) /* listen-only socket */ + return 0; /* Read the pending packet, it may contain more than one command, but * that is to the cmdparser to handle. * It will handle 1st command, and then move leftover to beginning of buffer @@ -503,7 +490,7 @@ static int read_fd_data(struct fd_buf *buf) buf->got_newdata=1; } -int add_fd_to_poll(struct fd_data *data, int fd) +int fds_add(struct fd_data *data, int fd, int listen_only) { struct fd_buf *buf; unsigned n = data->nfds + 1; @@ -519,10 +506,15 @@ int add_fd_to_poll(struct fd_data *data, int fd) data->buf = buf; data->nfds = n; data->buf[n-1].fd = -1; - data->buf[n-1].bufsize = PATH_MAX+8; - if (!(data->buf[n-1].buffer = malloc(data->buf[n-1].bufsize))) { - logg("!add_fd: Memory allocation failed for command buffer\n"); - return -1; + if (!listen_only) { + data->buf[n-1].bufsize = PATH_MAX+8; + if (!(data->buf[n-1].buffer = malloc(data->buf[n-1].bufsize))) { + logg("!add_fd: Memory allocation failed for command buffer\n"); + return -1; + } + } else { + data->buf[n-1].bufsize = 0; + data->buf[n-1].buf = NULL; } data->buf[n-1].fd = fd; data->buf[n-1].off = 0; @@ -536,8 +528,9 @@ int add_fd_to_poll(struct fd_data *data, int fd) * timeout is specified in seconds, if check_signals is non-zero, then * poll_recv_fds() will return upon receipt of a signal, even if no data * is received on any of the sockets. + * Must be called with buf_mutex held. */ -int poll_recv_fds(struct fd_data *data, int timeout, int check_signals) +int fds_poll_recv(struct fd_data *data, int timeout, int check_signals) { unsigned fdsok = data->nfds; size_t i; @@ -570,7 +563,12 @@ int poll_recv_fds(struct fd_data *data, int timeout, int check_signals) data->poll_data[i].revents = 0; } do { - retval = poll(data->poll_data, data->nfds, timeout); + int n = data->nfds; + + pthread_mutex_unlock(&data->buf_mutex); + retval = poll(data->poll_data, n, timeout); + pthread_mutex_lock(&data->buf_mutex); + if (retval > 0) { fdsok = 0; for (i=0;i < data->nfds; i++) { @@ -612,7 +610,7 @@ int poll_recv_fds(struct fd_data *data, int timeout, int check_signals) return -1; } - maxfd = max(maxfd, fd); + maxfd = MAX(maxfd, fd); } do { @@ -625,7 +623,9 @@ int poll_recv_fds(struct fd_data *data, int timeout, int check_signals) tv.tv_sec = timeout; tv.tv_usec = 0; + pthread_mutex_unlock(&data->buf_mutex); retval = select(maxfd+1, &rfds, NULL, NULL, timeout > 0 ? &tv : NULL); + pthread_mutex_lock(&data->buf_mutex); if (retval > 0) { fdsok = data->nfds; for (i=0; i < data->nfds; i++) { @@ -657,4 +657,12 @@ int poll_recv_fds(struct fd_data *data, int timeout, int check_signals) return retval; } - +void fds_free(struct fd_data *data) +{ + if (data->buf) + free(data->buf); +#ifdef HAVE_POLL + if (data->poll_data) + free(data->poll_data); +#endif +} diff --git a/clamd/others.h b/clamd/others.h index 7f7aca905..c70c4595e 100644 --- a/clamd/others.h +++ b/clamd/others.h @@ -26,6 +26,23 @@ #include #include "shared/optparser.h" +struct fd_buf { + unsigned char *buffer; + size_t bufsize; + size_t off; + int fd; + int got_newdata; +}; + +struct fd_data { + pthread_mutex_t buf_mutex; /* protects buf and nfds */ + struct fd_buf *buf; + size_t nfds; +#ifdef HAVE_POLL + struct pollfd *poll_data; +#endif +}; + int poll_fds(int *fds, int nfds, int timeout_sec, int check_signals); int poll_fd(int fd, int timeout_sec, int check_signals); int is_fd_connected(int fd); @@ -33,5 +50,8 @@ void virusaction(const char *filename, const char *virname, const struct optstru int writen(int fd, void *buff, unsigned int count); int readsock(int sockfd, char *buf, size_t size, unsigned char delim, int timeout_sec, int force_delim, int read_command); +int fds_add(struct fd_data *data, int fd, int listen_only); +int fds_poll_recv(struct fd_data *data, int timeout, int check_signals) +void fds_free(struct fd_data *data); #endif diff --git a/clamd/server-th.c b/clamd/server-th.c index eeb7460a6..262454ecd 100644 --- a/clamd/server-th.c +++ b/clamd/server-th.c @@ -304,6 +304,7 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne int idletimeout; uint32_t val32; uint64_t val64; + struct fd_data fds; #ifdef CLAMUKO pthread_t clamuko_pid; @@ -598,6 +599,14 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne idletimeout = optget(opts, "IdleTimeout")->numarg; + memset(&fds, 0, sizeof(fds)); + for (i=0;i < nsockets;i++) + if (add_fd_to_poll(&fds, socketds[i], 1) == -1) { + logg("!cli_engine_set(CL_ENGINE_MAX_RECURSION) failed: %s\n", cl_strerror(ret)); + cl_engine_free(engine); + return 1; + } + if((thr_pool=thrmgr_new(max_threads, idletimeout, scanner_thread)) == NULL) { logg("!thrmgr_new failed\n"); exit(-1); @@ -605,32 +614,17 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne time(&start_time); - for(;;) { -#if !defined(C_WINDOWS) && !defined(C_BEOS) - struct stat st_buf; -#endif - int socketd = socketds[0]; - int new_sd = 0; - - if(nsockets > 1) { - int pollret = poll_fds(socketds, nsockets, -1, 1); - if(pollret > 0) { - socketd = socketds[pollret - 1]; - } else { - new_sd = -1; - } - } -#if !defined(C_WINDOWS) && !defined(C_BEOS) - if(new_sd != -1 && fstat(socketd, &st_buf) == -1) { - logg("!fstat(): socket descriptor gone\n"); - memmove(socketds, socketds + 1, sizeof(socketds[0]) * nsockets); - nsockets--; - if(!nsockets) { - logg("!Main socket gone: fatal\n"); - break; - } + for(;;) { + int status = fds_poll_recv(&fds, -1, 1); + /* TODO: what about sockets that get rm-ed? */ + if (!fds.nfds) { + /* no more sockets to poll, all gave an error */ + logg("!Main socket gone: fatal\n"); + break; + } + for (i=0;i < nsockets && i < fds.nfds && status > 0; i++) { + status = accept( } -#endif if (new_sd != -1) new_sd = accept(socketd, NULL, NULL); if((new_sd == -1) && (errno != EINTR)) { @@ -751,6 +745,7 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne } } + fds_free(&fds); /* Destroy the thread manager. * This waits for all current tasks to end */