mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-04 11:58:52 +00:00
Add new threadpool test and fix some taskprocessor bugs.
The new thread creation test fails because Asterisk locks up while trying to lock a taskprocessor. While trying to debug that, I found a race condition during taskprocessor creation where a default taskprocessor listener could try to operate on a partially started taskprocessor. This was fixed by adding a new callback to taskprocessor listeners. Then while testing that change, I found some bugs in the taskprocessor tests where I was not properly unlocking when done with a lock. Scoped locks have spoiled me a bit. I still have not figured out why the threadpool thread creation test is locking up. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377368 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
@@ -84,6 +84,15 @@ struct ast_taskprocessor_listener_callbacks {
|
|||||||
* \retval non-NULL Allocated private data
|
* \retval non-NULL Allocated private data
|
||||||
*/
|
*/
|
||||||
void *(*alloc)(struct ast_taskprocessor_listener *listener);
|
void *(*alloc)(struct ast_taskprocessor_listener *listener);
|
||||||
|
/*!
|
||||||
|
* \brief The taskprocessor has started completely
|
||||||
|
*
|
||||||
|
* This indicates that the taskprocessor is fully set up and the listener
|
||||||
|
* can now start interacting with it.
|
||||||
|
*
|
||||||
|
* \param listener The listener to start
|
||||||
|
*/
|
||||||
|
int (*start)(struct ast_taskprocessor_listener *listener);
|
||||||
/*!
|
/*!
|
||||||
* \brief Indicates a task was pushed to the processor
|
* \brief Indicates a task was pushed to the processor
|
||||||
*
|
*
|
||||||
|
@@ -171,12 +171,20 @@ static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
|
|||||||
ast_cond_init(&pvt->cond, NULL);
|
ast_cond_init(&pvt->cond, NULL);
|
||||||
ast_mutex_init(&pvt->lock);
|
ast_mutex_init(&pvt->lock);
|
||||||
pvt->poll_thread = AST_PTHREADT_NULL;
|
pvt->poll_thread = AST_PTHREADT_NULL;
|
||||||
if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
return pvt;
|
return pvt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int default_listener_start(struct ast_taskprocessor_listener *listener)
|
||||||
|
{
|
||||||
|
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
|
||||||
|
|
||||||
|
if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
|
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
|
||||||
{
|
{
|
||||||
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
|
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
|
||||||
@@ -209,6 +217,7 @@ static void default_listener_destroy(void *obj)
|
|||||||
|
|
||||||
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
|
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
|
||||||
.alloc = default_listener_alloc,
|
.alloc = default_listener_alloc,
|
||||||
|
.start = default_listener_start,
|
||||||
.task_pushed = default_task_pushed,
|
.task_pushed = default_task_pushed,
|
||||||
.emptied = default_emptied,
|
.emptied = default_emptied,
|
||||||
.shutdown = default_listener_shutdown,
|
.shutdown = default_listener_shutdown,
|
||||||
@@ -556,6 +565,12 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (p->listener->callbacks->start(p->listener)) {
|
||||||
|
ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
|
||||||
|
ast_taskprocessor_unreference(p);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* RAII_VAR will decrement the refcount at the end of the function.
|
/* RAII_VAR will decrement the refcount at the end of the function.
|
||||||
* Since we want to pass back a reference to p, we bump the refcount
|
* Since we want to pass back a reference to p, we bump the refcount
|
||||||
*/
|
*/
|
||||||
|
@@ -268,6 +268,11 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
|
|||||||
return pool;
|
return pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief helper used for queued task when tasks are pushed
|
* \brief helper used for queued task when tasks are pushed
|
||||||
*/
|
*/
|
||||||
@@ -431,6 +436,7 @@ static void threadpool_destroy(void *private_data)
|
|||||||
*/
|
*/
|
||||||
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
|
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
|
||||||
.alloc = threadpool_alloc,
|
.alloc = threadpool_alloc,
|
||||||
|
.start = threadpool_tps_start,
|
||||||
.task_pushed = threadpool_tps_task_pushed,
|
.task_pushed = threadpool_tps_task_pushed,
|
||||||
.emptied = threadpool_tps_emptied,
|
.emptied = threadpool_tps_emptied,
|
||||||
.shutdown = threadpool_tps_shutdown,
|
.shutdown = threadpool_tps_shutdown,
|
||||||
@@ -623,6 +629,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
|
|||||||
|
|
||||||
pool = tps_listener->private_data;
|
pool = tps_listener->private_data;
|
||||||
pool->tps = tps;
|
pool->tps = tps;
|
||||||
|
ast_log(LOG_NOTICE, "The taskprocessor I've created is located at %p\n", pool->tps);
|
||||||
ao2_ref(listener, +1);
|
ao2_ref(listener, +1);
|
||||||
pool->listener = listener;
|
pool->listener = listener;
|
||||||
ast_threadpool_set_size(pool, initial_size);
|
ast_threadpool_set_size(pool, initial_size);
|
||||||
|
@@ -116,6 +116,7 @@ AST_TEST_DEFINE(default_taskprocessor)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ast_mutex_unlock(&task_data.lock);
|
||||||
|
|
||||||
if (!task_data.task_complete) {
|
if (!task_data.task_complete) {
|
||||||
ast_test_status_update(test, "Queued task did not execute!\n");
|
ast_test_status_update(test, "Queued task did not execute!\n");
|
||||||
@@ -218,6 +219,7 @@ AST_TEST_DEFINE(default_taskprocessor_load)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ast_mutex_unlock(&load_task_results.lock);
|
||||||
|
|
||||||
if (load_task_results.tasks_completed != NUM_TASKS) {
|
if (load_task_results.tasks_completed != NUM_TASKS) {
|
||||||
ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
|
ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
|
||||||
@@ -266,6 +268,14 @@ static void *test_alloc(struct ast_taskprocessor_listener *listener)
|
|||||||
return pvt;
|
return pvt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief test taskprocessor listener's start callback
|
||||||
|
*/
|
||||||
|
static int test_start(struct ast_taskprocessor_listener *listener)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief test taskprocessor listener's task_pushed callback
|
* \brief test taskprocessor listener's task_pushed callback
|
||||||
*
|
*
|
||||||
@@ -309,6 +319,7 @@ static void test_destroy(void *private_data)
|
|||||||
|
|
||||||
static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
|
static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
|
||||||
.alloc = test_alloc,
|
.alloc = test_alloc,
|
||||||
|
.start = test_start,
|
||||||
.task_pushed = test_task_pushed,
|
.task_pushed = test_task_pushed,
|
||||||
.emptied = test_emptied,
|
.emptied = test_emptied,
|
||||||
.shutdown = test_shutdown,
|
.shutdown = test_shutdown,
|
||||||
|
@@ -36,6 +36,7 @@
|
|||||||
#include "asterisk/module.h"
|
#include "asterisk/module.h"
|
||||||
#include "asterisk/lock.h"
|
#include "asterisk/lock.h"
|
||||||
#include "asterisk/astobj2.h"
|
#include "asterisk/astobj2.h"
|
||||||
|
#include "asterisk/logger.h"
|
||||||
|
|
||||||
struct test_listener_data {
|
struct test_listener_data {
|
||||||
int num_active;
|
int num_active;
|
||||||
@@ -66,6 +67,7 @@ static void test_state_changed(struct ast_threadpool *pool,
|
|||||||
{
|
{
|
||||||
struct test_listener_data *tld = listener->private_data;
|
struct test_listener_data *tld = listener->private_data;
|
||||||
SCOPED_MUTEX(lock, &tld->lock);
|
SCOPED_MUTEX(lock, &tld->lock);
|
||||||
|
ast_log(LOG_NOTICE, "State changed: num_active: %d, num_idle: %d\n", active_threads, idle_threads);
|
||||||
tld->num_active = active_threads;
|
tld->num_active = active_threads;
|
||||||
tld->num_idle = idle_threads;
|
tld->num_idle = idle_threads;
|
||||||
ast_cond_signal(&tld->cond);
|
ast_cond_signal(&tld->cond);
|
||||||
@@ -95,6 +97,7 @@ static void test_emptied(struct ast_threadpool *pool,
|
|||||||
static void test_destroy(void *private_data)
|
static void test_destroy(void *private_data)
|
||||||
{
|
{
|
||||||
struct test_listener_data *tld = private_data;
|
struct test_listener_data *tld = private_data;
|
||||||
|
ast_debug(1, "Poop\n");
|
||||||
ast_cond_destroy(&tld->cond);
|
ast_cond_destroy(&tld->cond);
|
||||||
ast_mutex_destroy(&tld->lock);
|
ast_mutex_destroy(&tld->lock);
|
||||||
ast_free(tld);
|
ast_free(tld);
|
||||||
@@ -135,6 +138,15 @@ static int simple_task(void *data)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define WAIT_WHILE(tld, condition) \
|
||||||
|
{\
|
||||||
|
ast_mutex_lock(&tld->lock);\
|
||||||
|
while ((condition)) {\
|
||||||
|
ast_cond_wait(&tld->cond, &tld->lock);\
|
||||||
|
}\
|
||||||
|
ast_mutex_unlock(&tld->lock);\
|
||||||
|
}\
|
||||||
|
|
||||||
static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
|
static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
|
||||||
{
|
{
|
||||||
struct test_listener_data *tld = listener->private_data;
|
struct test_listener_data *tld = listener->private_data;
|
||||||
@@ -246,15 +258,64 @@ end:
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AST_TEST_DEFINE(threadpool_thread_creation)
|
||||||
|
{
|
||||||
|
struct ast_threadpool *pool = NULL;
|
||||||
|
struct ast_threadpool_listener *listener = NULL;
|
||||||
|
enum ast_test_result_state res = AST_TEST_FAIL;
|
||||||
|
struct test_listener_data *tld;
|
||||||
|
|
||||||
|
switch (cmd) {
|
||||||
|
case TEST_INIT:
|
||||||
|
info->name = "threadpool_thread_creation";
|
||||||
|
info->category = "/main/threadpool_thread_creation/";
|
||||||
|
info->summary = "Test threadpool thread creation";
|
||||||
|
info->description =
|
||||||
|
"Ensure that threads can be added to a threadpool";
|
||||||
|
return AST_TEST_NOT_RUN;
|
||||||
|
case TEST_EXECUTE:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
listener = ast_threadpool_listener_alloc(&test_callbacks);
|
||||||
|
if (!listener) {
|
||||||
|
return AST_TEST_FAIL;
|
||||||
|
}
|
||||||
|
tld = listener->private_data;
|
||||||
|
|
||||||
|
pool = ast_threadpool_create(listener, 0);
|
||||||
|
if (!pool) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Now let's create a thread. It should start active, then go
|
||||||
|
* idle immediately
|
||||||
|
*/
|
||||||
|
ast_threadpool_set_size(pool, 1);
|
||||||
|
|
||||||
|
WAIT_WHILE(tld, tld->num_idle == 0);
|
||||||
|
|
||||||
|
res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
|
||||||
|
|
||||||
|
end:
|
||||||
|
if (pool) {
|
||||||
|
ast_threadpool_shutdown(pool);
|
||||||
|
}
|
||||||
|
ao2_cleanup(listener);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
static int unload_module(void)
|
static int unload_module(void)
|
||||||
{
|
{
|
||||||
ast_test_unregister(threadpool_push);
|
ast_test_unregister(threadpool_push);
|
||||||
|
ast_test_unregister(threadpool_thread_creation);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int load_module(void)
|
static int load_module(void)
|
||||||
{
|
{
|
||||||
ast_test_register(threadpool_push);
|
ast_test_register(threadpool_push);
|
||||||
|
ast_test_register(threadpool_thread_creation);
|
||||||
return AST_MODULE_LOAD_SUCCESS;
|
return AST_MODULE_LOAD_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user