thrmgr: new clean reimplementation

git-svn: trunk@338
remotes/push_mirror/metadata
Trog 22 years ago
parent cdff10423d
commit 52e8d3c608
  1. 4
      clamav-devel/ChangeLog
  2. 14
      clamav-devel/clamd/server-th.c
  3. 545
      clamav-devel/clamd/thrmgr.c
  4. 124
      clamav-devel/clamd/thrmgr.h

@ -1,3 +1,7 @@
Wed Feb 25 11:07:53 GMT 2004 (trog)
-----------------------------------
* clamd thrmgr: new clean reimplementation
Wed Feb 25 08:57:35 GMT 2004 (trog)
-----------------------------------
* libclamav/vba_extract.c: add VBA signature for Office 2003

@ -176,7 +176,7 @@ static struct cl_node *reload_db(struct cl_node *root, const struct cfgstruct *c
int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *copt)
{
int new_sd, max_threads, options=0;
thrmgr_t thrmgr;
threadpool_t *thr_pool;
struct sigaction sigact;
mode_t old_umask;
struct cl_limits limits;
@ -371,8 +371,8 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
pthread_mutex_init(&exit_mutex, NULL);
pthread_mutex_init(&reload_mutex, NULL);
if(thrmgr_init(&thrmgr, max_threads, 1, scanner_thread) != 0) {
logg("thrmgr_init failed");
if((thr_pool=thrmgr_new(max_threads, 30, scanner_thread)) == NULL) {
logg("thrmgr_new failed");
exit(-1);
}
@ -400,7 +400,7 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
client_conn->copt = copt;
client_conn->root = root;
client_conn->limits = &limits;
thrmgr_add(&thrmgr, client_conn);
thrmgr_dispatch(thr_pool, client_conn);
}
pthread_mutex_lock(&exit_mutex);
@ -429,10 +429,10 @@ int acceptloop_th(int socketd, struct cl_node *root, const struct cfgstruct *cop
/* Destroy the thread manager.
* This waits for all current tasks to end
*/
thrmgr_destroy(&thrmgr);
thrmgr_destroy(thr_pool);
root = reload_db(root, copt, FALSE);
if(thrmgr_init(&thrmgr, max_threads, 1, scanner_thread) != 0) {
logg("!thrmgr_init failed");
if((thr_pool=thrmgr_new(max_threads, 30, scanner_thread)) == NULL) {
logg("!thrmgr_new failed");
pthread_mutex_unlock(&reload_mutex);
exit(-1);
}

@ -1,9 +1,6 @@
/*
* Copyright (C) 2004 Trog <trog@clamav.net>
*
* The code is based on the book "Programming with POSIX threads" by Dave
* Butenhof
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
@ -18,356 +15,242 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
/*
* thrmgr.c
*
* This file implements the interfaces for a "work queue"
* manager. A "manager object" is created with several
* parameters, including the required size of a work queue
* entry, the maximum desired degree of parallelism (number of
* threads to service the queue), and the address of an
* execution engine routine.
*
* The application requests a work queue entry from the manager,
* fills in the application-specific fields, and returns it to
* the queue manager for processing. The manager will create a
* new thread to service the queue if all current threads are
* busy and the maximum level of parallelism has not yet been
* reached.
*
* The manager will dequeue items and present them to the
* processing engine until the queue is empty; at that point,
* processing threads will begin to shut down. (They will be
* restarted when work appears.)
*/
#include <pthread.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <errno.h>
#include "thrmgr.h"
#include "others.h"
/*
* Thread start routine to serve the work queue.
*/
static void *thrmgr_server (void *arg)
{
thrmgr_t *thrmgr = (thrmgr_t *)arg;
work_element_t *we;
int status;
/*
* We don't need to validate the thrmgr_t here... we don't
* create server threads until requests are queued (the
* queue has been initialized by then!) and we wait for all
* server threads to terminate before destroying a work
* queue.
*/
/* log_message ("A worker is starting"); */
status = pthread_mutex_lock (&thrmgr->mutex);
if (status != 0) {
//log_message ("A worker is dying");
return(NULL);
}
while (1) {
thrmgr->idle++;
#include "others.h"
/* log_message ("Worker waiting for work - idle:%d", thrmgr->idle); */
while ( (thrmgr->first == NULL) && !thrmgr->quit) {
#ifndef BROKEN_COND_SIGNAL
status = pthread_cond_wait (&thrmgr->cond, &thrmgr->mutex);
#else
status = pthread_mutex_unlock (&thrmgr->mutex);
status = sem_wait(&thrmgr->semaphore);
pthread_mutex_lock (&thrmgr->mutex);
#endif
if (status != 0) {
/*
* This shouldn't happen, so the work queue
* package should fail. Because the work queue
* API is asynchronous, that would add
* complication. Because the chances of failure
* are slim, I choose to avoid that
* complication. The server thread will return,
* and allow another server thread to pick up
* the work later. Note that, if this was the
* only server thread, the queue won't be
* serviced until a new work item is
* queued. That could be fixed by creating a new
* server here.
*/
//log_message ("Worker wait failed, %d (%s)",
//status, strerror (status));
thrmgr->counter--;
thrmgr->idle--;
pthread_mutex_unlock (&thrmgr->mutex);
return(NULL);
}
}
we = thrmgr->first;
if (we != NULL) {
thrmgr->first = we->next;
if (thrmgr->last == we) {
thrmgr->last = NULL;
}
thrmgr->idle--;
status = pthread_mutex_unlock (&thrmgr->mutex);
if (status != 0) {
//log_message ("A worker is dying");
return(NULL);
}
/* log_message ("Worker calling handler"); */
thrmgr->handler (we->data);
free (we);
status = pthread_mutex_lock (&thrmgr->mutex);
if (status != 0) {
//log_message ("A worker is dying");
return(NULL);
}
}
/*
* If there are no more work requests, and the servers
* have been asked to quit, then shut down.
*/
if ( (thrmgr->first == NULL) &&thrmgr->quit) {
//log_message ("Worker shutting down");
thrmgr->counter--;
/*
* NOTE: Just to prove that every rule has an
* exception, I'm using the "cond" condition for two
* separate predicates here. That's OK, since the
* case used here applies only once during the life
* of a work queue -- during rundown. The overhead
* is minimal and it's not worth creating a separate
* condition variable that would be waited and
* signaled exactly once!
*/
#ifndef BROKEN_COND_SIGNAL
if (thrmgr->counter == 0) {
pthread_cond_broadcast (&thrmgr->cond);
}
#endif
pthread_mutex_unlock (&thrmgr->mutex);
//log_message ("A worker is dying");
return(NULL);
}
}
#define FALSE (0)
#define TRUE (1)
pthread_mutex_unlock (&thrmgr->mutex);
//log_message ("Worker exiting");
return(NULL);
work_queue_t *work_queue_new()
{
work_queue_t *work_q;
work_q = (work_queue_t *) mmalloc(sizeof(work_queue_t));
work_q->head = work_q->tail = NULL;
work_q->item_count = 0;
return work_q;
}
/*
* Initialize a thread manager.
*/
int thrmgr_init( thrmgr_t *thrmgr, /* thread manager */
int max_threads, /* maximum threads */
int alloc_unit, /* thread creation unit */
void (*handler)(void *arg)) /* request handler */
void work_queue_add(work_queue_t *work_q, void *data)
{
int status;
status = pthread_attr_init (&thrmgr->attr);
if (status != 0)
return(status);
status = pthread_attr_setdetachstate (&thrmgr->attr,
PTHREAD_CREATE_DETACHED);
if (status != 0) {
pthread_attr_destroy (&thrmgr->attr);
return(status);
}
status = pthread_mutex_init (&thrmgr->mutex, NULL);
if (status != 0) {
pthread_attr_destroy (&thrmgr->attr);
return(status);
}
#ifndef BROKEN_COND_SIGNAL
status = pthread_cond_init (&thrmgr->cond, NULL);
#else
status = sem_init(&thrmgr->semaphore, 0, 0);
#endif
if (status != 0) {
pthread_mutex_destroy (&thrmgr->mutex);
pthread_attr_destroy (&thrmgr->attr);
return(status);
}
thrmgr->quit = 0; /* not time to quit */
thrmgr->first = thrmgr->last = NULL; /* no queue entries */
thrmgr->parallelism = max_threads; /* max servers */
thrmgr->alloc_unit = alloc_unit; /* thread creation unit */
thrmgr->counter = 0; /* no server threads yet */
thrmgr->idle = 0; /* no idle servers */
thrmgr->handler = handler;
thrmgr->valid = THRMGR_VALID;
return(0);
work_item_t *work_item;
if (!work_q) {
return;
}
work_item = (work_item_t *) mmalloc(sizeof(work_item_t));
work_item->next = NULL;
work_item->data = data;
gettimeofday(&(work_item->time_queued), NULL);
if (work_q->head == NULL) {
work_q->head = work_q->tail = work_item;
work_q->item_count = 1;
} else {
work_q->tail->next = work_item;
work_q->tail = work_item;
work_q->item_count++;
}
return;
}
/*
* Destroy a thread manager
*/
int thrmgr_destroy (thrmgr_t *thrmgr)
void *work_queue_pop(work_queue_t *work_q)
{
int status, status1, status2;
if (thrmgr->valid != THRMGR_VALID) {
return EINVAL;
}
status = pthread_mutex_lock (&thrmgr->mutex);
if (status != 0) {
return(status);
}
thrmgr->valid = 0; /* prevent any other operations */
/*
* Check whether any threads are active, and run them down:
*
* 1. set the quit flag
* 2. broadcast to wake any servers that may be asleep
* 3. wait for all threads to quit (counter goes to 0)
*
*/
if (thrmgr->counter > 0) {
thrmgr->quit = 1;
/* if any threads are idling, wake them. */
if (thrmgr->idle > 0) {
#ifndef BROKEN_COND_SIGNAL
status = pthread_cond_broadcast (&thrmgr->cond);
if (status != 0) {
pthread_mutex_unlock (&thrmgr->mutex);
return(status);
}
#endif
}
/*
* Just to prove that every rule has an exception, I'm
* using the "cv" condition for two separate predicates
* here. That's OK, since the case used here applies
* only once during the life of a work queue -- during
* rundown. The overhead is minimal and it's not worth
* creating a separate condition variable that would be
* waited and signalled exactly once!
*/
while (thrmgr->counter > 0) {
#ifndef BROKEN_COND_SIGNAL
status = pthread_cond_wait (&thrmgr->cond, &thrmgr->mutex);
if (status != 0) {
pthread_mutex_unlock (&thrmgr->mutex);
return(status);
}
#endif
}
}
status = pthread_mutex_unlock (&thrmgr->mutex);
if (status != 0) {
return(status);
}
status = pthread_mutex_destroy (&thrmgr->mutex);
#ifndef BROKEN_COND_SIGNAL
status1 = pthread_cond_destroy (&thrmgr->cond);
#else
status1 = sem_destroy(&thrmgr->semaphore);
#endif
status2 = pthread_attr_destroy (&thrmgr->attr);
return (status ? status : (status1 ? status1 : status2));
work_item_t *work_item;
void *data;
if (!work_q || !work_q->head) {
return NULL;
}
work_item = work_q->head;
data = work_item->data;
work_q->head = work_item->next;
if (work_q->head == NULL) {
work_q->tail = NULL;
}
free(work_item);
return data;
}
/*
* Add an item to a work queue.
*/
int thrmgr_add( thrmgr_t *thrmgr,
void *element )
void thrmgr_destroy(threadpool_t *threadpool)
{
work_element_t *item;
pthread_t id;
int status;
int count;
if (thrmgr->valid != THRMGR_VALID) {
return(EINVAL);
}
/*
* Create and initialize a request structure.
*/
item = mmalloc( sizeof (work_element_t) );
item->data = element;
item->next = NULL;
status = pthread_mutex_lock (&thrmgr->mutex);
if (status != 0) {
free (item);
return(status);
}
/*
* Add the request to the end of the queue, updating the
* first and last pointers.
*/
if (thrmgr->first == NULL) {
thrmgr->first = item;
} else {
thrmgr->last->next = item;
}
thrmgr->last = item;
/*
* if any threads are idling, wake one.
*/
/* printf("Idle threads: %d\n", thrmgr->idle); */
if (thrmgr->idle > 0) {
#ifndef BROKEN_COND_SIGNAL
status = pthread_cond_signal (&thrmgr->cond);
#else
status = sem_post(&thrmgr->semaphore);
#endif
if (status != 0) {
pthread_mutex_unlock (&thrmgr->mutex);
return(status);
}
} else if (thrmgr->counter < thrmgr->parallelism) {
/*
* If there were no idling threads, and we're allowed to
* create a new thread, do so.
*/
for ( count=0 ; count < thrmgr->alloc_unit ; count++ ) {
/* log_message ("Creating new worker"); */
status = pthread_create (&id, &thrmgr->attr, thrmgr_server, (void*)thrmgr);
if (status != 0) {
pthread_mutex_unlock (&thrmgr->mutex);
return(status);
}
thrmgr->counter++;
}
}
pthread_mutex_unlock (&thrmgr->mutex);
return(0);
if (!threadpool || (threadpool->state != POOL_VALID)) {
return;
}
if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
logg("!Mutex lock failed");
exit(-1);
}
threadpool->state = POOL_EXIT;
/* wait for threads to exit */
if (threadpool->thr_alive > 0) {
if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
pthread_mutex_unlock(&threadpool->pool_mutex);
return;
}
}
while (threadpool->thr_alive > 0) {
if (pthread_cond_wait (&threadpool->pool_cond, &threadpool->pool_mutex) != 0) {
pthread_mutex_unlock(&threadpool->pool_mutex);
return;
}
}
if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
logg("!Mutex unlock failed");
exit(-1);
}
pthread_mutex_destroy(&(threadpool->pool_mutex));
pthread_cond_destroy(&(threadpool->pool_cond));
pthread_attr_destroy(&(threadpool->pool_attr));
free(threadpool);
return;
}
int thrmgr_stat( thrmgr_t *thrmgr,
int *threads,
int *idle )
threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
{
int status;
threadpool_t *threadpool;
if (max_threads <= 0) {
return NULL;
}
threadpool = (threadpool_t *) mmalloc(sizeof(threadpool_t));
status = pthread_mutex_lock (&thrmgr->mutex);
if (status != 0) {
return(-1);
}
threadpool->queue = work_queue_new();
if (!threadpool->queue) {
free(threadpool);
return NULL;
}
threadpool->thr_max = max_threads;
threadpool->thr_alive = 0;
threadpool->thr_idle = 0;
threadpool->idle_timeout = idle_timeout;
threadpool->handler = handler;
pthread_mutex_init(&(threadpool->pool_mutex), NULL);
if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
free(threadpool);
return NULL;
}
if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
free(threadpool);
return NULL;
}
if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
free(threadpool);
return NULL;
}
threadpool->state = POOL_VALID;
*threads = thrmgr->counter;
*idle = thrmgr->idle;
return threadpool;
}
pthread_mutex_unlock (&thrmgr->mutex);
return(0);
void *thrmgr_worker(void *arg)
{
threadpool_t *threadpool = (threadpool_t *) arg;
void *job_data;
int retval, must_exit = FALSE;
struct timespec timeout;
/* loop looking for work */
for (;;) {
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
/* Fatal error */
logg("!Fatal: mutex lock failed");
exit(-2);
}
timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
timeout.tv_nsec = 0;
threadpool->thr_idle++;
while (((job_data=work_queue_pop(threadpool->queue)) == NULL)
&& (threadpool->state != POOL_EXIT)) {
/* Sleep, awaiting wakeup */
retval = pthread_cond_timedwait(&(threadpool->pool_cond),
&(threadpool->pool_mutex), &timeout);
if (retval == ETIMEDOUT) {
must_exit = TRUE;
break;
}
}
threadpool->thr_idle--;
if (threadpool->state == POOL_EXIT) {
must_exit = TRUE;
}
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
/* Fatal error */
logg("!Fatal: mutex unlock failed");
exit(-2);
}
if (job_data) {
threadpool->handler(job_data);
} else if (must_exit) {
break;
}
}
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
/* Fatal error */
logg("!Fatal: mutex lock failed");
exit(-2);
}
threadpool->thr_alive--;
if (threadpool->thr_alive == 0) {
/* signal that all threads are finished */
pthread_cond_broadcast(&threadpool->pool_cond);
}
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
/* Fatal error */
logg("!Fatal: mutex unlock failed");
exit(-2);
}
return NULL;
}
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
{
pthread_t thr_id;
if (!threadpool) {
return FALSE;
}
/* Lock the threadpool */
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
logg("!Mutex lock failed");
return FALSE;
}
if (threadpool->state != POOL_VALID) {
return FALSE;
}
work_queue_add(threadpool->queue, user_data);
if ((threadpool->thr_idle == 0) &&
(threadpool->thr_alive < threadpool->thr_max)) {
/* Start a new thread */
if (pthread_create(&thr_id, &(threadpool->pool_attr),
thrmgr_worker, threadpool) != 0) {
logg("!pthread_create failed");
} else {
threadpool->thr_alive++;
}
}
pthread_cond_signal(&(threadpool->pool_cond));
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
logg("!Mutex unlock failed");
return FALSE;
}
return TRUE;
}

@ -1,9 +1,6 @@
/*
* Copyright (C) 2004 Trog <trog@clamav.net>
*
* The code is based on the book "Programming with POSIX threads" by Dave
* Butenhof
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
@ -18,92 +15,49 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
/*
* workq.h
*
* This header file defines the interfaces for a "work queue"
* manager. A "manager object" is created with several
* parameters, including the required size of a work queue
* entry, the maximum desired degree of parallelism (number of
* threads to service the queue), and the address of an
* execution engine routine.
*
* The application requests a work queue entry from the manager,
* fills in the application-specific fields, and returns it to
* the queue manager for processing. The manager will create a
* new thread to service the queue if all current threads are
* busy and the maximum level of parallelism has not yet been
* reached.
*
* The manager will dequeue items and present them to the
* processing engine until the queue is empty; at that point,
* processing threads will begin to shut down. (They will be
* restarted when work appears.)
*/
#ifndef __THRMGR_H__
#define __THRMGR_H__
#ifdef DEBUG
# define DPRINTF(arg) printf arg
#else
# define DPRINTF(arg)
#endif
#include <pthread.h>
// #include "config.h"
#ifdef BROKEN_COND_SIGNAL
#include <semaphore.h>
#endif
/*
* Structure to keep track of work requests.
*/
typedef struct work_element_tag {
struct work_element_tag *next;
void *data;
} work_element_t;
#include <sys/time.h>
typedef struct work_item_tag {
struct work_item_tag *next;
void *data;
struct timeval time_queued;
} work_item_t;
typedef struct work_queue_tag {
work_item_t *head;
work_item_t *tail;
int item_count;
} work_queue_t;
typedef enum {
POOL_INVALID,
POOL_VALID,
POOL_EXIT,
} pool_state_t;
typedef struct threadpool_tag {
pthread_mutex_t pool_mutex;
pthread_cond_t pool_cond;
pthread_attr_t pool_attr;
pool_state_t state;
int thr_max;
int thr_alive;
int thr_idle;
int idle_timeout;
void (*handler)(void *);
work_queue_t *queue;
} threadpool_t;
threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *));
void thrmgr_destroy(threadpool_t *threadpool);
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data);
/*
* Structure describing a work queue.
*/
typedef struct thrmgr_tag {
pthread_mutex_t mutex;
#ifndef BROKEN_COND_SIGNAL
pthread_cond_t cond; /* wait for work */
#else
sem_t semaphore;
#endif
pthread_attr_t attr; /* create detached threads */
work_element_t *first, *last; /* work queue */
int valid; /* set when valid */
int quit; /* set when workq should quit */
int parallelism; /* number of threads required */
int alloc_unit; /* unit of thread creation */
int counter; /* current number of threads */
int idle; /* number of idle threads */
void (*handler)(void *arg); /* request handler */
} thrmgr_t;
#define THRMGR_VALID 0xdeadfeed
/*
* Define work queue functions
*/
extern int thrmgr_init( thrmgr_t *thrmgr, /* thread manager */
int max_threads, /* maximum threads */
int alloc_unit, /* thread creation unit */
void (*handler)(void *) ); /* request handler */
extern int thrmgr_destroy( thrmgr_t *thrmgr );
extern int thrmgr_add( thrmgr_t *thrmgr,
void *data );
int thrmgr_stat( thrmgr_t *thrmgr,
int *threads,
int *idle );
#endif /* __THRMGR_H__ */

Loading…
Cancel
Save