new poll implementation start

git-svn-id: file:///var/lib/svn/clamav-devel/branches/clamd-proto@4596 77e5149b-7576-45b1-b177-96237e5ba77b
remotes/push_mirror/0.95
Török Edvin 17 years ago
parent fe4e3f8cad
commit b080923b5a
  1. 5
      ChangeLog
  2. 58
      clamd/others.c
  3. 20
      clamd/others.h
  4. 45
      clamd/server-th.c

@ -1,3 +1,8 @@
Thu Jan 8 22:13:25 EET 2009 (edwin)
------------------------------------
* clamd/others.c, clamd/others.h, clamd/server-th.c: new poll
implementation start
Wed Jan 7 19:40:11 EET 2009 (edwin)
------------------------------------
* clamd/others.c: first version of poll_recv_fds

@ -441,21 +441,6 @@ int readsock(int sockfd, char *buf, size_t size, unsigned char delim, int timeou
return n;
}
struct fd_buf {
unsigned char *buffer;
size_t bufsize;
size_t off;
int fd;
int got_newdata;
};
struct fd_data {
struct fd_buf *buf;
size_t nfds;
#ifdef HAVE_POLL
struct pollfd *poll_data;
#endif
};
static int realloc_polldata(struct fd_data *data)
{
@ -464,7 +449,7 @@ static int realloc_polldata(struct fd_data *data)
free(data->poll_data);
data->poll_data = malloc(data->nfds*sizeof(*data->poll_data));
if (!data->poll_data) {
logg("!add_fd: Memory allocation failed for poll_data\n");
logg("!realloc_polldata: Memory allocation failed for poll_data\n");
return -1;
}
#endif
@ -492,6 +477,8 @@ static void cleanup_fds(struct fd_data *data)
static int read_fd_data(struct fd_buf *buf)
{
if (!buf->buffer) /* listen-only socket */
return 0;
/* Read the pending packet, it may contain more than one command, but
* that is to the cmdparser to handle.
* It will handle 1st command, and then move leftover to beginning of buffer
@ -503,7 +490,7 @@ static int read_fd_data(struct fd_buf *buf)
buf->got_newdata=1;
}
int add_fd_to_poll(struct fd_data *data, int fd)
int fds_add(struct fd_data *data, int fd, int listen_only)
{
struct fd_buf *buf;
unsigned n = data->nfds + 1;
@ -519,10 +506,15 @@ int add_fd_to_poll(struct fd_data *data, int fd)
data->buf = buf;
data->nfds = n;
data->buf[n-1].fd = -1;
data->buf[n-1].bufsize = PATH_MAX+8;
if (!(data->buf[n-1].buffer = malloc(data->buf[n-1].bufsize))) {
logg("!add_fd: Memory allocation failed for command buffer\n");
return -1;
if (!listen_only) {
data->buf[n-1].bufsize = PATH_MAX+8;
if (!(data->buf[n-1].buffer = malloc(data->buf[n-1].bufsize))) {
logg("!add_fd: Memory allocation failed for command buffer\n");
return -1;
}
} else {
data->buf[n-1].bufsize = 0;
data->buf[n-1].buf = NULL;
}
data->buf[n-1].fd = fd;
data->buf[n-1].off = 0;
@ -536,8 +528,9 @@ int add_fd_to_poll(struct fd_data *data, int fd)
* timeout is specified in seconds, if check_signals is non-zero, then
* poll_recv_fds() will return upon receipt of a signal, even if no data
* is received on any of the sockets.
* Must be called with buf_mutex held.
*/
int poll_recv_fds(struct fd_data *data, int timeout, int check_signals)
int fds_poll_recv(struct fd_data *data, int timeout, int check_signals)
{
unsigned fdsok = data->nfds;
size_t i;
@ -570,7 +563,12 @@ int poll_recv_fds(struct fd_data *data, int timeout, int check_signals)
data->poll_data[i].revents = 0;
}
do {
retval = poll(data->poll_data, data->nfds, timeout);
int n = data->nfds;
pthread_mutex_unlock(&data->buf_mutex);
retval = poll(data->poll_data, n, timeout);
pthread_mutex_lock(&data->buf_mutex);
if (retval > 0) {
fdsok = 0;
for (i=0;i < data->nfds; i++) {
@ -612,7 +610,7 @@ int poll_recv_fds(struct fd_data *data, int timeout, int check_signals)
return -1;
}
maxfd = max(maxfd, fd);
maxfd = MAX(maxfd, fd);
}
do {
@ -625,7 +623,9 @@ int poll_recv_fds(struct fd_data *data, int timeout, int check_signals)
tv.tv_sec = timeout;
tv.tv_usec = 0;
pthread_mutex_unlock(&data->buf_mutex);
retval = select(maxfd+1, &rfds, NULL, NULL, timeout > 0 ? &tv : NULL);
pthread_mutex_lock(&data->buf_mutex);
if (retval > 0) {
fdsok = data->nfds;
for (i=0; i < data->nfds; i++) {
@ -657,4 +657,12 @@ int poll_recv_fds(struct fd_data *data, int timeout, int check_signals)
return retval;
}
void fds_free(struct fd_data *data)
{
if (data->buf)
free(data->buf);
#ifdef HAVE_POLL
if (data->poll_data)
free(data->poll_data);
#endif
}

@ -26,6 +26,23 @@
#include <stdlib.h>
#include "shared/optparser.h"
struct fd_buf {
unsigned char *buffer;
size_t bufsize;
size_t off;
int fd;
int got_newdata;
};
struct fd_data {
pthread_mutex_t buf_mutex; /* protects buf and nfds */
struct fd_buf *buf;
size_t nfds;
#ifdef HAVE_POLL
struct pollfd *poll_data;
#endif
};
int poll_fds(int *fds, int nfds, int timeout_sec, int check_signals);
int poll_fd(int fd, int timeout_sec, int check_signals);
int is_fd_connected(int fd);
@ -33,5 +50,8 @@ void virusaction(const char *filename, const char *virname, const struct optstru
int writen(int fd, void *buff, unsigned int count);
int readsock(int sockfd, char *buf, size_t size, unsigned char delim, int timeout_sec, int force_delim, int read_command);
int fds_add(struct fd_data *data, int fd, int listen_only);
int fds_poll_recv(struct fd_data *data, int timeout, int check_signals)
void fds_free(struct fd_data *data);
#endif

@ -304,6 +304,7 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne
int idletimeout;
uint32_t val32;
uint64_t val64;
struct fd_data fds;
#ifdef CLAMUKO
pthread_t clamuko_pid;
@ -598,6 +599,14 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne
idletimeout = optget(opts, "IdleTimeout")->numarg;
memset(&fds, 0, sizeof(fds));
for (i=0;i < nsockets;i++)
if (add_fd_to_poll(&fds, socketds[i], 1) == -1) {
logg("!cli_engine_set(CL_ENGINE_MAX_RECURSION) failed: %s\n", cl_strerror(ret));
cl_engine_free(engine);
return 1;
}
if((thr_pool=thrmgr_new(max_threads, idletimeout, scanner_thread)) == NULL) {
logg("!thrmgr_new failed\n");
exit(-1);
@ -605,32 +614,17 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne
time(&start_time);
for(;;) {
#if !defined(C_WINDOWS) && !defined(C_BEOS)
struct stat st_buf;
#endif
int socketd = socketds[0];
int new_sd = 0;
if(nsockets > 1) {
int pollret = poll_fds(socketds, nsockets, -1, 1);
if(pollret > 0) {
socketd = socketds[pollret - 1];
} else {
new_sd = -1;
}
}
#if !defined(C_WINDOWS) && !defined(C_BEOS)
if(new_sd != -1 && fstat(socketd, &st_buf) == -1) {
logg("!fstat(): socket descriptor gone\n");
memmove(socketds, socketds + 1, sizeof(socketds[0]) * nsockets);
nsockets--;
if(!nsockets) {
logg("!Main socket gone: fatal\n");
break;
}
for(;;) {
int status = fds_poll_recv(&fds, -1, 1);
/* TODO: what about sockets that get rm-ed? */
if (!fds.nfds) {
/* no more sockets to poll, all gave an error */
logg("!Main socket gone: fatal\n");
break;
}
for (i=0;i < nsockets && i < fds.nfds && status > 0; i++) {
status = accept(
}
#endif
if (new_sd != -1)
new_sd = accept(socketd, NULL, NULL);
if((new_sd == -1) && (errno != EINTR)) {
@ -751,6 +745,7 @@ int acceptloop_th(int *socketds, int nsockets, struct cl_engine *engine, unsigne
}
}
fds_free(&fds);
/* Destroy the thread manager.
* This waits for all current tasks to end
*/

Loading…
Cancel
Save