serializer: move/add asterisk serializer pool functionality

Serializer pools have previously existed in Asterisk. However, for the most
part the code has been duplicated across modules. This patch abstracts the
code into an 'ast_serializer_pool' object. As well the code is now centralized
in serializer.c/h.

In addition serializer pools can now optionally be monitored by a shutdown
group. This will prevent the pool from being destroyed until all serializers
have completed.

Change-Id: Ib1e906144b90ffd4d5ed9826f0b719ca9c6d2971
This commit is contained in:
Kevin Harwell
2019-10-02 11:24:21 -05:00
parent 6ef806ca60
commit afc10c25ac
5 changed files with 371 additions and 1 deletions

View File

@@ -35,6 +35,8 @@
#include "asterisk/taskprocessor.h"
#include "asterisk/module.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer.h"
#include "asterisk/threadpool.h"
/*!
* \brief userdata associated with baseline taskprocessor test
@@ -889,6 +891,78 @@ AST_TEST_DEFINE(taskprocessor_push_local)
return AST_TEST_PASS;
}
/*!
* \brief Baseline test for a serializer pool
*
* This test ensures that when a task is added to a taskprocessor that
* has been allocated with a default listener that the task gets executed
* as expected
*/
AST_TEST_DEFINE(serializer_pool)
{
RAII_VAR(struct ast_threadpool *, threadpool, NULL, ast_threadpool_shutdown);
RAII_VAR(struct ast_serializer_pool *, serializer_pool, NULL, ast_serializer_pool_destroy);
RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
.auto_increment = 0,
.initial_size = 1,
.max_size = 0,
};
/* struct ast_taskprocessor *tps; */
switch (cmd) {
case TEST_INIT:
info->name = "serializer_pool";
info->category = "/main/taskprocessor/";
info->summary = "Test using a serializer pool";
info->description =
"Ensures that a queued task gets executed.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));
ast_test_validate(test, serializer_pool = ast_serializer_pool_create(
"test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */
ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));
ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
ast_test_validate(test, task_data = task_data_create());
task_data->wait_time = 4000; /* task takes 4 seconds */
ast_test_validate(test, !ast_taskprocessor_push(
ast_serializer_pool_get(serializer_pool), task, task_data));
if (!ast_serializer_pool_destroy(serializer_pool)) {
ast_test_status_update(test, "Unexpected pool destruction!\n");
/*
* The pool should have timed out, so if it destruction reports success
* we need to fail.
*/
serializer_pool = NULL;
return AST_TEST_FAIL;
}
ast_test_validate(test, !task_wait(task_data));
/* The first attempt should have failed. Second try should destroy successfully */
if (ast_serializer_pool_destroy(serializer_pool)) {
ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");
/*
* If this fails we'll try again on return to hopefully avoid a memory leak.
* If it again times out a third time, well not much we can do.
*/
return AST_TEST_FAIL;
}
/* Test passed, so set pool to NULL to avoid "re-running" destroy */
serializer_pool = NULL;
return AST_TEST_PASS;
}
static int unload_module(void)
{
ast_test_unregister(default_taskprocessor);
@@ -897,6 +971,7 @@ static int unload_module(void)
ast_test_unregister(taskprocessor_listener);
ast_test_unregister(taskprocessor_shutdown);
ast_test_unregister(taskprocessor_push_local);
ast_test_unregister(serializer_pool);
return 0;
}
@@ -908,6 +983,7 @@ static int load_module(void)
ast_test_register(taskprocessor_listener);
ast_test_register(taskprocessor_shutdown);
ast_test_register(taskprocessor_push_local);
ast_test_register(serializer_pool);
return AST_MODULE_LOAD_SUCCESS;
}