Address review board feedback from Matt and Richard

* Remove extraneous whitespace
* Bump up debug levels of messages and add identifying info to messages.
* Account for potential failures of ao2_link()
* Add additional test and some more test data
* Add some comments in places where they could be useful
* Make threadpool listeners and their callbacks optional



git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@378652 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Mark Michelson
2013-01-07 22:16:06 +00:00
parent ad73fe2c9f
commit bdd8da406b
5 changed files with 190 additions and 63 deletions

View File

@@ -35,24 +35,24 @@
struct ast_threadpool {
/*! Threadpool listener */
struct ast_threadpool_listener *listener;
/*!
/*!
* \brief The container of active threads.
* Active threads are those that are currently running tasks
*/
struct ao2_container *active_threads;
/*!
/*!
* \brief The container of idle threads.
* Idle threads are those that are currenly waiting to run tasks
*/
struct ao2_container *idle_threads;
/*!
/*!
* \brief The container of zombie threads.
* Zombie threads may be running tasks, but they are scheduled to die soon
*/
struct ao2_container *zombie_threads;
/*!
/*!
* \brief The main taskprocessor
*
*
* Tasks that are queued in this taskprocessor are
* doled out to the worker threads. Worker threads that
* execute tasks from the threadpool are executing tasks
@@ -122,14 +122,36 @@ enum worker_state {
DEAD,
};
/*!
* A thread that executes threadpool tasks
*/
struct worker_thread {
/*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
int id;
/*! Condition used in conjunction with state changes */
ast_cond_t cond;
/*! Lock used alongside the condition for state changes */
ast_mutex_t lock;
/*! The actual thread that is executing tasks */
pthread_t thread;
/*! A pointer to the threadpool. Needed to be able to execute tasks */
struct ast_threadpool *pool;
/*! The current state of the worker thread */
enum worker_state state;
/*! A boolean used to determine if an idle thread should become active */
int wake_up;
/*! Options for this threadpool */
struct ast_threadpool_options options;
};
/* Worker thread forward declarations. See definitions for documentation */
struct worker_thread;
static int worker_thread_hash(const void *obj, int flags);
static int worker_thread_cmp(void *obj, void *arg, int flags);
static void worker_thread_destroy(void *obj);
static void worker_active(struct worker_thread *worker);
static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
static void worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
@@ -145,7 +167,9 @@ static void threadpool_send_state_changed(struct ast_threadpool *pool)
int active_size = ao2_container_count(pool->active_threads);
int idle_size = ao2_container_count(pool->idle_threads);
pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
if (pool->listener && pool->listener->callbacks->state_changed) {
pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
}
}
/*!
@@ -296,7 +320,7 @@ static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
/*!
* \brief Execute a task in the threadpool
*
*
* This is the function that worker threads call in order to execute tasks
* in the threadpool
*
@@ -430,7 +454,14 @@ static int activate_thread(void *obj, void *arg, int flags)
struct worker_thread *worker = obj;
struct ast_threadpool *pool = arg;
ao2_link(pool->active_threads, worker);
if (!ao2_link(pool->active_threads, worker)) {
/* If we can't link the idle thread into the active container, then
* we'll just leave the thread idle and not wake it up.
*/
ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
worker->id);
return 0;
}
worker_set_state(worker, ALIVE);
return CMP_MATCH;
}
@@ -446,14 +477,22 @@ static void grow(struct ast_threadpool *pool, int delta)
{
int i;
ast_debug(1, "Going to increase threadpool size by %d\n", delta);
ast_debug(3, "Increasing threadpool %s's size by %d\n",
ast_taskprocessor_name(pool->tps), delta);
for (i = 0; i < delta; ++i) {
struct worker_thread *worker = worker_thread_alloc(pool);
if (!worker) {
return;
}
ao2_link(pool->active_threads, worker);
if (ao2_link(pool->active_threads, worker)) {
if (worker_thread_start(worker)) {
ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
ao2_unlink(pool->active_threads, worker);
}
} else {
ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
}
ao2_ref(worker, -1);
}
}
@@ -474,7 +513,9 @@ static int queued_task_pushed(void *data)
int was_empty = tpd->was_empty;
int state_changed;
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
if (pool->listener && pool->listener->callbacks->task_pushed) {
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
}
if (ao2_container_count(pool->idle_threads) == 0) {
if (pool->options.auto_increment > 0) {
grow(pool, pool->options.auto_increment);
@@ -530,6 +571,7 @@ static int queued_emptied(void *data)
{
struct ast_threadpool *pool = data;
/* We already checked for existence of this callback when this was queued */
pool->listener->callbacks->emptied(pool, pool->listener);
return 0;
}
@@ -550,7 +592,9 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
return;
}
ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
if (pool->listener && pool->listener->callbacks->emptied) {
ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
}
}
/*!
@@ -649,7 +693,10 @@ static int zombify_threads(void *obj, void *arg, void *data, int flags)
int *num_to_zombify = data;
if ((*num_to_zombify)-- > 0) {
ao2_link(pool->zombie_threads, worker);
if (!ao2_link(pool->zombie_threads, worker)) {
ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
return 0;
}
worker_set_state(worker, ZOMBIE);
return CMP_MATCH;
} else {
@@ -671,7 +718,7 @@ static int zombify_threads(void *obj, void *arg, void *data, int flags)
*/
static void shrink(struct ast_threadpool *pool, int delta)
{
/*
/*
* Preference is to kill idle threads, but
* we'll move on to deactivating active threads
* if we have to
@@ -680,12 +727,14 @@ static void shrink(struct ast_threadpool *pool, int delta)
int idle_threads_to_kill = MIN(delta, idle_threads);
int active_threads_to_zombify = delta - idle_threads_to_kill;
ast_debug(1, "Going to kill off %d idle threads\n", idle_threads_to_kill);
ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
ast_taskprocessor_name(pool->tps));
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
kill_threads, &idle_threads_to_kill);
ast_debug(1, "Going to kill off %d active threads\n", active_threads_to_zombify);
ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
ast_taskprocessor_name(pool->tps));
ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
zombify_threads, pool, &active_threads_to_zombify);
@@ -828,8 +877,10 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
pool = tps_listener->private_data;
pool->tps = tps;
ao2_ref(listener, +1);
pool->listener = listener;
if (listener) {
ao2_ref(listener, +1);
pool->listener = listener;
}
pool->options = *options;
ast_threadpool_set_size(pool, initial_size);
return pool;
@@ -856,28 +907,6 @@ void ast_threadpool_shutdown(struct ast_threadpool *pool)
ast_taskprocessor_unreference(pool->tps);
}
/*!
* A thread that executes threadpool tasks
*/
struct worker_thread {
/*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
int id;
/*! Condition used in conjunction with state changes */
ast_cond_t cond;
/*! Lock used alongside the condition for state changes */
ast_mutex_t lock;
/*! The actual thread that is executing tasks */
pthread_t thread;
/*! A pointer to the threadpool. Needed to be able to execute tasks */
struct ast_threadpool *pool;
/*! The current state of the worker thread */
enum worker_state state;
/*! A boolean used to determine if an idle thread should become active */
int wake_up;
/*! Options for this threadpool */
struct ast_threadpool_options options;
};
/*!
* A monotonically increasing integer used for worker
* thread identification.
@@ -926,7 +955,7 @@ static void worker_shutdown(struct worker_thread *worker)
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
ast_debug(1, "Destroying worker thread\n");
ast_debug(3, "Destroying worker thread %d\n", worker->id);
worker_shutdown(worker);
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);
@@ -972,14 +1001,14 @@ static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
worker->thread = AST_PTHREADT_NULL;
worker->state = ALIVE;
worker->options = pool->options;
if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
ast_log(LOG_ERROR, "Unable to start worker thread!\n");
ao2_ref(worker, -1);
return NULL;
}
return worker;
}
static int worker_thread_start(struct worker_thread *worker)
{
return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
}
/*!
* \brief Active loop for worker threads
*
@@ -994,7 +1023,7 @@ static void worker_active(struct worker_thread *worker)
{
int alive = 1;
while (alive) {
if (threadpool_execute(worker->pool) == 0) {
if (!threadpool_execute(worker->pool)) {
alive = worker_idle(worker);
}
}