Add a task distribution test.

git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377418 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Mark Michelson
2012-12-08 00:22:33 +00:00
parent c75b8c5283
commit 4deecdef72

View File

@@ -703,6 +703,145 @@ end:
}
struct complex_task_data {
int task_executed;
int continue_task;
ast_mutex_t lock;
ast_cond_t stall_cond;
ast_cond_t done_cond;
};
static struct complex_task_data *complex_task_data_alloc(void)
{
struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd));
if (!ctd) {
return NULL;
}
ast_mutex_init(&ctd->lock);
ast_cond_init(&ctd->stall_cond, NULL);
ast_cond_init(&ctd->done_cond, NULL);
return ctd;
}
static int complex_task(void *data)
{
struct complex_task_data *ctd = data;
SCOPED_MUTEX(lock, &ctd->lock);
while (!ctd->continue_task) {
ast_cond_wait(&ctd->stall_cond, lock);
}
/* We got poked. Finish up */
ctd->task_executed = 1;
ast_cond_signal(&ctd->done_cond);
return 0;
}
static void poke_worker(struct complex_task_data *ctd)
{
SCOPED_MUTEX(lock, &ctd->lock);
ctd->continue_task = 1;
ast_cond_signal(&ctd->stall_cond);
}
static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
{
struct timeval start = ast_tvnow();
struct timespec end = {
.tv_sec = start.tv_sec + 5,
.tv_nsec = start.tv_usec * 1000
};
enum ast_test_result_state res = AST_TEST_PASS;
SCOPED_MUTEX(lock, &ctd->lock);
while (!ctd->task_executed) {
ast_cond_timedwait(&ctd->done_cond, lock, &end);
}
if (!ctd->task_executed) {
res = AST_TEST_FAIL;
}
return res;
}
AST_TEST_DEFINE(threadpool_task_distribution)
{
struct ast_threadpool *pool = NULL;
struct ast_threadpool_listener *listener = NULL;
struct complex_task_data *ctd1 = NULL;
struct complex_task_data *ctd2 = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld;
switch (cmd) {
case TEST_INIT:
info->name = "threadpool_task_distribution";
info->category = "/main/threadpool/";
info->summary = "Test that tasks are evenly distributed to threads";
info->description =
"Push two tasks into a threadpool. Ensure that each is handled by\n"
"a separate thread\n";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
listener = ast_threadpool_listener_alloc(&test_callbacks);
if (!listener) {
return AST_TEST_FAIL;
}
tld = listener->private_data;
pool = ast_threadpool_create(listener, 0);
if (!pool) {
goto end;
}
ctd1 = complex_task_data_alloc();
ctd2 = complex_task_data_alloc();
if (!ctd1 || !ctd2) {
goto end;
}
ast_threadpool_push(pool, complex_task, ctd1);
ast_threadpool_push(pool, complex_task, ctd2);
ast_threadpool_set_size(pool, 2);
WAIT_WHILE(tld, tld->num_active < 2);
res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
if (res == AST_TEST_FAIL) {
goto end;
}
/* The tasks are stalled until we poke them */
poke_worker(ctd1);
poke_worker(ctd2);
res = wait_for_complex_completion(ctd1);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_for_complex_completion(ctd2);
if (res == AST_TEST_FAIL) {
goto end;
}
WAIT_WHILE(tld, tld->num_idle < 2);
res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
end:
if (pool) {
ast_threadpool_shutdown(pool);
}
ao2_cleanup(listener);
ast_free(ctd1);
ast_free(ctd2);
return res;
}
static int unload_module(void)
{
ast_test_unregister(threadpool_push);
@@ -712,6 +851,7 @@ static int unload_module(void)
ast_test_unregister(threadpool_one_thread_one_task);
ast_test_unregister(threadpool_one_thread_multiple_tasks);
ast_test_unregister(threadpool_reactivation);
ast_test_unregister(threadpool_task_distribution);
return 0;
}
@@ -724,6 +864,7 @@ static int load_module(void)
ast_test_register(threadpool_one_thread_one_task);
ast_test_register(threadpool_one_thread_multiple_tasks);
ast_test_register(threadpool_reactivation);
ast_test_register(threadpool_task_distribution);
return AST_MODULE_LOAD_SUCCESS;
}