mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-05 20:20:07 +00:00
Remove zombie state from threadpool altogether.
After giving it some consideration, there's no real use for zombie threads. Listeners can't really use the current number of zombie threads as a way of gauging activity, zombifying threads is just an extra step before they die that really serves no purpose, and since there's no way to re-animate zombies, the operation does not need to be around. I also fixed up some miscellaneous compilation errors that were lingering from some past revisions. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377211 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
@@ -31,12 +31,10 @@ struct ast_threadpool_listener_callbacks {
|
|||||||
* \param listener The threadpool listener
|
* \param listener The threadpool listener
|
||||||
* \param active_threads The number of active threads in the pool
|
* \param active_threads The number of active threads in the pool
|
||||||
* \param idle_threads The number of idle threads in the pool
|
* \param idle_threads The number of idle threads in the pool
|
||||||
* \param zombie_threads The number of zombie threads in the pool
|
|
||||||
*/
|
*/
|
||||||
void (*state_changed)(struct ast_threadpool_listener *listener,
|
void (*state_changed)(struct ast_threadpool_listener *listener,
|
||||||
int active_threads,
|
int active_threads,
|
||||||
int idle_threads,
|
int idle_threads);
|
||||||
int zombie_threads);
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Indicates that a task was pushed to the threadpool's taskprocessor
|
* \brief Indicates that a task was pushed to the threadpool's taskprocessor
|
||||||
*
|
*
|
||||||
|
@@ -45,11 +45,6 @@ struct ast_threadpool {
|
|||||||
* Idle threads are those that are currenly waiting to run tasks
|
* Idle threads are those that are currenly waiting to run tasks
|
||||||
*/
|
*/
|
||||||
struct ao2_container *idle_threads;
|
struct ao2_container *idle_threads;
|
||||||
/*!
|
|
||||||
* \brief The container of zombie threads.
|
|
||||||
* Zombie threads may be running tasks, but they are scheduled to die soon
|
|
||||||
*/
|
|
||||||
struct ao2_container *zombie_threads;
|
|
||||||
/*!
|
/*!
|
||||||
* \brief The main taskprocessor
|
* \brief The main taskprocessor
|
||||||
*
|
*
|
||||||
@@ -84,7 +79,7 @@ struct ast_threadpool {
|
|||||||
* This is done for three main reasons
|
* This is done for three main reasons
|
||||||
* 1) It ensures that listeners are given an accurate portrayal
|
* 1) It ensures that listeners are given an accurate portrayal
|
||||||
* of the threadpool's current state. In other words, when a listener
|
* of the threadpool's current state. In other words, when a listener
|
||||||
* gets told a count of active, idle and zombie threads, it does not
|
* gets told a count of active and idle threads, it does not
|
||||||
* need to worry that internal state of the threadpool might be different
|
* need to worry that internal state of the threadpool might be different
|
||||||
* from what it has been told.
|
* from what it has been told.
|
||||||
* 2) It minimizes the locking required in both the threadpool and in
|
* 2) It minimizes the locking required in both the threadpool and in
|
||||||
@@ -101,20 +96,7 @@ struct ast_threadpool {
|
|||||||
enum worker_state {
|
enum worker_state {
|
||||||
/*! The worker is either active or idle */
|
/*! The worker is either active or idle */
|
||||||
ALIVE,
|
ALIVE,
|
||||||
/*!
|
/*! The worker has been asked to shut down. */
|
||||||
* The worker has been asked to shut down but
|
|
||||||
* may still be in the process of executing tasks.
|
|
||||||
* This transition happens when the threadpool needs
|
|
||||||
* to shrink and needs to kill active threads in order
|
|
||||||
* to do so.
|
|
||||||
*/
|
|
||||||
ZOMBIE,
|
|
||||||
/*!
|
|
||||||
* The worker has been asked to shut down. Typically
|
|
||||||
* only idle threads go to this state directly, but
|
|
||||||
* active threads may go straight to this state when
|
|
||||||
* the threadpool is shut down.
|
|
||||||
*/
|
|
||||||
DEAD,
|
DEAD,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -128,7 +110,7 @@ static void *worker_start(void *arg);
|
|||||||
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
|
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
|
||||||
static int worker_idle(struct worker_thread *worker);
|
static int worker_idle(struct worker_thread *worker);
|
||||||
static void worker_set_state(struct worker_thread *worker, enum worker_state state);
|
static void worker_set_state(struct worker_thread *worker, enum worker_state state);
|
||||||
static int worker_shutdown(void *obj, void *arg, int flags);
|
static void worker_shutdown(struct worker_thread *worker);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Notify the threadpool listener that the state has changed.
|
* \brief Notify the threadpool listener that the state has changed.
|
||||||
@@ -140,9 +122,8 @@ static void threadpool_send_state_changed(struct ast_threadpool *pool)
|
|||||||
{
|
{
|
||||||
int active_size = ao2_container_count(pool->active_threads);
|
int active_size = ao2_container_count(pool->active_threads);
|
||||||
int idle_size = ao2_container_count(pool->idle_threads);
|
int idle_size = ao2_container_count(pool->idle_threads);
|
||||||
int zombie_size = ao2_container_count(pool->zombie_threads);
|
|
||||||
|
|
||||||
pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size);
|
pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
@@ -220,41 +201,6 @@ static void threadpool_active_thread_idle(struct ast_threadpool *pool,
|
|||||||
ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
|
ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*!
|
|
||||||
* \brief Kill a zombie thread
|
|
||||||
*
|
|
||||||
* This runs from the threadpool's control taskprocessor thread.
|
|
||||||
*
|
|
||||||
* \param data A thread_worker_pair containing the threadpool and the zombie thread
|
|
||||||
* \return 0
|
|
||||||
*/
|
|
||||||
static int queued_zombie_thread_dead(void *data)
|
|
||||||
{
|
|
||||||
struct thread_worker_pair *pair = data;
|
|
||||||
|
|
||||||
ao2_unlink(pair->pool, pair->worker);
|
|
||||||
threadpool_send_state_changed(pair->pool);
|
|
||||||
|
|
||||||
ao2_ref(pair, -1);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*!
|
|
||||||
* \brief Queue a task to kill a zombie thread
|
|
||||||
*
|
|
||||||
* This is called by a worker thread when it acknowledges that it is time for
|
|
||||||
* it to die.
|
|
||||||
*/
|
|
||||||
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
|
|
||||||
struct worker_thread *worker)
|
|
||||||
{
|
|
||||||
struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
|
|
||||||
if (!pair) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Execute a task in the threadpool
|
* \brief Execute a task in the threadpool
|
||||||
*
|
*
|
||||||
@@ -322,10 +268,6 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
|
|||||||
if (!pool->idle_threads) {
|
if (!pool->idle_threads) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
|
|
||||||
if (!pool->zombie_threads) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pool->tps = listener->tps;
|
pool->tps = listener->tps;
|
||||||
|
|
||||||
@@ -470,11 +412,9 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
|
|||||||
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
|
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
|
||||||
{
|
{
|
||||||
struct ast_threadpool *pool = listener->private_data;
|
struct ast_threadpool *pool = listener->private_data;
|
||||||
int flags = (OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE);
|
|
||||||
|
|
||||||
ao2_cleanup(pool->active_threads);
|
ao2_cleanup(pool->active_threads);
|
||||||
ao2_cleanup(pool->idle_threads);
|
ao2_cleanup(pool->idle_threads);
|
||||||
ao2_cleanup(pool->zombie_threads);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
@@ -537,7 +477,6 @@ static void grow(struct ast_threadpool *pool, int delta)
|
|||||||
*/
|
*/
|
||||||
static int kill_threads(void *obj, void *arg, int flags)
|
static int kill_threads(void *obj, void *arg, int flags)
|
||||||
{
|
{
|
||||||
struct worker_thread *worker = obj;
|
|
||||||
int *num_to_kill = arg;
|
int *num_to_kill = arg;
|
||||||
|
|
||||||
if ((*num_to_kill)-- > 0) {
|
if ((*num_to_kill)-- > 0) {
|
||||||
@@ -547,46 +486,12 @@ static int kill_threads(void *obj, void *arg, int flags)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*!
|
|
||||||
* \brief ao2 callback to zombify a set number of threads.
|
|
||||||
*
|
|
||||||
* Threads will be zombified as long as as the counter has not reached
|
|
||||||
* zero. The counter is decremented with each thread that is zombified.
|
|
||||||
*
|
|
||||||
* Zombifying a thread involves removing it from its current container,
|
|
||||||
* adding it to the zombie container, and changing the state of the
|
|
||||||
* worker to a zombie
|
|
||||||
*
|
|
||||||
* This callback is called from the threadpool control taskprocessor thread.
|
|
||||||
*
|
|
||||||
* \param obj The worker thread that may be zombified
|
|
||||||
* \param arg The pool to which the worker belongs
|
|
||||||
* \param data The counter
|
|
||||||
* \param flags Unused
|
|
||||||
* \retval CMP_MATCH The zombified thread should be removed from its current container
|
|
||||||
* \retval CMP_STOP Stop attempting to zombify threads
|
|
||||||
*/
|
|
||||||
static int zombify_threads(void *obj, void *arg, void *data, int flags)
|
|
||||||
{
|
|
||||||
struct worker_thread *worker = obj;
|
|
||||||
struct ast_threadpool *pool = arg;
|
|
||||||
int *num_to_zombify = data;
|
|
||||||
|
|
||||||
if ((*num_to_zombify)-- > 0) {
|
|
||||||
ao2_link(pool->zombie_threads, worker);
|
|
||||||
worker_set_state(worker, ZOMBIE);
|
|
||||||
return CMP_MATCH;
|
|
||||||
} else {
|
|
||||||
return CMP_STOP;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Remove threads from the threadpool
|
* \brief Remove threads from the threadpool
|
||||||
*
|
*
|
||||||
* The preference is to kill idle threads. However, if there are
|
* The preference is to kill idle threads. However, if there are
|
||||||
* more threads to remove than there are idle threads, then active
|
* more threads to remove than there are idle threads, then active
|
||||||
* threads will be zombified instead.
|
* threads will be removed too.
|
||||||
*
|
*
|
||||||
* This function is called from the threadpool control taskprocessor thread.
|
* This function is called from the threadpool control taskprocessor thread.
|
||||||
*
|
*
|
||||||
@@ -595,20 +500,15 @@ static int zombify_threads(void *obj, void *arg, void *data, int flags)
|
|||||||
*/
|
*/
|
||||||
static void shrink(struct ast_threadpool *pool, int delta)
|
static void shrink(struct ast_threadpool *pool, int delta)
|
||||||
{
|
{
|
||||||
/*
|
|
||||||
* Preference is to kill idle threads, but
|
|
||||||
* we'll move on to deactivating active threads
|
|
||||||
* if we have to
|
|
||||||
*/
|
|
||||||
int idle_threads = ao2_container_count(pool->idle_threads);
|
int idle_threads = ao2_container_count(pool->idle_threads);
|
||||||
int idle_threads_to_kill = MIN(delta, idle_threads);
|
int idle_threads_to_kill = MIN(delta, idle_threads);
|
||||||
int active_threads_to_zombify = delta - idle_threads_to_kill;
|
int active_threads_to_kill = delta - idle_threads_to_kill;
|
||||||
|
|
||||||
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
|
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
|
||||||
kill_threads, &idle_threads_to_kill);
|
kill_threads, &idle_threads_to_kill);
|
||||||
|
|
||||||
ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
|
ao2_callback(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
|
||||||
zombify_threads, pool, &active_threads_to_zombify);
|
kill_threads, &active_threads_to_kill);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
@@ -629,7 +529,7 @@ struct set_size_data {
|
|||||||
static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
|
static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
|
||||||
unsigned int size)
|
unsigned int size)
|
||||||
{
|
{
|
||||||
struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);
|
struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
|
||||||
if (!ssd) {
|
if (!ssd) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -654,22 +554,20 @@ static int queued_set_size(void *data)
|
|||||||
{
|
{
|
||||||
struct set_size_data *ssd = data;
|
struct set_size_data *ssd = data;
|
||||||
struct ast_threadpool *pool = ssd->pool;
|
struct ast_threadpool *pool = ssd->pool;
|
||||||
unsigned int num_threads = ssd->size;
|
unsigned int new_size = ssd->size;
|
||||||
|
|
||||||
/* We don't count zombie threads as being "live when potentially resizing */
|
|
||||||
unsigned int current_size = ao2_container_count(pool->active_threads) +
|
unsigned int current_size = ao2_container_count(pool->active_threads) +
|
||||||
ao2_container_count(pool->idle_threads);
|
ao2_container_count(pool->idle_threads);
|
||||||
|
|
||||||
if (current_size == num_threads) {
|
if (current_size == new_size) {
|
||||||
ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
|
ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
|
||||||
num_threads, current_size);
|
new_size, current_size);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (current_size < num_threads) {
|
if (current_size < new_size) {
|
||||||
grow(pool, num_threads - current_size);
|
grow(pool, new_size - current_size);
|
||||||
} else {
|
} else {
|
||||||
shrink(pool, current_size - num_threads);
|
shrink(pool, current_size - new_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
threadpool_send_state_changed(pool);
|
threadpool_send_state_changed(pool);
|
||||||
@@ -777,8 +675,6 @@ static int worker_thread_cmp(void *obj, void *arg, int flags)
|
|||||||
*/
|
*/
|
||||||
static void worker_shutdown(struct worker_thread *worker)
|
static void worker_shutdown(struct worker_thread *worker)
|
||||||
{
|
{
|
||||||
struct worker_thread *worker = obj;
|
|
||||||
|
|
||||||
worker_set_state(worker, DEAD);
|
worker_set_state(worker, DEAD);
|
||||||
if (worker->thread != AST_PTHREADT_NULL) {
|
if (worker->thread != AST_PTHREADT_NULL) {
|
||||||
pthread_join(worker->thread, NULL);
|
pthread_join(worker->thread, NULL);
|
||||||
@@ -866,18 +762,6 @@ static void worker_active(struct worker_thread *worker)
|
|||||||
alive = worker_idle(worker);
|
alive = worker_idle(worker);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Reaching this portion means the thread is
|
|
||||||
* on death's door. It may have been killed while
|
|
||||||
* it was idle, in which case it can just die
|
|
||||||
* peacefully. If it's a zombie, though, then
|
|
||||||
* it needs to let the pool know so
|
|
||||||
* that the thread can be removed from the
|
|
||||||
* list of zombie threads.
|
|
||||||
*/
|
|
||||||
if (worker->state == ZOMBIE) {
|
|
||||||
threadpool_zombie_thread_dead(worker->pool, worker);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
|
Reference in New Issue
Block a user