Remove threadpool listener alloc and destroy callbacks.

This replaces the destroy callback with a shutdown callback
instead.



git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@379122 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Mark Michelson
2013-01-15 19:36:33 +00:00
parent 65c7d6e2c3
commit edc2e4dac0
3 changed files with 124 additions and 75 deletions

View File

@@ -25,18 +25,10 @@ struct ast_taskprocessor;
struct ast_threadpool_listener; struct ast_threadpool_listener;
struct ast_threadpool_listener_callbacks { struct ast_threadpool_listener_callbacks {
/*!
* \brief Allocate the listener's private data
*
* It is not necessary to assign the private data to the listener.
* \param listener The listener the private data will belong to
* \retval NULL Failure to allocate private data
* \retval non-NULL The newly allocated private data
*/
void *(*alloc)(struct ast_threadpool_listener *listener);
/*! /*!
* \brief Indicates that the state of threads in the pool has changed * \brief Indicates that the state of threads in the pool has changed
* *
* \param pool The pool whose state has changed
* \param listener The threadpool listener * \param listener The threadpool listener
* \param active_threads The number of active threads in the pool * \param active_threads The number of active threads in the pool
* \param idle_threads The number of idle threads in the pool * \param idle_threads The number of idle threads in the pool
@@ -48,6 +40,7 @@ struct ast_threadpool_listener_callbacks {
/*! /*!
* \brief Indicates that a task was pushed to the threadpool * \brief Indicates that a task was pushed to the threadpool
* *
* \param pool The pool that had a task pushed
* \param listener The threadpool listener * \param listener The threadpool listener
* \param was_empty Indicates whether there were any tasks prior to adding the new one. * \param was_empty Indicates whether there were any tasks prior to adding the new one.
*/ */
@@ -57,15 +50,21 @@ struct ast_threadpool_listener_callbacks {
/*! /*!
* \brief Indicates the threadpool's taskprocessor has become empty * \brief Indicates the threadpool's taskprocessor has become empty
* *
* \param pool The pool that has become empty
* \param listener The threadpool's listener * \param listener The threadpool's listener
*/ */
void (*emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener); void (*emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener);
/*! /*!
* \brief Free the listener's private data * \brief The threadpool is shutting down
* \param private_data The private data to destroy *
* This would be an opportune time to free the listener's user data
* if one wishes. However, it is acceptable to not do so if the user data
* should persist beyond the lifetime of the pool.
*
* \param listener The threadpool's listener
*/ */
void (*destroy)(void *private_data); void (*shutdown)(struct ast_threadpool_listener *listener);
}; };
/*! /*!
@@ -79,7 +78,7 @@ struct ast_threadpool_listener {
/*! Callbacks called by the threadpool */ /*! Callbacks called by the threadpool */
const struct ast_threadpool_listener_callbacks *callbacks; const struct ast_threadpool_listener_callbacks *callbacks;
/*! User data for the listener */ /*! User data for the listener */
void *private_data; void *user_data;
}; };
struct ast_threadpool_options { struct ast_threadpool_options {
@@ -112,11 +111,12 @@ struct ast_threadpool_options {
* listener. * listener.
* *
* \param callbacks Listener callbacks to assign to the listener * \param callbacks Listener callbacks to assign to the listener
* \param user_data User data to be stored in the threadpool listener
* \retval NULL Failed to allocate the listener * \retval NULL Failed to allocate the listener
* \retval non-NULL The newly-created threadpool listener * \retval non-NULL The newly-created threadpool listener
*/ */
struct ast_threadpool_listener *ast_threadpool_listener_alloc( struct ast_threadpool_listener *ast_threadpool_listener_alloc(
const struct ast_threadpool_listener_callbacks *callbacks); const struct ast_threadpool_listener_callbacks *callbacks, void *user_data);
/*! /*!
* \brief Create a new threadpool * \brief Create a new threadpool

View File

@@ -613,6 +613,7 @@ static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
{ {
struct ast_threadpool *pool = listener->user_data; struct ast_threadpool *pool = listener->user_data;
pool->listener->callbacks->shutdown(pool->listener);
ao2_cleanup(pool->active_threads); ao2_cleanup(pool->active_threads);
ao2_cleanup(pool->idle_threads); ao2_cleanup(pool->idle_threads);
ao2_cleanup(pool->zombie_threads); ao2_cleanup(pool->zombie_threads);
@@ -808,26 +809,15 @@ void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd); ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
} }
static void listener_destructor(void *obj)
{
struct ast_threadpool_listener *listener = obj;
listener->callbacks->destroy(listener->private_data);
}
struct ast_threadpool_listener *ast_threadpool_listener_alloc( struct ast_threadpool_listener *ast_threadpool_listener_alloc(
const struct ast_threadpool_listener_callbacks *callbacks) const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
{ {
struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), listener_destructor); struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
if (!listener) { if (!listener) {
return NULL; return NULL;
} }
listener->callbacks = callbacks; listener->callbacks = callbacks;
listener->private_data = listener->callbacks->alloc(listener); listener->user_data = user_data;
if (!listener->private_data) {
ao2_ref(listener, -1);
return NULL;
}
return listener; return listener;
} }

View File

@@ -49,7 +49,7 @@ struct test_listener_data {
ast_cond_t cond; ast_cond_t cond;
}; };
static void *test_alloc(struct ast_threadpool_listener *listener) static struct test_listener_data *test_alloc(void)
{ {
struct test_listener_data *tld = ast_calloc(1, sizeof(*tld)); struct test_listener_data *tld = ast_calloc(1, sizeof(*tld));
if (!tld) { if (!tld) {
@@ -65,7 +65,7 @@ static void test_state_changed(struct ast_threadpool *pool,
int active_threads, int active_threads,
int idle_threads) int idle_threads)
{ {
struct test_listener_data *tld = listener->private_data; struct test_listener_data *tld = listener->user_data;
SCOPED_MUTEX(lock, &tld->lock); SCOPED_MUTEX(lock, &tld->lock);
tld->num_active = active_threads; tld->num_active = active_threads;
tld->num_idle = idle_threads; tld->num_idle = idle_threads;
@@ -77,7 +77,7 @@ static void test_task_pushed(struct ast_threadpool *pool,
struct ast_threadpool_listener *listener, struct ast_threadpool_listener *listener,
int was_empty) int was_empty)
{ {
struct test_listener_data *tld = listener->private_data; struct test_listener_data *tld = listener->user_data;
SCOPED_MUTEX(lock, &tld->lock); SCOPED_MUTEX(lock, &tld->lock);
tld->task_pushed = 1; tld->task_pushed = 1;
++tld->num_tasks; ++tld->num_tasks;
@@ -88,26 +88,24 @@ static void test_task_pushed(struct ast_threadpool *pool,
static void test_emptied(struct ast_threadpool *pool, static void test_emptied(struct ast_threadpool *pool,
struct ast_threadpool_listener *listener) struct ast_threadpool_listener *listener)
{ {
struct test_listener_data *tld = listener->private_data; struct test_listener_data *tld = listener->user_data;
SCOPED_MUTEX(lock, &tld->lock); SCOPED_MUTEX(lock, &tld->lock);
tld->empty_notice = 1; tld->empty_notice = 1;
ast_cond_signal(&tld->cond); ast_cond_signal(&tld->cond);
} }
static void test_destroy(void *private_data) static void test_shutdown(struct ast_threadpool_listener *listener)
{ {
struct test_listener_data *tld = private_data; struct test_listener_data *tld = listener->user_data;
ast_cond_destroy(&tld->cond); ast_cond_destroy(&tld->cond);
ast_mutex_destroy(&tld->lock); ast_mutex_destroy(&tld->lock);
ast_free(tld);
} }
static const struct ast_threadpool_listener_callbacks test_callbacks = { static const struct ast_threadpool_listener_callbacks test_callbacks = {
.alloc = test_alloc,
.state_changed = test_state_changed, .state_changed = test_state_changed,
.task_pushed = test_task_pushed, .task_pushed = test_task_pushed,
.emptied = test_emptied, .emptied = test_emptied,
.destroy = test_destroy, .shutdown = test_shutdown,
}; };
struct simple_task_data { struct simple_task_data {
@@ -165,7 +163,7 @@ static enum ast_test_result_state wait_until_thread_state(struct ast_test *test,
static void wait_for_task_pushed(struct ast_threadpool_listener *listener) static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
{ {
struct test_listener_data *tld = listener->private_data; struct test_listener_data *tld = listener->user_data;
struct timeval start = ast_tvnow(); struct timeval start = ast_tvnow();
struct timespec end = { struct timespec end = {
.tv_sec = start.tv_sec + 5, .tv_sec = start.tv_sec + 5,
@@ -237,7 +235,7 @@ static enum ast_test_result_state listener_check(
int num_idle, int num_idle,
int empty_notice) int empty_notice)
{ {
struct test_listener_data *tld = listener->private_data; struct test_listener_data *tld = listener->user_data;
enum ast_test_result_state res = AST_TEST_PASS; enum ast_test_result_state res = AST_TEST_PASS;
if (tld->task_pushed != task_pushed) { if (tld->task_pushed != task_pushed) {
@@ -279,6 +277,7 @@ AST_TEST_DEFINE(threadpool_push)
struct ast_threadpool *pool = NULL; struct ast_threadpool *pool = NULL;
struct ast_threadpool_listener *listener = NULL; struct ast_threadpool_listener *listener = NULL;
struct simple_task_data *std = NULL; struct simple_task_data *std = NULL;
struct test_listener_data *tld = NULL;
enum ast_test_result_state res = AST_TEST_FAIL; enum ast_test_result_state res = AST_TEST_FAIL;
struct ast_threadpool_options options = { struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION, .version = AST_THREADPOOL_OPTIONS_VERSION,
@@ -297,12 +296,16 @@ AST_TEST_DEFINE(threadpool_push)
case TEST_EXECUTE: case TEST_EXECUTE:
break; break;
} }
tld = test_alloc();
listener = ast_threadpool_listener_alloc(&test_callbacks); if (!tld) {
if (!listener) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 0, &options); pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) { if (!pool) {
goto end; goto end;
@@ -325,6 +328,7 @@ end:
} }
ao2_cleanup(listener); ao2_cleanup(listener);
ast_free(std); ast_free(std);
ast_free(tld);
return res; return res;
} }
@@ -353,11 +357,15 @@ AST_TEST_DEFINE(threadpool_initial_threads)
break; break;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks); tld = test_alloc();
if (!listener) { if (!tld) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
tld = listener->private_data;
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 3, &options); pool = ast_threadpool_create(info->name, listener, 3, &options);
if (!pool) { if (!pool) {
@@ -371,6 +379,7 @@ end:
ast_threadpool_shutdown(pool); ast_threadpool_shutdown(pool);
} }
ao2_cleanup(listener); ao2_cleanup(listener);
ast_free(tld);
return res; return res;
} }
@@ -399,11 +408,15 @@ AST_TEST_DEFINE(threadpool_thread_creation)
break; break;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks); tld = test_alloc();
if (!listener) { if (!tld) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
tld = listener->private_data;
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 0, &options); pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) { if (!pool) {
@@ -422,6 +435,7 @@ end:
ast_threadpool_shutdown(pool); ast_threadpool_shutdown(pool);
} }
ao2_cleanup(listener); ao2_cleanup(listener);
ast_free(tld);
return res; return res;
} }
@@ -449,11 +463,15 @@ AST_TEST_DEFINE(threadpool_thread_destruction)
break; break;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks); tld = test_alloc();
if (!listener) { if (!tld) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
tld = listener->private_data;
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 0, &options); pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) { if (!pool) {
@@ -481,6 +499,7 @@ end:
ast_threadpool_shutdown(pool); ast_threadpool_shutdown(pool);
} }
ao2_cleanup(listener); ao2_cleanup(listener);
ast_free(tld);
return res; return res;
} }
@@ -508,11 +527,15 @@ AST_TEST_DEFINE(threadpool_thread_timeout)
break; break;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks); tld = test_alloc();
if (!listener) { if (!tld) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
tld = listener->private_data;
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 0, &options); pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) { if (!pool) {
@@ -543,6 +566,7 @@ end:
ast_threadpool_shutdown(pool); ast_threadpool_shutdown(pool);
} }
ao2_cleanup(listener); ao2_cleanup(listener);
ast_free(tld);
return res; return res;
} }
@@ -571,11 +595,15 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread)
break; break;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks); tld = test_alloc();
if (!listener) { if (!tld) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
tld = listener->private_data;
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 0, &options); pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) { if (!pool) {
@@ -619,6 +647,7 @@ end:
} }
ao2_cleanup(listener); ao2_cleanup(listener);
ast_free(std); ast_free(std);
ast_free(tld);
return res; return res;
} }
@@ -648,11 +677,15 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task)
break; break;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks); tld = test_alloc();
if (!listener) { if (!tld) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
tld = listener->private_data;
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 0, &options); pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) { if (!pool) {
@@ -697,6 +730,7 @@ end:
} }
ao2_cleanup(listener); ao2_cleanup(listener);
ast_free(std); ast_free(std);
ast_free(tld);
return res; return res;
} }
@@ -727,11 +761,15 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
break; break;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks); tld = test_alloc();
if (!listener) { if (!tld) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
tld = listener->private_data;
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 0, &options); pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) { if (!pool) {
@@ -789,6 +827,7 @@ end:
ast_free(std1); ast_free(std1);
ast_free(std2); ast_free(std2);
ast_free(std3); ast_free(std3);
ast_free(tld);
return res; return res;
} }
@@ -822,11 +861,15 @@ AST_TEST_DEFINE(threadpool_auto_increment)
break; break;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks); tld = test_alloc();
if (!listener) { if (!tld) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
tld = listener->private_data;
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 0, &options); pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) { if (!pool) {
@@ -901,6 +944,7 @@ end:
ast_free(std2); ast_free(std2);
ast_free(std3); ast_free(std3);
ast_free(std4); ast_free(std4);
ast_free(tld);
return res; return res;
} }
@@ -932,11 +976,15 @@ AST_TEST_DEFINE(threadpool_reactivation)
break; break;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks); tld = test_alloc();
if (!listener) { if (!tld) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
tld = listener->private_data;
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 0, &options); pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) { if (!pool) {
@@ -1000,6 +1048,7 @@ end:
ao2_cleanup(listener); ao2_cleanup(listener);
ast_free(std1); ast_free(std1);
ast_free(std2); ast_free(std2);
ast_free(tld);
return res; return res;
} }
@@ -1094,11 +1143,15 @@ AST_TEST_DEFINE(threadpool_task_distribution)
break; break;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks); tld = test_alloc();
if (!listener) { if (!tld) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
tld = listener->private_data;
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 0, &options); pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) { if (!pool) {
@@ -1153,6 +1206,7 @@ end:
ao2_cleanup(listener); ao2_cleanup(listener);
ast_free(ctd1); ast_free(ctd1);
ast_free(ctd2); ast_free(ctd2);
ast_free(tld);
return res; return res;
} }
@@ -1185,11 +1239,15 @@ AST_TEST_DEFINE(threadpool_more_destruction)
break; break;
} }
listener = ast_threadpool_listener_alloc(&test_callbacks); tld = test_alloc();
if (!listener) { if (!tld) {
return AST_TEST_FAIL; return AST_TEST_FAIL;
} }
tld = listener->private_data;
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, 0, &options); pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) { if (!pool) {
@@ -1259,6 +1317,7 @@ end:
ao2_cleanup(listener); ao2_cleanup(listener);
ast_free(ctd1); ast_free(ctd1);
ast_free(ctd2); ast_free(ctd2);
ast_free(tld);
return res; return res;
} }