mirror of https://github.com/Cisco-Talos/clamav
clamonacc - add consumer queue; add thread pool library; add thread pool support in consumer queue; flesh out consumer queue code; refactor scan functions into thread pool worker functions; refactor scan functions to work off slimmed down params and event metadata instead of a single, giant context; sundry fixups
parent
0d78af13f1
commit
b365aa5884
@ -0,0 +1,21 @@ |
||||
The MIT License (MIT) |
||||
|
||||
Copyright (c) 2016 Johan Hanssen Seferidis |
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy |
||||
of this software and associated documentation files (the "Software"), to deal |
||||
in the Software without restriction, including without limitation the rights |
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
||||
copies of the Software, and to permit persons to whom the Software is |
||||
furnished to do so, subject to the following conditions: |
||||
|
||||
The above copyright notice and this permission notice shall be included in all |
||||
copies or substantial portions of the Software. |
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
||||
SOFTWARE. |
@ -0,0 +1,551 @@ |
||||
/* ********************************
|
||||
* Author: Johan Hanssen Seferidis |
||||
* License: MIT |
||||
* Description: Library providing a threading pool where you can add |
||||
* work. For usage, check the thpool.h file or README.md |
||||
* |
||||
*//** @file thpool.h *//*
|
||||
* |
||||
********************************/ |
||||
|
||||
#define _POSIX_C_SOURCE 200809L |
||||
#include <unistd.h> |
||||
#include <signal.h> |
||||
#include <stdio.h> |
||||
#include <stdlib.h> |
||||
#include <pthread.h> |
||||
#include <errno.h> |
||||
#include <time.h> |
||||
#if defined(__linux__) |
||||
#include <sys/prctl.h> |
||||
#endif |
||||
|
||||
#include "thpool.h" |
||||
|
||||
#ifdef THPOOL_DEBUG |
||||
#define THPOOL_DEBUG 1 |
||||
#else |
||||
#define THPOOL_DEBUG 0 |
||||
#endif |
||||
|
||||
#if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG) |
||||
#define err(str) fprintf(stderr, str) |
||||
#else |
||||
#define err(str) |
||||
#endif |
||||
|
||||
static volatile int threads_keepalive; |
||||
static volatile int threads_on_hold; |
||||
|
||||
|
||||
|
||||
/* ========================== STRUCTURES ============================ */ |
||||
|
||||
|
||||
/* Binary semaphore */ |
||||
typedef struct bsem { |
||||
pthread_mutex_t mutex; |
||||
pthread_cond_t cond; |
||||
int v; |
||||
} bsem; |
||||
|
||||
|
||||
/* Job */ |
||||
typedef struct job{ |
||||
struct job* prev; /* pointer to previous job */ |
||||
void (*function)(void* arg); /* function pointer */ |
||||
void* arg; /* function's argument */ |
||||
} job; |
||||
|
||||
|
||||
/* Job queue */ |
||||
typedef struct jobqueue{ |
||||
pthread_mutex_t rwmutex; /* used for queue r/w access */ |
||||
job *front; /* pointer to front of queue */ |
||||
job *rear; /* pointer to rear of queue */ |
||||
bsem *has_jobs; /* flag as binary semaphore */ |
||||
int len; /* number of jobs in queue */ |
||||
} jobqueue; |
||||
|
||||
|
||||
/* Thread */ |
||||
typedef struct thread{ |
||||
int id; /* friendly id */ |
||||
pthread_t pthread; /* pointer to actual thread */ |
||||
struct thpool_* thpool_p; /* access to thpool */ |
||||
} thread; |
||||
|
||||
|
||||
/* Threadpool */ |
||||
typedef struct thpool_{ |
||||
thread** threads; /* pointer to threads */ |
||||
volatile int num_threads_alive; /* threads currently alive */ |
||||
volatile int num_threads_working; /* threads currently working */ |
||||
pthread_mutex_t thcount_lock; /* used for thread count etc */ |
||||
pthread_cond_t threads_all_idle; /* signal to thpool_wait */ |
||||
jobqueue jobqueue; /* job queue */ |
||||
} thpool_; |
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/* ========================== PROTOTYPES ============================ */ |
||||
|
||||
|
||||
static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id); |
||||
static void* thread_do(struct thread* thread_p); |
||||
static void thread_hold(int sig_id); |
||||
static void thread_destroy(struct thread* thread_p); |
||||
|
||||
static int jobqueue_init(jobqueue* jobqueue_p); |
||||
static void jobqueue_clear(jobqueue* jobqueue_p); |
||||
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p); |
||||
static struct job* jobqueue_pull(jobqueue* jobqueue_p); |
||||
static void jobqueue_destroy(jobqueue* jobqueue_p); |
||||
|
||||
static void bsem_init(struct bsem *bsem_p, int value); |
||||
static void bsem_reset(struct bsem *bsem_p); |
||||
static void bsem_post(struct bsem *bsem_p); |
||||
static void bsem_post_all(struct bsem *bsem_p); |
||||
static void bsem_wait(struct bsem *bsem_p); |
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/* ========================== THREADPOOL ============================ */ |
||||
|
||||
|
||||
/* Initialise thread pool */ |
||||
struct thpool_* thpool_init(int num_threads){ |
||||
|
||||
threads_on_hold = 0; |
||||
threads_keepalive = 1; |
||||
|
||||
if (num_threads < 0){ |
||||
num_threads = 0; |
||||
} |
||||
|
||||
/* Make new thread pool */ |
||||
thpool_* thpool_p; |
||||
thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_)); |
||||
if (thpool_p == NULL){ |
||||
err("thpool_init(): Could not allocate memory for thread pool\n"); |
||||
return NULL; |
||||
} |
||||
thpool_p->num_threads_alive = 0; |
||||
thpool_p->num_threads_working = 0; |
||||
|
||||
/* Initialise the job queue */ |
||||
if (jobqueue_init(&thpool_p->jobqueue) == -1){ |
||||
err("thpool_init(): Could not allocate memory for job queue\n"); |
||||
free(thpool_p); |
||||
return NULL; |
||||
} |
||||
|
||||
/* Make threads in pool */ |
||||
thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *)); |
||||
if (thpool_p->threads == NULL){ |
||||
err("thpool_init(): Could not allocate memory for threads\n"); |
||||
jobqueue_destroy(&thpool_p->jobqueue); |
||||
free(thpool_p); |
||||
return NULL; |
||||
} |
||||
|
||||
pthread_mutex_init(&(thpool_p->thcount_lock), NULL); |
||||
pthread_cond_init(&thpool_p->threads_all_idle, NULL); |
||||
|
||||
/* Thread init */ |
||||
int n; |
||||
for (n=0; n<num_threads; n++){ |
||||
thread_init(thpool_p, &thpool_p->threads[n], n); |
||||
#if THPOOL_DEBUG |
||||
printf("THPOOL_DEBUG: Created thread %d in pool \n", n); |
||||
#endif |
||||
} |
||||
|
||||
/* Wait for threads to initialize */ |
||||
while (thpool_p->num_threads_alive != num_threads) {} |
||||
|
||||
return thpool_p; |
||||
} |
||||
|
||||
|
||||
/* Add work to the thread pool */ |
||||
int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){ |
||||
job* newjob; |
||||
|
||||
newjob=(struct job*)malloc(sizeof(struct job)); |
||||
if (newjob==NULL){ |
||||
err("thpool_add_work(): Could not allocate memory for new job\n"); |
||||
return -1; |
||||
} |
||||
|
||||
/* add function and argument */ |
||||
newjob->function=function_p; |
||||
newjob->arg=arg_p; |
||||
|
||||
/* add job to queue */ |
||||
jobqueue_push(&thpool_p->jobqueue, newjob); |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
|
||||
/* Wait until all jobs have finished */ |
||||
void thpool_wait(thpool_* thpool_p){ |
||||
pthread_mutex_lock(&thpool_p->thcount_lock); |
||||
while (thpool_p->jobqueue.len || thpool_p->num_threads_working) { |
||||
pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock); |
||||
} |
||||
pthread_mutex_unlock(&thpool_p->thcount_lock); |
||||
} |
||||
|
||||
|
||||
/* Destroy the threadpool */ |
||||
void thpool_destroy(thpool_* thpool_p){ |
||||
/* No need to destory if it's NULL */ |
||||
if (thpool_p == NULL) return ; |
||||
|
||||
volatile int threads_total = thpool_p->num_threads_alive; |
||||
|
||||
/* End each thread 's infinite loop */ |
||||
threads_keepalive = 0; |
||||
|
||||
/* Give one second to kill idle threads */ |
||||
double TIMEOUT = 1.0; |
||||
time_t start, end; |
||||
double tpassed = 0.0; |
||||
time (&start); |
||||
while (tpassed < TIMEOUT && thpool_p->num_threads_alive){ |
||||
bsem_post_all(thpool_p->jobqueue.has_jobs); |
||||
time (&end); |
||||
tpassed = difftime(end,start); |
||||
} |
||||
|
||||
/* Poll remaining threads */ |
||||
while (thpool_p->num_threads_alive){ |
||||
bsem_post_all(thpool_p->jobqueue.has_jobs); |
||||
sleep(1); |
||||
} |
||||
|
||||
/* Job queue cleanup */ |
||||
jobqueue_destroy(&thpool_p->jobqueue); |
||||
/* Deallocs */ |
||||
int n; |
||||
for (n=0; n < threads_total; n++){ |
||||
thread_destroy(thpool_p->threads[n]); |
||||
} |
||||
free(thpool_p->threads); |
||||
free(thpool_p); |
||||
} |
||||
|
||||
|
||||
/* Pause all threads in threadpool */ |
||||
void thpool_pause(thpool_* thpool_p) { |
||||
int n; |
||||
for (n=0; n < thpool_p->num_threads_alive; n++){ |
||||
pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1); |
||||
} |
||||
} |
||||
|
||||
|
||||
/* Resume all threads in threadpool */ |
||||
void thpool_resume(thpool_* thpool_p) { |
||||
// resuming a single threadpool hasn't been
|
||||
// implemented yet, meanwhile this supresses
|
||||
// the warnings
|
||||
(void)thpool_p; |
||||
|
||||
threads_on_hold = 0; |
||||
} |
||||
|
||||
|
||||
int thpool_num_threads_working(thpool_* thpool_p){ |
||||
return thpool_p->num_threads_working; |
||||
} |
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/* ============================ THREAD ============================== */ |
||||
|
||||
|
||||
/* Initialize a thread in the thread pool
|
||||
* |
||||
* @param thread address to the pointer of the thread to be created |
||||
* @param id id to be given to the thread |
||||
* @return 0 on success, -1 otherwise. |
||||
*/ |
||||
static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){ |
||||
|
||||
*thread_p = (struct thread*)malloc(sizeof(struct thread)); |
||||
if (thread_p == NULL){ |
||||
err("thread_init(): Could not allocate memory for thread\n"); |
||||
return -1; |
||||
} |
||||
|
||||
(*thread_p)->thpool_p = thpool_p; |
||||
(*thread_p)->id = id; |
||||
|
||||
pthread_create(&(*thread_p)->pthread, NULL, (void *)thread_do, (*thread_p)); |
||||
pthread_detach((*thread_p)->pthread); |
||||
return 0; |
||||
} |
||||
|
||||
|
||||
/* Sets the calling thread on hold */ |
||||
static void thread_hold(int sig_id) { |
||||
(void)sig_id; |
||||
threads_on_hold = 1; |
||||
while (threads_on_hold){ |
||||
sleep(1); |
||||
} |
||||
} |
||||
|
||||
|
||||
/* What each thread is doing
|
||||
* |
||||
* In principle this is an endless loop. The only time this loop gets interuppted is once |
||||
* thpool_destroy() is invoked or the program exits. |
||||
* |
||||
* @param thread thread that will run this function |
||||
* @return nothing |
||||
*/ |
||||
static void* thread_do(struct thread* thread_p){ |
||||
|
||||
/* Set thread name for profiling and debuging */ |
||||
char thread_name[128] = {0}; |
||||
sprintf(thread_name, "thread-pool-%d", thread_p->id); |
||||
|
||||
#if defined(__linux__) |
||||
/* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */ |
||||
prctl(PR_SET_NAME, thread_name); |
||||
#elif defined(__APPLE__) && defined(__MACH__) |
||||
pthread_setname_np(thread_name); |
||||
#else |
||||
err("thread_do(): pthread_setname_np is not supported on this system"); |
||||
#endif |
||||
|
||||
/* Assure all threads have been created before starting serving */ |
||||
thpool_* thpool_p = thread_p->thpool_p; |
||||
|
||||
/* Register signal handler */ |
||||
struct sigaction act; |
||||
sigemptyset(&act.sa_mask); |
||||
act.sa_flags = 0; |
||||
act.sa_handler = thread_hold; |
||||
if (sigaction(SIGUSR1, &act, NULL) == -1) { |
||||
err("thread_do(): cannot handle SIGUSR1"); |
||||
} |
||||
|
||||
/* Mark thread as alive (initialized) */ |
||||
pthread_mutex_lock(&thpool_p->thcount_lock); |
||||
thpool_p->num_threads_alive += 1; |
||||
pthread_mutex_unlock(&thpool_p->thcount_lock); |
||||
|
||||
while(threads_keepalive){ |
||||
|
||||
bsem_wait(thpool_p->jobqueue.has_jobs); |
||||
|
||||
if (threads_keepalive){ |
||||
|
||||
pthread_mutex_lock(&thpool_p->thcount_lock); |
||||
thpool_p->num_threads_working++; |
||||
pthread_mutex_unlock(&thpool_p->thcount_lock); |
||||
|
||||
/* Read job from queue and execute it */ |
||||
void (*func_buff)(void*); |
||||
void* arg_buff; |
||||
job* job_p = jobqueue_pull(&thpool_p->jobqueue); |
||||
if (job_p) { |
||||
func_buff = job_p->function; |
||||
arg_buff = job_p->arg; |
||||
func_buff(arg_buff); |
||||
free(job_p); |
||||
} |
||||
|
||||
pthread_mutex_lock(&thpool_p->thcount_lock); |
||||
thpool_p->num_threads_working--; |
||||
if (!thpool_p->num_threads_working) { |
||||
pthread_cond_signal(&thpool_p->threads_all_idle); |
||||
} |
||||
pthread_mutex_unlock(&thpool_p->thcount_lock); |
||||
|
||||
} |
||||
} |
||||
pthread_mutex_lock(&thpool_p->thcount_lock); |
||||
thpool_p->num_threads_alive --; |
||||
pthread_mutex_unlock(&thpool_p->thcount_lock); |
||||
|
||||
return NULL; |
||||
} |
||||
|
||||
|
||||
/* Frees a thread */ |
||||
static void thread_destroy (thread* thread_p){ |
||||
free(thread_p); |
||||
} |
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/* ============================ JOB QUEUE =========================== */ |
||||
|
||||
|
||||
/* Initialize queue */ |
||||
static int jobqueue_init(jobqueue* jobqueue_p){ |
||||
jobqueue_p->len = 0; |
||||
jobqueue_p->front = NULL; |
||||
jobqueue_p->rear = NULL; |
||||
|
||||
jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); |
||||
if (jobqueue_p->has_jobs == NULL){ |
||||
return -1; |
||||
} |
||||
|
||||
pthread_mutex_init(&(jobqueue_p->rwmutex), NULL); |
||||
bsem_init(jobqueue_p->has_jobs, 0); |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
|
||||
/* Clear the queue */ |
||||
static void jobqueue_clear(jobqueue* jobqueue_p){ |
||||
|
||||
while(jobqueue_p->len){ |
||||
free(jobqueue_pull(jobqueue_p)); |
||||
} |
||||
|
||||
jobqueue_p->front = NULL; |
||||
jobqueue_p->rear = NULL; |
||||
bsem_reset(jobqueue_p->has_jobs); |
||||
jobqueue_p->len = 0; |
||||
|
||||
} |
||||
|
||||
|
||||
/* Add (allocated) job to queue
|
||||
*/ |
||||
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){ |
||||
|
||||
pthread_mutex_lock(&jobqueue_p->rwmutex); |
||||
newjob->prev = NULL; |
||||
|
||||
switch(jobqueue_p->len){ |
||||
|
||||
case 0: /* if no jobs in queue */ |
||||
jobqueue_p->front = newjob; |
||||
jobqueue_p->rear = newjob; |
||||
break; |
||||
|
||||
default: /* if jobs in queue */ |
||||
jobqueue_p->rear->prev = newjob; |
||||
jobqueue_p->rear = newjob; |
||||
|
||||
} |
||||
jobqueue_p->len++; |
||||
|
||||
bsem_post(jobqueue_p->has_jobs); |
||||
pthread_mutex_unlock(&jobqueue_p->rwmutex); |
||||
} |
||||
|
||||
|
||||
/* Get first job from queue(removes it from queue)
|
||||
<<<<<<< HEAD |
||||
* |
||||
* Notice: Caller MUST hold a mutex |
||||
======= |
||||
>>>>>>> da2c0fe45e43ce0937f272c8cd2704bdc0afb490 |
||||
*/ |
||||
static struct job* jobqueue_pull(jobqueue* jobqueue_p){ |
||||
|
||||
pthread_mutex_lock(&jobqueue_p->rwmutex); |
||||
job* job_p = jobqueue_p->front; |
||||
|
||||
switch(jobqueue_p->len){ |
||||
|
||||
case 0: /* if no jobs in queue */ |
||||
break; |
||||
|
||||
case 1: /* if one job in queue */ |
||||
jobqueue_p->front = NULL; |
||||
jobqueue_p->rear = NULL; |
||||
jobqueue_p->len = 0; |
||||
break; |
||||
|
||||
default: /* if >1 jobs in queue */ |
||||
jobqueue_p->front = job_p->prev; |
||||
jobqueue_p->len--; |
||||
/* more than one job in queue -> post it */ |
||||
bsem_post(jobqueue_p->has_jobs); |
||||
|
||||
} |
||||
|
||||
pthread_mutex_unlock(&jobqueue_p->rwmutex); |
||||
return job_p; |
||||
} |
||||
|
||||
|
||||
/* Free all queue resources back to the system */ |
||||
static void jobqueue_destroy(jobqueue* jobqueue_p){ |
||||
jobqueue_clear(jobqueue_p); |
||||
free(jobqueue_p->has_jobs); |
||||
} |
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/* ======================== SYNCHRONISATION ========================= */ |
||||
|
||||
|
||||
/* Init semaphore to 1 or 0 */ |
||||
static void bsem_init(bsem *bsem_p, int value) { |
||||
if (value < 0 || value > 1) { |
||||
err("bsem_init(): Binary semaphore can take only values 1 or 0"); |
||||
exit(1); |
||||
} |
||||
pthread_mutex_init(&(bsem_p->mutex), NULL); |
||||
pthread_cond_init(&(bsem_p->cond), NULL); |
||||
bsem_p->v = value; |
||||
} |
||||
|
||||
|
||||
/* Reset semaphore to 0 */ |
||||
static void bsem_reset(bsem *bsem_p) { |
||||
bsem_init(bsem_p, 0); |
||||
} |
||||
|
||||
|
||||
/* Post to at least one thread */ |
||||
static void bsem_post(bsem *bsem_p) { |
||||
pthread_mutex_lock(&bsem_p->mutex); |
||||
bsem_p->v = 1; |
||||
pthread_cond_signal(&bsem_p->cond); |
||||
pthread_mutex_unlock(&bsem_p->mutex); |
||||
} |
||||
|
||||
|
||||
/* Post to all threads */ |
||||
static void bsem_post_all(bsem *bsem_p) { |
||||
pthread_mutex_lock(&bsem_p->mutex); |
||||
bsem_p->v = 1; |
||||
pthread_cond_broadcast(&bsem_p->cond); |
||||
pthread_mutex_unlock(&bsem_p->mutex); |
||||
} |
||||
|
||||
|
||||
/* Wait on semaphore until semaphore has value 0 */ |
||||
static void bsem_wait(bsem* bsem_p) { |
||||
pthread_mutex_lock(&bsem_p->mutex); |
||||
while (bsem_p->v != 1) { |
||||
pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex); |
||||
} |
||||
bsem_p->v = 0; |
||||
pthread_mutex_unlock(&bsem_p->mutex); |
||||
} |
@ -0,0 +1,187 @@ |
||||
/**********************************
|
||||
* @author Johan Hanssen Seferidis |
||||
* License: MIT |
||||
* |
||||
**********************************/ |
||||
|
||||
#ifndef _THPOOL_ |
||||
#define _THPOOL_ |
||||
|
||||
#ifdef __cplusplus |
||||
extern "C" { |
||||
#endif |
||||
|
||||
/* =================================== API ======================================= */ |
||||
|
||||
|
||||
typedef struct thpool_* threadpool; |
||||
|
||||
|
||||
/**
|
||||
* @brief Initialize threadpool |
||||
* |
||||
* Initializes a threadpool. This function will not return untill all |
||||
* threads have initialized successfully. |
||||
* |
||||
* @example |
||||
* |
||||
* .. |
||||
* threadpool thpool; //First we declare a threadpool
|
||||
* thpool = thpool_init(4); //then we initialize it to 4 threads
|
||||
* .. |
||||
* |
||||
* @param num_threads number of threads to be created in the threadpool |
||||
* @return threadpool created threadpool on success, |
||||
* NULL on error |
||||
*/ |
||||
threadpool thpool_init(int num_threads); |
||||
|
||||
|
||||
/**
|
||||
* @brief Add work to the job queue |
||||
* |
||||
* Takes an action and its argument and adds it to the threadpool's job queue. |
||||
* If you want to add to work a function with more than one arguments then |
||||
* a way to implement this is by passing a pointer to a structure. |
||||
* |
||||
* NOTICE: You have to cast both the function and argument to not get warnings. |
||||
* |
||||
* @example |
||||
* |
||||
* void print_num(int num){ |
||||
* printf("%d\n", num); |
||||
* } |
||||
* |
||||
* int main() { |
||||
* .. |
||||
* int a = 10; |
||||
* thpool_add_work(thpool, (void*)print_num, (void*)a); |
||||
* .. |
||||
* } |
||||
* |
||||
* @param threadpool threadpool to which the work will be added |
||||
* @param function_p pointer to function to add as work |
||||
* @param arg_p pointer to an argument |
||||
* @return 0 on successs, -1 otherwise. |
||||
*/ |
||||
int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p); |
||||
|
||||
|
||||
/**
|
||||
* @brief Wait for all queued jobs to finish |
||||
* |
||||
* Will wait for all jobs - both queued and currently running to finish. |
||||
* Once the queue is empty and all work has completed, the calling thread |
||||
* (probably the main program) will continue. |
||||
* |
||||
* Smart polling is used in wait. The polling is initially 0 - meaning that |
||||
* there is virtually no polling at all. If after 1 seconds the threads |
||||
* haven't finished, the polling interval starts growing exponentially |
||||
* untill it reaches max_secs seconds. Then it jumps down to a maximum polling |
||||
* interval assuming that heavy processing is being used in the threadpool. |
||||
* |
||||
* @example |
||||
* |
||||
* .. |
||||
* threadpool thpool = thpool_init(4); |
||||
* .. |
||||
* // Add a bunch of work
|
||||
* .. |
||||
* thpool_wait(thpool); |
||||
* puts("All added work has finished"); |
||||
* .. |
||||
* |
||||
* @param threadpool the threadpool to wait for |
||||
* @return nothing |
||||
*/ |
||||
void thpool_wait(threadpool); |
||||
|
||||
|
||||
/**
|
||||
* @brief Pauses all threads immediately |
||||
* |
||||
* The threads will be paused no matter if they are idle or working. |
||||
* The threads return to their previous states once thpool_resume |
||||
* is called. |
||||
* |
||||
* While the thread is being paused, new work can be added. |
||||
* |
||||
* @example |
||||
* |
||||
* threadpool thpool = thpool_init(4); |
||||
* thpool_pause(thpool); |
||||
* .. |
||||
* // Add a bunch of work
|
||||
* .. |
||||
* thpool_resume(thpool); // Let the threads start their magic
|
||||
* |
||||
* @param threadpool the threadpool where the threads should be paused |
||||
* @return nothing |
||||
*/ |
||||
void thpool_pause(threadpool); |
||||
|
||||
|
||||
/**
|
||||
* @brief Unpauses all threads if they are paused |
||||
* |
||||
* @example |
||||
* .. |
||||
* thpool_pause(thpool); |
||||
* sleep(10); // Delay execution 10 seconds
|
||||
* thpool_resume(thpool); |
||||
* .. |
||||
* |
||||
* @param threadpool the threadpool where the threads should be unpaused |
||||
* @return nothing |
||||
*/ |
||||
void thpool_resume(threadpool); |
||||
|
||||
|
||||
/**
|
||||
* @brief Destroy the threadpool |
||||
* |
||||
* This will wait for the currently active threads to finish and then 'kill' |
||||
* the whole threadpool to free up memory. |
||||
* |
||||
* @example |
||||
* int main() { |
||||
* threadpool thpool1 = thpool_init(2); |
||||
* threadpool thpool2 = thpool_init(2); |
||||
* .. |
||||
* thpool_destroy(thpool1); |
||||
* .. |
||||
* return 0; |
||||
* } |
||||
* |
||||
* @param threadpool the threadpool to destroy |
||||
* @return nothing |
||||
*/ |
||||
void thpool_destroy(threadpool); |
||||
|
||||
|
||||
/**
|
||||
* @brief Show currently working threads |
||||
* |
||||
* Working threads are the threads that are performing work (not idle). |
||||
* |
||||
* @example |
||||
* int main() { |
||||
* threadpool thpool1 = thpool_init(2); |
||||
* threadpool thpool2 = thpool_init(2); |
||||
* .. |
||||
* printf("Working threads: %d\n", thpool_num_threads_working(thpool1)); |
||||
* .. |
||||
* return 0; |
||||
* } |
||||
* |
||||
* @param threadpool the threadpool of interest |
||||
* @return integer number of threads working |
||||
*/ |
||||
int thpool_num_threads_working(threadpool); |
||||
|
||||
|
||||
#ifdef __cplusplus |
||||
} |
||||
#endif |
||||
|
||||
#endif |
@ -0,0 +1,22 @@ |
||||
COPYRIGHT AND PERMISSION NOTICE |
||||
|
||||
Copyright (c) 1996 - 2019, Daniel Stenberg, <daniel@haxx.se>, and many |
||||
contributors, see the THANKS file. |
||||
|
||||
All rights reserved. |
||||
|
||||
Permission to use, copy, modify, and distribute this software for any purpose |
||||
with or without fee is hereby granted, provided that the above copyright |
||||
notice and this permission notice appear in all copies. |
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT OF THIRD PARTY RIGHTS. IN |
||||
NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, |
||||
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR |
||||
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE |
||||
OR OTHER DEALINGS IN THE SOFTWARE. |
||||
|
||||
Except as contained in this notice, the name of a copyright holder shall not |
||||
be used in advertising or otherwise to promote the sale, use or other dealings |
||||
in this Software without prior written authorization of the copyright holder. |
@ -0,0 +1,290 @@ |
||||
/*
|
||||
* Copyright (C) 2019 Cisco Systems, Inc. and/or its affiliates. All rights reserved. |
||||
* |
||||
* Authors: Mickey Sola |
||||
* |
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation. |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, |
||||
* MA 02110-1301, USA. |
||||
*/ |
||||
|
||||
#if HAVE_CONFIG_H |
||||
#include "clamav-config.h" |
||||
#endif |
||||
|
||||
#if defined(FANOTIFY) |
||||
|
||||
#include <stdio.h> |
||||
#include <stdlib.h> |
||||
#include <unistd.h> |
||||
#include <sys/types.h> |
||||
#include <sys/stat.h> |
||||
#include <fcntl.h> |
||||
#include <signal.h> |
||||
#include <pthread.h> |
||||
#include <string.h> |
||||
#include <errno.h> |
||||
#include <stdbool.h> |
||||
|
||||
#include <sys/fanotify.h> |
||||
#include <sys/inotify.h> |
||||
|
||||
#include "../fanotif/onaccess_fan.h" |
||||
#include "onaccess_hash.h" |
||||
#include "onaccess_ddd.h" |
||||
#include "../scan/onaccess_scth.h" |
||||
#include "../misc/onaccess_others.h" |
||||
|
||||
#include "libclamav/clamav.h" |
||||
#include "libclamav/scanners.h" |
||||
|
||||
#include "shared/optparser.h" |
||||
#include "shared/output.h" |
||||
|
||||
#include "clamd/server.h" |
||||
#include "clamd/others.h" |
||||
#include "clamd/scanner.h" |
||||
|
||||
#include "../c-thread-pool/thpool.h" |
||||
|
||||
static void onas_scanque_exit(int sig); |
||||
static int onas_consume_event(struct *event_data); |
||||
|
||||
static pthread_mutex_t onas_queue_lock = PTHREAD_MUTEX_INITIALIZER; |
||||
|
||||
static threadpool g_thpool; |
||||
|
||||
static struct onas_event_queue_node *g_onas_event_queue_head = NULL; |
||||
static struct onas_event_queue_node *g_onas_event_queue_tail = NULL; |
||||
|
||||
static struct onas_event_queue g_onas_event_queue = { |
||||
head = g_onas_event_queue_head, |
||||
tail = g_onas_event_queue_tail, |
||||
|
||||
size = 0; |
||||
}; |
||||
|
||||
static void *onas_init_event_queue() { |
||||
*g_onas_event_queue_head = (struct event_queue_node) { |
||||
.next = NULL, |
||||
.prev = NULL, |
||||
|
||||
.data = NULL |
||||
}; |
||||
|
||||
*g_onas_event_queue_tail = &(struct event_queue_node) { |
||||
.next = NULL, |
||||
.prev = NULL, |
||||
|
||||
.data = NULL |
||||
}; |
||||
|
||||
g_onas_event_queue_tail->prev = g_onas_event_queue_head; |
||||
g_onas_event_queue_head->next = g_onas_event_queue_tail; |
||||
} |
||||
|
||||
extern pthread_t scque_pid; |
||||
|
||||
static cl_error_t onas_new_event_queue_node(struct event_queue_node **node) { |
||||
|
||||
*node = malloc(sizeof(struct onas_event_queue)); |
||||
if (NULL == *node) { |
||||
return CL_EMEM; |
||||
} |
||||
|
||||
|
||||
**node = (struct event_queue_node) { |
||||
.next = NULL, |
||||
.prev = NULL, |
||||
|
||||
.data = NULL |
||||
}; |
||||
|
||||
return CL_SUCCESS; |
||||
} |
||||
|
||||
static void onas_destroy_event_queue_node(struct event_queue_node *node) { |
||||
|
||||
if (NULL == node) { |
||||
return; |
||||
} |
||||
|
||||
node->next = NULL; |
||||
node->prev = NULL; |
||||
node->data = NULL; |
||||
|
||||
free(node); |
||||
node = NULL; |
||||
|
||||
return; |
||||
} |
||||
|
||||
static void onas_destroy_event_queue() { |
||||
|
||||
struct onas_event_queue_node *curr = g_onas_event_queue_head; |
||||
struct onas_event_queue_node *next = curr->next; |
||||
|
||||
do { |
||||
onas_destroy_event_queue_node(curr); |
||||
curr = next; |
||||
if (curr) { |
||||
next = curr->next; |
||||
} |
||||
} while (curr); |
||||
|
||||
return; |
||||
} |
||||
|
||||
|
||||
void *onas_scanque_th(void *arg) { |
||||
|
||||
/* not a ton of use for context right now, but perhaps in the future we can pass in more options */ |
||||
struct onas_context *ctx = (struct onas_context *) arg; |
||||
sigset_t sigset; |
||||
struct sigaction act; |
||||
const struct optstruct *pt; |
||||
int ret, len, idx; |
||||
|
||||
cl_error_t err; |
||||
|
||||
/* ignore all signals except SIGUSR1 */ |
||||
sigfillset(&sigset); |
||||
sigdelset(&sigset, SIGUSR1); |
||||
/* The behavior of a process is undefined after it ignores a
|
||||
* SIGFPE, SIGILL, SIGSEGV, or SIGBUS signal */ |
||||
sigdelset(&sigset, SIGFPE); |
||||
sigdelset(&sigset, SIGILL); |
||||
sigdelset(&sigset, SIGSEGV); |
||||
#ifdef SIGBUS |
||||
sigdelset(&sigset, SIGBUS); |
||||
#endif |
||||
pthread_sigmask(SIG_SETMASK, &sigset, NULL); |
||||
memset(&act, 0, sizeof(struct sigaction)); |
||||
act.sa_handler = onas_scanque_exit; |
||||
sigfillset(&(act.sa_mask)); |
||||
sigaction(SIGUSR1, &act, NULL); |
||||
sigaction(SIGSEGV, &act, NULL); |
||||
|
||||
onas_init_event_queue(); |
||||
threadpool thpool = thpool_init(ctx->maxthreads); |
||||
g_thpool = thpool; |
||||
|
||||
/* loop w/ onas_consume_event until we die */ |
||||
do { |
||||
/* if there's no event to consume ... */ |
||||
if (!onas_consume_event(thpool)) { |
||||
/* sleep for a bit */ |
||||
usleep(500); |
||||
} |
||||
} while(1); |
||||
|
||||
return; |
||||
} |
||||
|
||||
static int onas_queue_is_b_empty() { |
||||
|
||||
if (g_onas_event_queue->head->next == g_onas_event_queue->tail) { |
||||
return 1; |
||||
} |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
static int onas_consume_event(threadpool thpool) { |
||||
|
||||
pthread_mutex_lock(&onas_queue_lock); |
||||
|
||||
struct onas_event_queue_node *popped_node = g_onas_event_queue_head->next; |
||||
|
||||
/* TODO: create scth arg using head event data, use get queue head here before lock*/ |
||||
if (onas_queue_is_b_empty()) { |
||||
return 1; |
||||
} |
||||
|
||||
thpool_add_work(thpool, (void *) onas_scan_worker, (void *) popped_node->data); |
||||
|
||||
g_onas_event_queue_head->next = g_onas_event_queue_head->next->next; |
||||
g_onas_event_queue_head->next->prev = g_onas_event_head; |
||||
|
||||
onas_destroy_event_queue_node(popped_node); |
||||
|
||||
g_onas_event_queue->size--; |
||||
|
||||
pthread_mutex_unlock(&onas_queue_lock); |
||||
return 0; |
||||
} |
||||
|
||||
cl_error_t onas_queue_event(struct onas_scan_event *event_data) { |
||||
|
||||
pthread_mutex_lock(&onas_queue_lock); |
||||
|
||||
struct onas_event_queue_node *node = NULL; |
||||
|
||||
if (CL_EMEM == onas_new_event_queue_node(&node)) { |
||||
return CL_EMEM; |
||||
} |
||||
|
||||
node->next = g_onas_event_queue_tail; |
||||
node->prev = g_onas_event_queue_tail->prev; |
||||
|
||||
node->data = event_data; |
||||
|
||||
/* tail will always have a .prev */ |
||||
((struct onas_event_queue_node *) g_onas_event_queue_tail->prev)->next = node; |
||||
g_onas_event_queue_tail->prev = node; |
||||
|
||||
g_onas_event_queue->size++; |
||||
|
||||
pthread_mutex_unlock(&onas_queue_lock); |
||||
|
||||
return CL_SUCCESS; |
||||
} |
||||
|
||||
cl_error_t onas_scanque_start(struct onas_context **ctx) { |
||||
|
||||
pthread_attr_t scque_attr; |
||||
int32_t thread_started = 1; |
||||
|
||||
if (!ctx || !*ctx) { |
||||
logg("*ClamQueue: unable to start clamonacc. (bad context)\n"); |
||||
return CL_EARG; |
||||
} |
||||
|
||||
if(pthread_attr_init(&scque_attr)) { |
||||
return CL_BREAK; |
||||
} |
||||
pthread_attr_setdetachstate(&scque_attr, PTHREAD_CREATE_JOINABLE); |
||||
thread_started = pthread_create(&scque_pid, &scque_attr, onas_scanque_th, *ctx); |
||||
|
||||
if (0 != thread_started) { |
||||
/* Failed to create thread */ |
||||
logg("*ClamQueue: Unable to start event consumer queue thread ... \n"); |
||||
return CL_ECREAT; |
||||
} |
||||
|
||||
return CL_SUCCESS; |
||||
} |
||||
|
||||
static void onas_scanque_exit(int sig) { |
||||
|
||||
logg("*ClamScanque: onas_scanque_exit(), signal %d\n", sig); |
||||
|
||||
/* TODO: cleanup queue struct */ |
||||
onas_destroy_event_queue(); |
||||
thpool_destroy(g_thpool); |
||||
|
||||
pthread_exit(NULL); |
||||
logg("ClamScanque: stopped\n"); |
||||
} |
||||
|
||||
#endif |
@ -0,0 +1,43 @@ |
||||
/*
|
||||
* Copyright (C) 2019 Cisco Systems, Inc. and/or its affiliates. All rights reserved. |
||||
* |
||||
* Authors: Mickey Sola |
||||
* |
||||
* This program is free software; you can redistribute it and/or modify |
||||
* it under the terms of the GNU General Public License version 2 as |
||||
* published by the Free Software Foundation. |
||||
* |
||||
* This program is distributed in the hope that it will be useful, |
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
* GNU General Public License for more details. |
||||
* |
||||
* You should have received a copy of the GNU General Public License |
||||
* along with this program; if not, write to the Free Software |
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, |
||||
* MA 02110-1301, USA. |
||||
*/ |
||||
|
||||
#ifndef __ONAS_SCQUE_H |
||||
#define __ONAS_SCQUE_H |
||||
|
||||
/* extremely simple event queue implmentation w/ obj number tracking in case we want to place limits later */ |
||||
struct onas_event_queue { |
||||
struct onas_event_queue_node *head; |
||||
struct onas_event_queue_node *tail; |
||||
uint64_t size; |
||||
}; |
||||
|
||||
struct onas_event_queue_node { |
||||
struct onas_event_queue_node *next; |
||||
struct onas_event_queue_node *prev; |
||||
|
||||
struct onas_scan_event *data; |
||||
}; |
||||
|
||||
void *onas_scanque_th(void *arg); |
||||
|
||||
cl_error_t onas_queue_event(struct onas_scan_event *event_data); |
||||
cl_error_t onas_scanque_start(struct onas_context **ctx); |
||||
|
||||
#endif |
Loading…
Reference in new issue