/* mirror of https://github.com/Cisco-Talos/clamav */
/*
|
|
* Copyright (C) 2013-2024 Cisco Systems, Inc. and/or its affiliates. All rights reserved.
|
|
* Copyright (C) 2007-2013 Sourcefire, Inc.
|
|
*
|
|
* Authors: Trog, Török Edvin
|
|
*
|
|
* This program is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation; either version 2 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program; if not, write to the Free Software
|
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
* MA 02110-1301, USA.
|
|
*/
|
|
|
|
#if HAVE_CONFIG_H
|
|
#include "clamav-config.h"
|
|
#endif
|
|
|
|
#include <stdio.h>
|
|
#include <pthread.h>
|
|
#include <time.h>
|
|
#include <errno.h>
|
|
#include <string.h>
|
|
|
|
// libclamav
|
|
#include "clamav.h"
|
|
#include "others.h"
|
|
#include "mpool.h"
|
|
|
|
// common
|
|
#include "output.h"
|
|
|
|
#include "thrmgr.h"
|
|
#include "clamd_others.h"
|
|
#include "server.h"
|
|
|
|
#ifdef HAVE_MALLINFO
|
|
#include <malloc.h>
|
|
#endif
|
|
|
|
/* BSD and HP-UX need a bigger stacksize than the system default */
|
|
#if defined(C_BSD) || defined(C_HPUX) || defined(C_AIX) || (defined(C_LINUX) && !defined(__GLIBC__))
|
|
#define C_BIGSTACK 1
|
|
#endif
|
|
|
|
static work_queue_t *work_queue_new(void)
|
|
{
|
|
work_queue_t *work_q;
|
|
|
|
work_q = (work_queue_t *)malloc(sizeof(work_queue_t));
|
|
if (!work_q) {
|
|
return NULL;
|
|
}
|
|
|
|
work_q->head = work_q->tail = NULL;
|
|
work_q->item_count = 0;
|
|
work_q->popped = 0;
|
|
return work_q;
|
|
}
|
|
|
|
/* Append @data to the tail of @work_q, timestamping the item.
 * Returns TRUE on success, FALSE on a NULL queue or allocation failure. */
static int work_queue_add(work_queue_t *work_q, void *data)
{
    work_item_t *item;

    if (work_q == NULL)
        return FALSE;

    item = malloc(sizeof(*item));
    if (item == NULL)
        return FALSE;

    item->next = NULL;
    item->data = data;
    /* record enqueue time so print_queue() can report wait times */
    gettimeofday(&(item->time_queued), NULL);

    if (work_q->head != NULL) {
        /* non-empty queue: link behind the current tail */
        work_q->tail->next = item;
        work_q->tail       = item;
        work_q->item_count++;
    } else {
        /* empty queue: item becomes both head and tail */
        work_q->head = work_q->tail = item;
        work_q->item_count = 1;
    }

    return TRUE;
}
|
|
|
|
/* Detach and return the payload of the item at the head of @work_q.
 * Returns NULL when the queue is NULL or empty. Frees the item wrapper. */
static void *work_queue_pop(work_queue_t *work_q)
{
    work_item_t *head;
    void *payload;

    if (work_q == NULL || work_q->head == NULL)
        return NULL;

    head         = work_q->head;
    payload      = head->data;
    work_q->head = head->next;
    if (work_q->head == NULL) {
        /* queue drained: tail must not dangle */
        work_q->tail = NULL;
    }
    free(head);
    work_q->item_count--;

    return payload;
}
|
|
|
|
/* Global singly-linked registry of all live thread pools; consumed by
 * thrmgr_printstats() to report on every pool. */
static struct threadpool_list {
    threadpool_t *pool;
    struct threadpool_list *nxt;
} *pools = NULL;
/* Guards the pools list (and, by convention in this file, each pool's
 * tasks list when traversed from the stats side). */
static pthread_mutex_t pools_lock = PTHREAD_MUTEX_INITIALIZER;
|
|
|
|
/* Register thread pool @t in the global pools list (pushed at the head).
 * On allocation failure the pool is simply not registered; an error is logged. */
static void add_topools(threadpool_t *t)
{
    struct threadpool_list *entry;

    entry = malloc(sizeof(*entry));
    if (entry == NULL) {
        logg(LOGG_ERROR, "Unable to add threadpool to list\n");
        return;
    }
    entry->pool = t;

    pthread_mutex_lock(&pools_lock);
    entry->nxt = pools;
    pools      = entry;
    pthread_mutex_unlock(&pools_lock);
}
|
|
|
|
/* Unregister thread pool @t from the global pools list and free the pool's
 * remaining per-thread task descriptors. No-op if @t is not registered. */
static void remove_frompools(threadpool_t *t)
{
    struct threadpool_list *node, *before;
    struct task_desc *task;

    pthread_mutex_lock(&pools_lock);

    /* locate the registry node that refers to this pool */
    before = NULL;
    for (node = pools; node != NULL; node = node->nxt) {
        if (node->pool == t)
            break;
        before = node;
    }
    if (node == NULL) {
        /* pool was never registered (or already removed) */
        pthread_mutex_unlock(&pools_lock);
        return;
    }

    /* unlink and free the registry node */
    if (before)
        before->nxt = node->nxt;
    if (node == pools)
        pools = node->nxt;
    free(node);

    /* release any leftover task descriptors owned by the pool */
    task = t->tasks;
    while (task != NULL) {
        struct task_desc *victim = task;
        task = task->nxt;
        free(victim);
    }
    t->tasks = NULL;

    pthread_mutex_unlock(&pools_lock);
}
|
|
|
|
/* Print min/max/avg wait-time statistics for the items currently sitting in
 * @queue, relative to @tv_now, onto descriptor @f via mdprintf().
 * Items whose computed wait is negative (clock went backwards) are counted
 * as "invalid" and excluded from the min/max/avg figures. */
static void print_queue(int f, work_queue_t *queue, struct timeval *tv_now)
{
    long umin = LONG_MAX, umax = 0, usum = 0;
    unsigned invalids = 0, cnt = 0;
    work_item_t *q;

    if (!queue->head)
        return;
    for (q = queue->head; q; q = q->next) {
        long delta;
        delta = tv_now->tv_usec - q->time_queued.tv_usec;
        delta += (tv_now->tv_sec - q->time_queued.tv_sec) * 1000000;
        if (delta < 0) {
            invalids++;
            continue;
        }
        if (delta > umax)
            umax = delta;
        if (delta < umin)
            umin = delta;
        usum += delta;
        ++cnt;
    }
    /* Bugfix: only emit the wait statistics when at least one valid sample
     * exists. Previously cnt==0 (all timestamps invalid) divided by zero in
     * the average and printed LONG_MAX/1e6 as min_wait. */
    if (cnt)
        mdprintf(f, " min_wait: %.6f max_wait: %.6f avg_wait: %.6f",
                 umin / 1e6, umax / 1e6, usum / (1e6 * cnt));
    if (invalids)
        mdprintf(f, " (INVALID timestamps: %u)", invalids);
    /* sanity check: valid + invalid samples must match the queue's counter */
    if (cnt + invalids != (unsigned)queue->item_count)
        mdprintf(f, " (ERROR: %u != %u)", cnt + invalids,
                 (unsigned)queue->item_count);
}
|
|
|
|
/* Write a human-readable statistics report for every registered thread pool
 * to descriptor @f, terminated by "END" plus @term. Reports pool state,
 * thread counts, queue depths and wait times, per-task activity, and memory
 * usage (libc mallinfo when available, plus engine mpool stats).
 * Holds pools_lock for the whole operation. Always returns 0. */
int thrmgr_printstats(int f, char term)
{
    struct threadpool_list *l;
    unsigned cnt, pool_cnt = 0;
    size_t pool_used = 0, pool_total = 0, seen_cnt = 0, error_flag = 0;
    float mem_heap = 0, mem_mmap = 0, mem_used = 0, mem_free = 0, mem_releasable = 0;
    /* engines already counted, so each engine's mpool is reported once */
    const struct cl_engine **seen = NULL;
    int has_libc_memstats = 0;

    pthread_mutex_lock(&pools_lock);
    /* first pass: count the pools for the header line */
    for (cnt = 0, l = pools; l; l = l->nxt) cnt++;
    mdprintf(f, "POOLS: %u\n\n", cnt);
    for (l = pools; l && !error_flag; l = l->nxt) {
        threadpool_t *pool = l->pool;
        const char *state;
        struct timeval tv_now;
        struct task_desc *task;
        cnt = 0;

        if (!pool) {
            mdprintf(f, "NULL\n\n");
            continue;
        }
        /* now we can access desc->, knowing that they won't get freed
         * because the other tasks can't quit while pool_mutex is taken
         */
        switch (pool->state) {
            case POOL_INVALID:
                state = "INVALID";
                break;
            case POOL_VALID:
                state = "VALID";
                break;
            case POOL_EXIT:
                state = "EXIT";
                break;
            default:
                state = "??";
                break;
        }
        /* the last pool in the list is labeled PRIMARY */
        mdprintf(f, "STATE: %s %s\n", state, l->nxt ? "" : "PRIMARY");
        mdprintf(f, "THREADS: live %u idle %u max %u idle-timeout %u\n", pool->thr_alive, pool->thr_idle, pool->thr_max,
                 pool->idle_timeout);
        /* TODO: show both queues */
        mdprintf(f, "QUEUE: %u items", pool->single_queue->item_count + pool->bulk_queue->item_count);
        gettimeofday(&tv_now, NULL);
        print_queue(f, pool->bulk_queue, &tv_now);
        print_queue(f, pool->single_queue, &tv_now);
        mdprintf(f, "\n");
        /* per-thread activity: command, elapsed seconds, filename */
        for (task = pool->tasks; task; task = task->nxt) {
            double delta;
            size_t used, total;

            delta = tv_now.tv_usec - task->tv.tv_usec;
            delta += (tv_now.tv_sec - task->tv.tv_sec) * 1000000.0;
            mdprintf(f, "\t%s %f %s\n",
                     task->command ? task->command : "N/A",
                     delta / 1e6,
                     task->filename ? task->filename : "");
            if (task->engine) {
                /* we usually have at most 2 engines so a linear
                 * search is good enough */
                size_t i;
                for (i = 0; i < seen_cnt; i++) {
                    if (seen[i] == task->engine)
                        break;
                }
                /* we need to count the memusage from the same
                 * engine only once */
                if (i == seen_cnt) {
                    const struct cl_engine **s;
                    /* new engine */
                    ++seen_cnt;
                    s = realloc((void *)seen, seen_cnt * sizeof(*seen));
                    if (!s) {
                        /* OOM: abort the report loop, flagged below */
                        error_flag = 1;
                        break;
                    }
                    seen = s;
                    seen[seen_cnt - 1] = task->engine;

                    if (MPOOL_GETSTATS(task->engine, &used, &total) != -1) {
                        pool_used += used;
                        pool_total += total;
                        pool_cnt++;
                    }
                }
            }
        }
        mdprintf(f, "\n");
    }
    free((void *)seen);
#ifdef HAVE_MALLINFO
    {
        /* glibc heap statistics, converted from bytes to MiB */
        struct mallinfo inf = mallinfo();
        mem_heap = inf.arena / (1024 * 1024.0);
        mem_mmap = inf.hblkhd / (1024 * 1024.0);
        mem_used = (inf.usmblks + inf.uordblks) / (1024 * 1024.0);
        mem_free = (inf.fsmblks + inf.fordblks) / (1024 * 1024.0);
        mem_releasable = inf.keepcost / (1024 * 1024.0);
        has_libc_memstats = 1;
    }
#endif
    if (error_flag) {
        mdprintf(f, "ERROR: error encountered while formatting statistics\n");
    } else {
        if (has_libc_memstats)
            mdprintf(f, "MEMSTATS: heap %.3fM mmap %.3fM used %.3fM free %.3fM releasable %.3fM pools %u pools_used %.3fM pools_total %.3fM\n",
                     mem_heap, mem_mmap, mem_used, mem_free, mem_releasable, pool_cnt,
                     pool_used / (1024 * 1024.0), pool_total / (1024 * 1024.0));
        else
            mdprintf(f, "MEMSTATS: heap N/A mmap N/A used N/A free N/A releasable N/A pools %u pools_used %.3fM pools_total %.3fM\n",
                     pool_cnt, pool_used / (1024 * 1024.0), pool_total / (1024 * 1024.0));
    }
    mdprintf(f, "END%c", term);
    pthread_mutex_unlock(&pools_lock);
    return 0;
}
|
|
|
|
/* Shut down @threadpool: mark it POOL_EXIT, wake all workers, block until
 * every thread has exited, unregister the pool, then destroy its mutex,
 * condition variables, thread attributes and queues. Safe to call with NULL.
 * Exits the process on mutex lock/unlock failure. */
void thrmgr_destroy(threadpool_t *threadpool)
{
    if (!threadpool) {
        return;
    }
    if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
        logg(LOGG_ERROR, "Mutex lock failed\n");
        exit(-1);
    }
    /* only a VALID pool can be torn down; anything else is left alone */
    if (threadpool->state != POOL_VALID) {
        if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
            logg(LOGG_ERROR, "Mutex unlock failed\n");
            exit(-1);
        }
        return;
    }
    threadpool->state = POOL_EXIT;

    /* wait for threads to exit */
    if (threadpool->thr_alive > 0) {
        if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
            /* NOTE(review): returning here leaks the pool's resources —
             * presumably acceptable since broadcast failure is abnormal */
            pthread_mutex_unlock(&threadpool->pool_mutex);
            return;
        }
    }
    /* workers broadcast pool_cond when the last one exits (thr_alive == 0) */
    while (threadpool->thr_alive > 0) {
        if (pthread_cond_wait(&threadpool->pool_cond, &threadpool->pool_mutex) != 0) {
            pthread_mutex_unlock(&threadpool->pool_mutex);
            return;
        }
    }
    remove_frompools(threadpool);
    if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
        logg(LOGG_ERROR, "Mutex unlock failed\n");
        exit(-1);
    }

    /* all threads are gone: release synchronization objects and memory */
    pthread_mutex_destroy(&(threadpool->pool_mutex));
    pthread_cond_destroy(&(threadpool->idle_cond));
    pthread_cond_destroy(&(threadpool->queueable_single_cond));
    pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
    pthread_cond_destroy(&(threadpool->pool_cond));
    pthread_attr_destroy(&(threadpool->pool_attr));
    free(threadpool->single_queue);
    free(threadpool->bulk_queue);
    free(threadpool);
    return;
}
|
|
|
|
/* Block until every worker thread of @threadpool has exited, without
 * destroying the pool itself (unlike thrmgr_destroy). Does not change the
 * pool state; only waits while thr_alive > 0. Safe to call with NULL.
 * Exits the process on mutex lock/unlock failure. */
void thrmgr_wait_for_threads(threadpool_t *threadpool)
{
    if (!threadpool) {
        return;
    }
    if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
        logg(LOGG_ERROR, "Mutex lock failed\n");
        exit(-1);
    }
    /* nothing to wait for unless the pool is live */
    if (threadpool->state != POOL_VALID) {
        if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
            logg(LOGG_ERROR, "Mutex unlock failed\n");
            exit(-1);
        }
        return;
    }

    /* wait for threads to exit */
    if (threadpool->thr_alive > 0) {
        if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
            pthread_mutex_unlock(&threadpool->pool_mutex);
            return;
        }
    }
    /* workers broadcast pool_cond when the last one exits (thr_alive == 0) */
    while (threadpool->thr_alive > 0) {
        if (pthread_cond_wait(&threadpool->pool_cond, &threadpool->pool_mutex) != 0) {
            pthread_mutex_unlock(&threadpool->pool_mutex);
            return;
        }
    }

    /* Ok threads all exited, we can release the lock */
    if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
        logg(LOGG_ERROR, "Mutex unlock failed\n");
        exit(-1);
    }
    return;
}
|
|
|
|
threadpool_t *thrmgr_new(int max_threads, int idle_timeout, int max_queue, void (*handler)(void *))
|
|
{
|
|
threadpool_t *threadpool;
|
|
#if defined(C_BIGSTACK)
|
|
size_t stacksize;
|
|
#endif
|
|
|
|
if (max_threads <= 0) {
|
|
return NULL;
|
|
}
|
|
|
|
threadpool = (threadpool_t *)malloc(sizeof(threadpool_t));
|
|
if (!threadpool) {
|
|
return NULL;
|
|
}
|
|
|
|
threadpool->single_queue = work_queue_new();
|
|
if (!threadpool->single_queue) {
|
|
free(threadpool);
|
|
return NULL;
|
|
}
|
|
threadpool->bulk_queue = work_queue_new();
|
|
if (!threadpool->bulk_queue) {
|
|
free(threadpool->single_queue);
|
|
free(threadpool);
|
|
return NULL;
|
|
}
|
|
|
|
threadpool->queue_max = max_queue;
|
|
|
|
threadpool->thr_max = max_threads;
|
|
threadpool->thr_alive = 0;
|
|
threadpool->thr_idle = 0;
|
|
threadpool->thr_multiscan = 0;
|
|
threadpool->idle_timeout = idle_timeout;
|
|
threadpool->handler = handler;
|
|
threadpool->tasks = NULL;
|
|
|
|
if (pthread_mutex_init(&(threadpool->pool_mutex), NULL)) {
|
|
free(threadpool->single_queue);
|
|
free(threadpool->bulk_queue);
|
|
free(threadpool);
|
|
return NULL;
|
|
}
|
|
|
|
if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
|
|
pthread_mutex_destroy(&(threadpool->pool_mutex));
|
|
free(threadpool->single_queue);
|
|
free(threadpool->bulk_queue);
|
|
free(threadpool);
|
|
return NULL;
|
|
}
|
|
|
|
if (pthread_cond_init(&(threadpool->queueable_single_cond), NULL) != 0) {
|
|
pthread_cond_destroy(&(threadpool->pool_cond));
|
|
pthread_mutex_destroy(&(threadpool->pool_mutex));
|
|
free(threadpool->single_queue);
|
|
free(threadpool->bulk_queue);
|
|
free(threadpool);
|
|
return NULL;
|
|
}
|
|
|
|
if (pthread_cond_init(&(threadpool->queueable_bulk_cond), NULL) != 0) {
|
|
pthread_cond_destroy(&(threadpool->queueable_single_cond));
|
|
pthread_cond_destroy(&(threadpool->pool_cond));
|
|
pthread_mutex_destroy(&(threadpool->pool_mutex));
|
|
free(threadpool->single_queue);
|
|
free(threadpool->bulk_queue);
|
|
free(threadpool);
|
|
return NULL;
|
|
}
|
|
|
|
if (pthread_cond_init(&(threadpool->idle_cond), NULL) != 0) {
|
|
pthread_cond_destroy(&(threadpool->queueable_single_cond));
|
|
pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
|
|
pthread_cond_destroy(&(threadpool->pool_cond));
|
|
pthread_mutex_destroy(&(threadpool->pool_mutex));
|
|
free(threadpool->single_queue);
|
|
free(threadpool->bulk_queue);
|
|
free(threadpool);
|
|
return NULL;
|
|
}
|
|
|
|
if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
|
|
pthread_cond_destroy(&(threadpool->queueable_single_cond));
|
|
pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
|
|
pthread_cond_destroy(&(threadpool->idle_cond));
|
|
pthread_cond_destroy(&(threadpool->pool_cond));
|
|
pthread_mutex_destroy(&(threadpool->pool_mutex));
|
|
free(threadpool->single_queue);
|
|
free(threadpool->bulk_queue);
|
|
free(threadpool);
|
|
return NULL;
|
|
}
|
|
|
|
if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
|
|
pthread_cond_destroy(&(threadpool->queueable_single_cond));
|
|
pthread_cond_destroy(&(threadpool->queueable_bulk_cond));
|
|
pthread_attr_destroy(&(threadpool->pool_attr));
|
|
pthread_cond_destroy(&(threadpool->idle_cond));
|
|
pthread_cond_destroy(&(threadpool->pool_cond));
|
|
pthread_mutex_destroy(&(threadpool->pool_mutex));
|
|
free(threadpool->single_queue);
|
|
free(threadpool->bulk_queue);
|
|
free(threadpool);
|
|
return NULL;
|
|
}
|
|
|
|
#if defined(C_BIGSTACK)
|
|
pthread_attr_getstacksize(&(threadpool->pool_attr), &stacksize);
|
|
stacksize = stacksize + 64 * 1024;
|
|
if (stacksize < 1048576) /* at least 1MB please */
|
|
#if defined(C_HPUX) && defined(USE_MPOOL)
|
|
/* Set aside one cli_pagesize() for the stack's pthread header,
|
|
* giving a 1M region to fit a 1M large-page */
|
|
if (cli_getpagesize() < 1048576)
|
|
stacksize = 1048576 - cli_getpagesize();
|
|
else
|
|
#endif
|
|
stacksize = 1048576;
|
|
logg(LOGG_INFO, "Set stacksize to %lu\n", (unsigned long int)stacksize);
|
|
pthread_attr_setstacksize(&(threadpool->pool_attr), stacksize);
|
|
#endif
|
|
threadpool->state = POOL_VALID;
|
|
|
|
add_topools(threadpool);
|
|
return threadpool;
|
|
}
|
|
|
|
/* Thread-local-storage key holding each worker's task_desc (for stats). */
static pthread_key_t stats_tls_key;
static pthread_once_t stats_tls_key_once = PTHREAD_ONCE_INIT;

/* One-time initializer for stats_tls_key; invoked via pthread_once().
 * No destructor: descriptors are freed explicitly in stats_destroy(). */
static void stats_tls_key_alloc(void)
{
    pthread_key_create(&stats_tls_key, NULL);
}
|
|
|
|
/* Sentinel command string marking an idle worker; compared by POINTER
 * identity below, not by string content. */
static const char *IDLE_TASK = "IDLE";

/* no mutex is needed, we are using thread local variable */
void thrmgr_setactivetask(const char *filename, const char *cmd)
{
    struct task_desc *desc;
    pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
    desc = pthread_getspecific(stats_tls_key);
    /* no descriptor yet (stats_init not run or failed): nothing to record */
    if (!desc)
        return;
    desc->filename = filename;
    if (cmd) {
        /* pointer comparison: if the thread is already marked idle, keep
         * the original idle timestamp instead of resetting it */
        if (cmd == IDLE_TASK && desc->command == cmd)
            return;
        desc->command = cmd;
        /* timestamp the start of the new command for stats reporting */
        gettimeofday(&desc->tv, NULL);
    }
}
|
|
|
|
void thrmgr_setactiveengine(const struct cl_engine *engine)
|
|
{
|
|
struct task_desc *desc;
|
|
pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
|
|
desc = pthread_getspecific(stats_tls_key);
|
|
if (!desc)
|
|
return;
|
|
desc->engine = engine;
|
|
}
|
|
|
|
/* thread pool mutex must be held on entry */
|
|
static void stats_init(threadpool_t *pool)
|
|
{
|
|
struct task_desc *desc = calloc(1, sizeof(*desc));
|
|
if (!desc)
|
|
return;
|
|
pthread_once(&stats_tls_key_once, stats_tls_key_alloc);
|
|
pthread_setspecific(stats_tls_key, desc);
|
|
if (!pool->tasks)
|
|
pool->tasks = desc;
|
|
else {
|
|
desc->nxt = pool->tasks;
|
|
pool->tasks->prv = desc;
|
|
pool->tasks = desc;
|
|
}
|
|
}
|
|
|
|
/* thread pool mutex must be held on entry */
/* Remove and free the calling worker's task descriptor: unlink it from the
 * pool's task list (under pools_lock, which guards readers such as
 * thrmgr_printstats) and clear the TLS slot. */
static void stats_destroy(threadpool_t *pool)
{
    struct task_desc *desc = pthread_getspecific(stats_tls_key);
    /* nothing recorded for this thread (stats_init failed or never ran) */
    if (!desc)
        return;
    pthread_mutex_lock(&pools_lock);
    /* unlink from the doubly-linked list */
    if (desc->prv)
        desc->prv->nxt = desc->nxt;
    if (desc->nxt)
        desc->nxt->prv = desc->prv;
    if (pool->tasks == desc)
        pool->tasks = desc->nxt;
    free(desc);
    pthread_setspecific(stats_tls_key, NULL);
    pthread_mutex_unlock(&pools_lock);
}
|
|
|
|
/* Return nonzero when @pool cannot accept another item on the given queue.
 * Bulk items may occupy at most half of queue_max so that non-bulk (single)
 * items always get a chance to be queued. */
static inline int thrmgr_contended(threadpool_t *pool, int bulk)
{
    int backlog;

    if (bulk && pool->bulk_queue->item_count >= pool->queue_max / 2)
        return 1;

    /* total queued work minus the idle threads ready to absorb it */
    backlog = pool->bulk_queue->item_count + pool->single_queue->item_count;
    return backlog + pool->thr_alive - pool->thr_idle >= pool->queue_max;
}
|
|
|
|
/* when both queues have tasks, it will pick 4 items from the single queue,
 * and 1 from the bulk */
#define SINGLE_BULK_RATIO 4
#define SINGLE_BULK_SUM (SINGLE_BULK_RATIO + 1)

/* must be called with pool_mutex held */
/* Dequeue the next job, alternating between the single and bulk queues at a
 * SINGLE_BULK_RATIO:1 fairness ratio (tracked via each queue's `popped`
 * counter). After popping, signals the queueable conditions if the
 * corresponding queue dropped below its contention threshold.
 * Returns the job's data pointer, or NULL if both queues are empty. */
static void *thrmgr_pop(threadpool_t *pool)
{
    void *task;
    work_queue_t *first, *second;
    int ratio;

    /* prefer the single queue until it has been popped RATIO times in a
     * row, then give the bulk queue one turn */
    if (pool->single_queue->popped < SINGLE_BULK_RATIO) {
        first = pool->single_queue;
        second = pool->bulk_queue;
        ratio = SINGLE_BULK_RATIO;
    } else {
        second = pool->single_queue;
        first = pool->bulk_queue;
        ratio = SINGLE_BULK_SUM - SINGLE_BULK_RATIO;
    }

    task = work_queue_pop(first);
    if (task) {
        /* once the preferred queue used up its quota, reset the other's
         * counter so the cycle starts over */
        if (++first->popped == ratio)
            second->popped = 0;
    } else {
        /* preferred queue empty: fall back to the other queue */
        task = work_queue_pop(second);
        if (task) {
            if (++second->popped == ratio)
                first->popped = 0;
        }
    }

    if (!thrmgr_contended(pool, 0)) {
        logg(LOGG_DEBUG_NV, "THRMGR: queue (single) crossed low threshold -> signaling\n");
        pthread_cond_signal(&pool->queueable_single_cond);
    }

    if (!thrmgr_contended(pool, 1)) {
        logg(LOGG_DEBUG_NV, "THRMGR: queue (bulk) crossed low threshold -> signaling\n");
        pthread_cond_signal(&pool->queueable_bulk_cond);
    }

    return task;
}
|
|
|
|
/* Worker thread entry point. Loops popping jobs from the pool and running
 * them through pool->handler; when no work arrives within idle_timeout
 * seconds, or the pool enters POOL_EXIT, the thread exits. The last thread
 * to exit broadcasts pool_cond so thrmgr_destroy/wait_for_threads can
 * proceed. Exits the whole process on mutex failures. */
static void *thrmgr_worker(void *arg)
{
    threadpool_t *threadpool = (threadpool_t *)arg;
    void *job_data;
    int retval, must_exit = FALSE, stats_inited = FALSE;
    struct timespec timeout;

    /* loop looking for work */
    for (;;) {
        if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
            logg(LOGG_ERROR, "Fatal: mutex lock failed\n");
            exit(-2);
        }
        /* first iteration only: register this thread's stats descriptor */
        if (!stats_inited) {
            stats_init(threadpool);
            stats_inited = TRUE;
        }
        /* mark ourselves idle for the stats report */
        thrmgr_setactiveengine(NULL);
        thrmgr_setactivetask(NULL, IDLE_TASK);
        /* absolute deadline for the idle wait below */
        timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
        timeout.tv_nsec = 0;
        threadpool->thr_idle++;
        while (((job_data = thrmgr_pop(threadpool)) == NULL) && (threadpool->state != POOL_EXIT)) {
            /* Sleep, awaiting wakeup */
            pthread_cond_signal(&threadpool->idle_cond);
            retval = pthread_cond_timedwait(&(threadpool->pool_cond),
                                            &(threadpool->pool_mutex), &timeout);
            if (retval == ETIMEDOUT) {
                /* idle too long: let this thread die */
                must_exit = TRUE;
                break;
            }
        }
        threadpool->thr_idle--;
        if (threadpool->state == POOL_EXIT) {
            must_exit = TRUE;
        }

        if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
            logg(LOGG_ERROR, "Fatal: mutex unlock failed\n");
            exit(-2);
        }
        /* run the job outside the pool lock */
        if (job_data) {
            threadpool->handler(job_data);
        } else if (must_exit) {
            break;
        }
    }
    if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
        /* Fatal error */
        logg(LOGG_ERROR, "Fatal: mutex lock failed\n");
        exit(-2);
    }
    threadpool->thr_alive--;
    if (threadpool->thr_alive == 0) {
        /* signal that all threads are finished */
        pthread_cond_broadcast(&threadpool->pool_cond);
    }
    stats_destroy(threadpool);
    if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
        /* Fatal error */
        logg(LOGG_ERROR, "Fatal: mutex unlock failed\n");
        exit(-2);
    }
    return NULL;
}
|
|
|
|
/* Enqueue @user_data on @threadpool's bulk or single queue (per @bulk),
 * blocking while the chosen queue is contended. Spawns a new worker thread
 * when there are more queued items than idle threads and the thread cap is
 * not reached, then signals pool_cond to wake a worker.
 * Returns TRUE on success, FALSE on failure. */
static int thrmgr_dispatch_internal(threadpool_t *threadpool, void *user_data, int bulk)
{
    int ret = TRUE;
    pthread_t thr_id;

    if (!threadpool) {
        return FALSE;
    }

    /* Lock the threadpool */
    if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
        logg(LOGG_ERROR, "Mutex lock failed\n");
        return FALSE;
    }

    /* do/while(0) so error paths can `break` to the single unlock below */
    do {
        work_queue_t *queue;
        pthread_cond_t *queueable_cond;
        int items;

        if (threadpool->state != POOL_VALID) {
            ret = FALSE;
            break;
        }

        if (bulk) {
            queue = threadpool->bulk_queue;
            queueable_cond = &threadpool->queueable_bulk_cond;
        } else {
            queue = threadpool->single_queue;
            queueable_cond = &threadpool->queueable_single_cond;
        }

        /* block until there is room in the queue (thrmgr_pop signals
         * queueable_cond when the queue drains below the threshold) */
        while (thrmgr_contended(threadpool, bulk)) {
            logg(LOGG_DEBUG_NV, "THRMGR: contended, sleeping\n");
            pthread_cond_wait(queueable_cond, &threadpool->pool_mutex);
            logg(LOGG_DEBUG_NV, "THRMGR: contended, woken\n");
        }

        if (!work_queue_add(queue, user_data)) {
            ret = FALSE;
            break;
        }

        items = threadpool->single_queue->item_count + threadpool->bulk_queue->item_count;
        if ((threadpool->thr_idle < items) &&
            (threadpool->thr_alive < threadpool->thr_max)) {
            /* Start a new thread */
            if (pthread_create(&thr_id, &(threadpool->pool_attr),
                               thrmgr_worker, threadpool) != 0) {
                logg(LOGG_ERROR, "pthread_create failed\n");
            } else {
                threadpool->thr_alive++;
            }
        }
        pthread_cond_signal(&(threadpool->pool_cond));

    } while (0);

    if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
        logg(LOGG_ERROR, "Mutex unlock failed\n");
        return FALSE;
    }
    return ret;
}
|
|
|
|
/* Queue @user_data on @threadpool's single (non-bulk) queue.
 * Returns TRUE on success, FALSE on failure. */
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
{
    return thrmgr_dispatch_internal(threadpool, user_data, 0);
}
|
|
|
|
/* Dispatch @user_data to @threadpool on behalf of @group (may be NULL).
 * The group's job counter is incremented before dispatch and rolled back
 * if the dispatch fails. Returns TRUE on success, FALSE on failure. */
int thrmgr_group_dispatch(threadpool_t *threadpool, jobgroup_t *group, void *user_data, int bulk)
{
    int ret;

    if (group != NULL) {
        /* account for the job up front, before it can start running */
        pthread_mutex_lock(&group->mutex);
        group->jobs++;
        logg(LOGG_DEBUG_NV, "THRMGR: active jobs for %p: %d\n", group, group->jobs);
        pthread_mutex_unlock(&group->mutex);
    }

    ret = thrmgr_dispatch_internal(threadpool, user_data, bulk);
    if (!ret && group != NULL) {
        /* dispatch failed: undo the accounting above */
        pthread_mutex_lock(&group->mutex);
        group->jobs--;
        logg(LOGG_DEBUG_NV, "THRMGR: active jobs for %p: %d\n", group, group->jobs);
        pthread_mutex_unlock(&group->mutex);
    }

    return ret;
}
|
|
|
|
/* returns
 * 0 - this was not the last thread in the group
 * 1 - this was last thread in group, group freed
 */
/* Record that one job of @group finished with exit code @exitc, updating
 * the group's ok/error/total counters. When the second-to-last job
 * finishes, signals group->only so thrmgr_group_waitforall can wake.
 * When the last job finishes, destroys and frees the group. */
int thrmgr_group_finished(jobgroup_t *group, enum thrmgr_exit exitc)
{
    int ret = 0;
    if (!group) {
        /* there is no group, we are obviously the last one */
        return 1;
    }
    pthread_mutex_lock(&group->mutex);
    logg(LOGG_DEBUG_NV, "THRMGR: group_finished: %p, %d\n", group, group->jobs);
    group->exit_total++;
    switch (exitc) {
        case EXIT_OK:
            group->exit_ok++;
            break;
        case EXIT_ERROR:
            group->exit_error++;
            break;
        default:
            break;
    }
    if (group->jobs) {
        if (!--group->jobs) {
            /* last job: group will be freed below, after unlocking */
            ret = 1;
        } else
            logg(LOGG_DEBUG_NV, "THRMGR: active jobs for %p: %d\n", group, group->jobs);
        /* one job left is the waiter in thrmgr_group_waitforall: wake it */
        if (group->jobs == 1)
            pthread_cond_signal(&group->only);
    }
    pthread_mutex_unlock(&group->mutex);
    if (ret) {
        logg(LOGG_DEBUG_NV, "THRMGR: group_finished: freeing %p\n", group);
        pthread_mutex_destroy(&group->mutex);
        pthread_cond_destroy(&group->only);
        free(group);
    }
    return ret;
}
|
|
|
|
/* Block until every job of @group (other than the caller's own reference)
 * has finished, or the daemon is shutting down (progexit). Outputs the
 * group's ok/error/total counters; a pending shutdown is folded into
 * *error. Drops the caller's own job reference; if that was the last one,
 * frees the group. Polls progexit every 5 seconds via a timed wait. */
void thrmgr_group_waitforall(jobgroup_t *group, unsigned *ok, unsigned *error, unsigned *total)
{
    int needexit = 0, needfree = 0;
    struct timespec timeout;
    pthread_mutex_lock(&group->mutex);
    /* jobs > 1: jobs besides our own reference are still running */
    while (group->jobs > 1) {
        pthread_mutex_lock(&exit_mutex);
        needexit = progexit;
        pthread_mutex_unlock(&exit_mutex);
        if (needexit)
            break;
        /* wake to check progexit */
        timeout.tv_sec = time(NULL) + 5;
        timeout.tv_nsec = 0;
        pthread_cond_timedwait(&group->only, &group->mutex, &timeout);
    }
    *ok = group->exit_ok;
    /* count an interrupted wait (shutdown) as one error */
    *error = group->exit_error + needexit;
    *total = group->exit_total;
    /* drop our own reference; last one out frees the group */
    if (!--group->jobs)
        needfree = 1;
    else
        logg(LOGG_DEBUG_NV, "THRMGR: active jobs for %p: %d\n", group, group->jobs);
    pthread_mutex_unlock(&group->mutex);
    if (needfree) {
        /* NOTE(review): unlike thrmgr_group_finished, the mutex/cond are
         * not destroyed before free here — confirm whether intentional */
        logg(LOGG_DEBUG_NV, "THRMGR: group finished freeing %p\n", group);
        free(group);
    }
}
|
|
|
|
jobgroup_t *thrmgr_group_new(void)
|
|
{
|
|
jobgroup_t *group;
|
|
|
|
group = malloc(sizeof(*group));
|
|
if (!group)
|
|
return NULL;
|
|
group->jobs = 1;
|
|
group->exit_ok = group->exit_error = group->exit_total = group->force_exit = 0;
|
|
if (pthread_mutex_init(&group->mutex, NULL)) {
|
|
logg(LOGG_WARNING, "Failed to initialize group mutex");
|
|
free(group);
|
|
return NULL;
|
|
}
|
|
if (pthread_cond_init(&group->only, NULL)) {
|
|
logg(LOGG_WARNING, "Failed to initialize group cond");
|
|
pthread_mutex_destroy(&group->mutex);
|
|
free(group);
|
|
return NULL;
|
|
}
|
|
logg(LOGG_DEBUG_NV, "THRMGR: new group: %p\n", group);
|
|
return group;
|
|
}
|
|
|
|
/* Return nonzero when jobs of @group should stop: either the group was
 * explicitly terminated (force_exit) or the daemon is shutting down
 * (global progexit flag). @group may be NULL. */
int thrmgr_group_need_terminate(jobgroup_t *group)
{
    int terminate = 0;

    if (group != NULL) {
        pthread_mutex_lock(&group->mutex);
        terminate = group->force_exit;
        pthread_mutex_unlock(&group->mutex);
    }

    /* a global shutdown also forces termination */
    pthread_mutex_lock(&exit_mutex);
    terminate |= progexit;
    pthread_mutex_unlock(&exit_mutex);

    return terminate;
}
|
|
|
|
/* Ask all jobs of @group to terminate by raising its force_exit flag,
 * which thrmgr_group_need_terminate() polls. @group may be NULL. */
void thrmgr_group_terminate(jobgroup_t *group)
{
    if (group == NULL)
        return;

    /* we may not be the last active job, now
     * the last active job will free resources */
    pthread_mutex_lock(&group->mutex);
    group->force_exit = 1;
    pthread_mutex_unlock(&group->mutex);
}
|
|
|