This now compiles.

That's a milestone, of sorts. Things really need
arranging/documenting, and there's no function to
be able to push tasks to a threadpool.



git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377036 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Mark Michelson
2012-12-03 16:59:26 +00:00
parent b44f72f794
commit e7ce12839d
2 changed files with 229 additions and 69 deletions

View File

@@ -72,15 +72,15 @@ struct ast_threadpool_listener {
/*! /*!
* \brief Create a new threadpool * \brief Create a new threadpool
* *
* This function creates a threadpool and returns a taskprocessor. Tasks pushed * This function creates a threadpool. Tasks may be pushed onto this thread pool
* to this taskprocessor will be handled by the threadpool and will be reported * in and will be automatically acted upon by threads within the pool.
* on the threadpool's listener.
* *
* \param listener The listener the threadpool will notify of changes * \param listener The listener the threadpool will notify of changes
* \param initial_size The number of threads for the pool to start with
* \retval NULL Failed to create the threadpool * \retval NULL Failed to create the threadpool
* \retval non-NULL The associated taskprocessor * \retval non-NULL The newly-created threadpool
*/ */
struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener); struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size);
/*! /*!
* \brief Set the number of threads for the thread pool * \brief Set the number of threads for the thread pool
@@ -93,4 +93,10 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
*/ */
void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size); void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size);
/*!
* \brief Shut down a threadpool and destroy it
*
* \param pool The pool to shut down
*/
void ast_threadpool_shutdown(struct ast_threadpool *pool);
#endif /* ASTERISK_THREADPOOL_H */ #endif /* ASTERISK_THREADPOOL_H */

View File

@@ -21,17 +21,21 @@
#include "asterisk/threadpool.h" #include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h" #include "asterisk/taskprocessor.h"
#include "asterisk/astobj2.h"
#include "asterisk/utils.h"
#define THREAD_BUCKETS 89 #define THREAD_BUCKETS 89
static int id_counter; static int id_counter;
struct ast_threadpool { struct ast_threadpool {
struct ast_threadpool_listener *threadpool_listener; struct ast_threadpool_listener *listener;
struct ao2_container *active_threads; struct ao2_container *active_threads;
struct ao2_container *idle_threads; struct ao2_container *idle_threads;
struct ao2_container *zombie_threads; struct ao2_container *zombie_threads;
} struct ast_taskprocessor *tps;
struct ast_taskprocessor *control_tps;
};
enum worker_state { enum worker_state {
ALIVE, ALIVE,
@@ -49,9 +53,9 @@ struct worker_thread {
int wake_up; int wake_up;
}; };
static int worker_thread_hash(const void *obj) static int worker_thread_hash(const void *obj, int flags)
{ {
struct worker_thread *worker= obj; const struct worker_thread *worker = obj;
return worker->id; return worker->id;
} }
@@ -64,9 +68,26 @@ static int worker_thread_cmp(void *obj, void *arg, int flags)
return worker1->id == worker2->id ? CMP_MATCH : 0; return worker1->id == worker2->id ? CMP_MATCH : 0;
} }
static worker_thread *worker_thread_alloc(struct ast_threadpool *pool) static void worker_thread_destroy(void *obj)
{ {
struct worker_thread *worker = ao2_alloc(1, sizeof(*worker)); struct worker_thread *worker = obj;
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);
}
static int worker_active(struct worker_thread *worker);
static void *worker_start(void *arg)
{
struct worker_thread *worker = arg;
worker_active(worker);
return NULL;
}
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
{
struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
if (!worker) { if (!worker) {
/* XXX Dangit! */ /* XXX Dangit! */
return NULL; return NULL;
@@ -77,7 +98,7 @@ static worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
worker->pool = pool; worker->pool = pool;
worker->thread = AST_PTHREADT_NULL; worker->thread = AST_PTHREADT_NULL;
worker->state = ALIVE; worker->state = ALIVE;
if (ast_pthread_create(&worker->thread, NULL, worker_active, worker) < 0) { if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
/* XXX Poop! */ /* XXX Poop! */
ao2_ref(worker, -1); ao2_ref(worker, -1);
return NULL; return NULL;
@@ -106,7 +127,7 @@ static void thread_worker_pair_destructor(void *obj)
ao2_ref(pair->worker, -1); ao2_ref(pair->worker, -1);
} }
struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool, static struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool,
struct worker_thread *worker) struct worker_thread *worker)
{ {
struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor); struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
@@ -114,8 +135,10 @@ struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool,
/*XXX Crap */ /*XXX Crap */
return NULL; return NULL;
} }
pair->pool = ao2_ref(pool); ao2_ref(pool, +1);
pair->worker = ao2_ref(worker); pair->pool = pool;
ao2_ref(worker, +1);
pair->worker = worker;
return pair; return pair;
} }
@@ -140,7 +163,7 @@ static void threadpool_active_thread_idle(struct ast_threadpool *pool,
/*XXX Crap */ /*XXX Crap */
return; return;
} }
ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle(pair)); ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
} }
static int queued_zombie_thread_dead(void *data) static int queued_zombie_thread_dead(void *data)
@@ -162,23 +185,28 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
/* XXX Crap */ /* XXX Crap */
return; return;
} }
ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead(pair)); ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
} }
static int worker_idle(struct worker_thread *worker) static int worker_idle(struct worker_thread *worker)
{ {
SCOPED_MUTEX(lock, &worker->lock); SCOPED_MUTEX(lock, &worker->lock);
if (worker->state != ALIVE) { if (worker->state != ALIVE) {
return false; return 0;
} }
threadpool_active_thread_idle(worker->pool, worker); threadpool_active_thread_idle(worker->pool, worker);
while (!worker->wake_up) { while (!worker->wake_up) {
ast_cond_wait(&worker->cond, lock); ast_cond_wait(&worker->cond, lock);
} }
worker->wake_up = false; worker->wake_up = 0;
return worker->state == ALIVE; return worker->state == ALIVE;
} }
static int threadpool_execute(struct ast_threadpool *pool)
{
return ast_taskprocessor_execute(pool->tps);
}
static int worker_active(struct worker_thread *worker) static int worker_active(struct worker_thread *worker)
{ {
int alive = 1; int alive = 1;
@@ -203,13 +231,47 @@ static int worker_active(struct worker_thread *worker)
return 0; return 0;
} }
static void worker_set_state(struct worker_thread *worker, enum worker_state state)
{
SCOPED_MUTEX(lock, &worker->lock);
worker->state = state;
worker->wake_up = 1;
ast_cond_signal(&worker->cond);
}
static int worker_shutdown(void *obj, void *arg, int flags)
{
struct worker_thread *worker = obj;
worker_set_state(worker, DEAD);
if (worker->thread != AST_PTHREADT_NULL) {
pthread_join(worker->thread, NULL);
worker->thread = AST_PTHREADT_NULL;
}
return 0;
}
static void threadpool_tps_listener_destroy(void *private_data)
{
struct ast_threadpool *pool = private_data;
/* XXX Probably should let the listener know we're being destroyed? */
/* Threads should all be shut down by now, so this should be a painless
* operation
*/
ao2_ref(pool->active_threads, -1);
ao2_ref(pool->idle_threads, -1);
ao2_ref(pool->zombie_threads, -1);
ao2_ref(pool->listener, -1);
ao2_ref(pool, -1);
}
static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener) static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener)
{ {
RAII_VAR(ast_threadpool *, pool, RAII_VAR(struct ast_threadpool *, pool,
ao2_alloc(sizeof(*pool), threadpool_destroy), ao2_cleanup); ao2_alloc(sizeof(*pool), threadpool_tps_listener_destroy), ao2_cleanup);
pool->control_tps = ast_taskprocessor_get(/* XXX ??? */, TPS_REF_DEFAULT); pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
if (!pool->control_tps) { if (!pool->control_tps) {
return NULL; return NULL;
} }
@@ -222,32 +284,115 @@ static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *li
return NULL; return NULL;
} }
pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp); pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
if (!pool->zombie_thread) { if (!pool->zombie_threads) {
return NULL; return NULL;
} }
pool->tps = listener->tps;
ao2_ref(pool, +1); ao2_ref(pool, +1);
return pool; return pool;
} }
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener) struct task_pushed_data {
struct ast_threadpool *pool;
int was_empty;
};
static void task_pushed_data_destroy(void *obj)
{ {
/* XXX stub */ struct task_pushed_data *tpd = obj;
ao2_ref(tpd->pool, -1);
}
static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
int was_empty)
{
struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd),
task_pushed_data_destroy);
if (!tpd) {
return NULL;
}
ao2_ref(pool, +1);
tpd->pool = pool;
tpd->was_empty = was_empty;
return tpd;
}
static int activate_threads(void *obj, void *arg, int flags)
{
struct worker_thread *worker = obj;
struct ast_threadpool *pool = arg;
ao2_link(pool->active_threads, worker);
worker_set_state(worker, ALIVE);
return 0;
}
static int handle_task_pushed(void *data)
{
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty);
ao2_callback(pool->idle_threads, OBJ_UNLINK, activate_threads, pool);
ao2_ref(tpd, -1);
return 0;
}
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
int was_empty)
{
struct ast_threadpool *pool = listener->private_data;
struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
if (!tpd) {
/* XXX Drat! */
return;
}
ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
}
static int handle_emptied(void *data)
{
struct ast_threadpool *pool = data;
pool->listener->callbacks->emptied(pool->listener);
ao2_ref(pool, -1);
return 0;
} }
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{ {
/* XXX stub */ struct ast_threadpool *pool = listener->private_data;
ao2_ref(pool, +1);
ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
} }
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener) static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
{ {
/* XXX stub */ /*
} * The threadpool triggers the taskprocessor to shut down. As a result,
* we have the freedom of shutting things down in three stages:
*
* 1) Before the tasprocessor is shut down
* 2) During taskprocessor shutdown (here)
* 3) After taskprocessor shutdown
*
* In the spirit of the taskprocessor shutdown, this would be
* where we make sure that all the worker threads are no longer
* executing. We could just do this before we even shut down
* the taskprocessor, but this feels more "right".
*/
static void threadpool_tps_listener_destroy(struct ast_taskprocessor_listener *listener) struct ast_threadpool *pool = listener->private_data;
{ ao2_callback(pool->active_threads, 0, worker_shutdown, NULL);
/* XXX stub */ ao2_callback(pool->idle_threads, 0, worker_shutdown, NULL);
ao2_callback(pool->zombie_threads, 0, worker_shutdown, NULL);
} }
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = { static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
@@ -258,25 +403,6 @@ static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callb
.destroy = threadpool_tps_listener_destroy, .destroy = threadpool_tps_listener_destroy,
}; };
/*!
* \brief Allocate the taskprocessor to be used for the threadpool
*
* We use a custom taskprocessor listener. We allocate our custom
* listener and then create a taskprocessor.
*/
static struct ast_taskprocessor_listener *threadpool_tps_alloc(void)
{
RAII_VAR(struct threadpool_tps_listener *, tps_listener,
ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
ao2_cleanup);
if (!tps_listener) {
return NULL;
}
return ast_taskprocessor_create_with_listener(tps_listener);
}
static void grow(struct ast_threadpool *pool, int delta) static void grow(struct ast_threadpool *pool, int delta)
{ {
int i; int i;
@@ -292,9 +418,11 @@ static void grow(struct ast_threadpool *pool, int delta)
static int kill_threads(void *obj, void *arg, int flags) static int kill_threads(void *obj, void *arg, int flags)
{ {
struct worker_thread *worker = obj;
int *num_to_kill = arg; int *num_to_kill = arg;
if ((*num_to_kill)-- > 0) { if ((*num_to_kill)-- > 0) {
worker_shutdown(worker, arg, flags);
return CMP_MATCH; return CMP_MATCH;
} else { } else {
return CMP_STOP; return CMP_STOP;
@@ -309,6 +437,7 @@ static int zombify_threads(void *obj, void *arg, void *data, int flags)
if ((*num_to_zombify)-- > 0) { if ((*num_to_zombify)-- > 0) {
ao2_link(pool->zombie_threads, worker); ao2_link(pool->zombie_threads, worker);
worker_set_state(worker, ZOMBIE);
return CMP_MATCH; return CMP_MATCH;
} else { } else {
return CMP_STOP; return CMP_STOP;
@@ -325,7 +454,6 @@ static void shrink(struct ast_threadpool *pool, int delta)
int idle_threads = ao2_container_count(pool->idle_threads); int idle_threads = ao2_container_count(pool->idle_threads);
int idle_threads_to_kill = MIN(delta, idle_threads); int idle_threads_to_kill = MIN(delta, idle_threads);
int active_threads_to_zombify = delta - idle_threads_to_kill; int active_threads_to_zombify = delta - idle_threads_to_kill;
int i = 0;
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK, ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
kill_threads, &idle_threads_to_kill); kill_threads, &idle_threads_to_kill);
@@ -335,16 +463,31 @@ static void shrink(struct ast_threadpool *pool, int delta)
} }
struct set_size_data { struct set_size_data {
struct threadpool *pool; struct ast_threadpool *pool;
int size; int size;
}; };
void set_size_data_destroy(void *obj) static void set_size_data_destroy(void *obj)
{ {
struct set_size_data *ssd = obj; struct set_size_data *ssd = obj;
ao2_ref(ssd->pool, -1); ao2_ref(ssd->pool, -1);
} }
static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
int size)
{
struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);
if (!ssd) {
/* XXX Crap */
return NULL;
}
ao2_ref(pool, +1);
ssd->pool = pool;
ssd->size = size;
return ssd;
}
static int queued_set_size(void *data) static int queued_set_size(void *data)
{ {
struct set_size_data *ssd = data; struct set_size_data *ssd = data;
@@ -355,7 +498,7 @@ static int queued_set_size(void *data)
int current_size = ao2_container_count(pool->active_threads) + int current_size = ao2_container_count(pool->active_threads) +
ao2_container_count(pool->idle_threads); ao2_container_count(pool->idle_threads);
if (current_size = num_threads) { if (current_size == num_threads) {
return 0; return 0;
} }
@@ -366,41 +509,52 @@ static int queued_set_size(void *data)
} }
threadpool_send_state_changed(pool); threadpool_send_state_changed(pool);
ao2_ref(set_size_data, -1); ao2_ref(ssd, -1);
return 0;
} }
void ast_threadpool_set_size(struct ast_threadpool *pool, int size) void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
{ {
struct set_size_data *ssd; struct set_size_data *ssd;
if (size < 0) {
ast_log(LOG_WARNING, "Invalid threadpool size used for resizing: %d\n", size);
return;
}
ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy); ssd = set_size_data_alloc(pool, size);
if (!ssd) { if (!ssd) {
/* XXX Crap */ /* XXX *groan* */
return; return;
} }
ssd->pool = ao2_ref(pool);
ssd->size = size;
ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd); ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
} }
struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size) struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
{ {
struct ast_threadpool *pool; struct ast_threadpool *pool;
RAII_VAR(ast_taskprocessor *, tps, threadpool_tps_alloc(), ast_taskprocessor_unreference); struct ast_taskprocessor *tps;
RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
ao2_cleanup);
if (!tps_listener) {
return NULL;
}
tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);
if (!tps) { if (!tps) {
return NULL; return NULL;
} }
pool = tps->listener->private_data; pool = tps_listener->private_data;
pool->tps = tps;
ast_threadpool_set_size(pool, initial_size); ast_threadpool_set_size(pool, initial_size);
return pool; return pool;
} }
void ast_threadpool_shutdown(struct ast_threadpool *pool)
{
/* Pretty simple really. We just shut down the
* taskprocessors and everything else just
* takes care of itself via the taskprocessor callbacks
*/
ast_taskprocessor_unreference(pool->control_tps);
ast_taskprocessor_unreference(pool->tps);
}