mirror of
https://github.com/asterisk/asterisk.git
synced 2025-10-15 17:27:02 +00:00
taskpool: Add taskpool API, switch Stasis to using it.
This change introduces a new API called taskpool. This is a pool of taskprocessors. It provides the following functionality: 1. Task pushing to a pool of taskprocessors 2. Synchronous tasks 3. Serializers for execution ordering of tasks 4. Growing/shrinking of number of taskprocessors in pool This functionality already exists through the combination of threadpool+taskprocessors but through investigating I determined that this carries substantial overhead for short to medium duration tasks. The threadpool uses a single queue of work, and for management of threads it involves additional tasks. I wrote taskpool to eliminate the extra overhead and management as much as possible. Instead of a single queue of work each taskprocessor has its own queue and at push time a selector chooses the taskprocessor to queue the task to. Each taskprocessor also has its own thread like normal. This spreads out the tasks immediately and reduces contention on shared resources. Using the included efficiency tests the number of tasks that can be executed per second in a taskpool is 6-12 times more than an equivalent threadpool+taskprocessor setup. Stasis has been moved over to using this new API as it is a heavy consumer of threadpool+taskprocessors and produces a lot of tasks. UpgradeNote: The threadpool_* options in stasis.conf have now been deprecated though they continue to be read and used. They have been replaced with taskpool options that give greater control over the underlying taskpool used for stasis. DeveloperNote: The taskpool API has been added for common usage of a pool of taskprocessors. It is suggested to use this API instead of the threadpool+taskprocessor approach.
This commit is contained in:
committed by
github-actions[bot]
parent
390d8f98c2
commit
e9ee9d7d98
133
main/stasis.c
133
main/stasis.c
@@ -33,7 +33,7 @@
|
||||
#include "asterisk/stasis_internal.h"
|
||||
#include "asterisk/stasis.h"
|
||||
#include "asterisk/taskprocessor.h"
|
||||
#include "asterisk/threadpool.h"
|
||||
#include "asterisk/taskpool.h"
|
||||
#include "asterisk/utils.h"
|
||||
#include "asterisk/uuid.h"
|
||||
#include "asterisk/vector.h"
|
||||
@@ -95,6 +95,36 @@
|
||||
<synopsis>Maximum number of threads in the threadpool.</synopsis>
|
||||
</configOption>
|
||||
</configObject>
|
||||
<configObject name="taskpool">
|
||||
<since>
|
||||
<version>24.0.0</version>
|
||||
</since>
|
||||
<synopsis>Settings that configure the taskpool Stasis uses to deliver some messages.</synopsis>
|
||||
<configOption name="minimum_size" default="5">
|
||||
<since>
|
||||
<version>24.0.0</version>
|
||||
</since>
|
||||
<synopsis>Minimum number of taskprocessors in the message bus taskpool.</synopsis>
|
||||
</configOption>
|
||||
<configOption name="initial_size" default="5">
|
||||
<since>
|
||||
<version>24.0.0</version>
|
||||
</since>
|
||||
<synopsis>Initial number of taskprocessors in the message bus taskpool.</synopsis>
|
||||
</configOption>
|
||||
<configOption name="idle_timeout_sec" default="20">
|
||||
<since>
|
||||
<version>24.0.0</version>
|
||||
</since>
|
||||
<synopsis>Number of seconds before an idle taskprocessor is disposed of.</synopsis>
|
||||
</configOption>
|
||||
<configOption name="max_size" default="50">
|
||||
<since>
|
||||
<version>24.0.0</version>
|
||||
</since>
|
||||
<synopsis>Maximum number of taskprocessors in the taskpool.</synopsis>
|
||||
</configOption>
|
||||
</configObject>
|
||||
<configObject name="declined_message_types">
|
||||
<since>
|
||||
<version>13.0.0</version>
|
||||
@@ -330,8 +360,8 @@
|
||||
/*! The number of buckets to use for topic pools */
|
||||
#define TOPIC_POOL_BUCKETS 57
|
||||
|
||||
/*! Thread pool for topics that don't want a dedicated taskprocessor */
|
||||
static struct ast_threadpool *threadpool;
|
||||
/*! Taskpool for topics that don't want a dedicated taskprocessor */
|
||||
static struct ast_taskpool *taskpool;
|
||||
|
||||
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
|
||||
|
||||
@@ -692,8 +722,8 @@ struct stasis_subscription_statistics {
|
||||
int messages_passed;
|
||||
/*! \brief Using a mailbox to queue messages */
|
||||
int uses_mailbox;
|
||||
/*! \brief Using stasis threadpool for handling messages */
|
||||
int uses_threadpool;
|
||||
/*! \brief Using stasis taskpool for handling messages */
|
||||
int uses_taskpool;
|
||||
/*! \brief The line number where the subscription originates */
|
||||
int lineno;
|
||||
/*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
|
||||
@@ -846,7 +876,7 @@ static void subscription_statistics_destroy(void *obj)
|
||||
}
|
||||
|
||||
static struct stasis_subscription_statistics *stasis_subscription_statistics_create(struct stasis_subscription *sub,
|
||||
int needs_mailbox, int use_thread_pool, const char *file, int lineno,
|
||||
int needs_mailbox, int use_taskpool, const char *file, int lineno,
|
||||
const char *func)
|
||||
{
|
||||
struct stasis_subscription_statistics *statistics;
|
||||
@@ -871,7 +901,7 @@ static struct stasis_subscription_statistics *stasis_subscription_statistics_cre
|
||||
statistics->lineno = lineno;
|
||||
statistics->func = func;
|
||||
statistics->uses_mailbox = needs_mailbox;
|
||||
statistics->uses_threadpool = use_thread_pool;
|
||||
statistics->uses_taskpool = use_taskpool;
|
||||
strcpy(statistics->uniqueid, sub->uniqueid); /* SAFE */
|
||||
statistics->sub = sub;
|
||||
ao2_link(subscription_stats, statistics);
|
||||
@@ -885,7 +915,7 @@ struct stasis_subscription *internal_stasis_subscribe(
|
||||
stasis_subscription_cb callback,
|
||||
void *data,
|
||||
int needs_mailbox,
|
||||
int use_thread_pool,
|
||||
int use_taskpool,
|
||||
const char *file,
|
||||
int lineno,
|
||||
const char *func)
|
||||
@@ -905,7 +935,7 @@ struct stasis_subscription *internal_stasis_subscribe(
|
||||
|
||||
#ifdef AST_DEVMODE
|
||||
ret = ast_asprintf(&sub->uniqueid, "%s:%s-%d", file, stasis_topic_name(topic), ast_atomic_fetchadd_int(&topic->subscriber_id, +1));
|
||||
sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_thread_pool, file, lineno, func);
|
||||
sub->statistics = stasis_subscription_statistics_create(sub, needs_mailbox, use_taskpool, file, lineno, func);
|
||||
if (ret < 0 || !sub->statistics) {
|
||||
ao2_ref(sub, -1);
|
||||
return NULL;
|
||||
@@ -923,7 +953,7 @@ struct stasis_subscription *internal_stasis_subscribe(
|
||||
|
||||
/* Create name with seq number appended. */
|
||||
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
|
||||
use_thread_pool ? 'p' : 'm',
|
||||
use_taskpool ? 'p' : 'm',
|
||||
stasis_topic_name(topic));
|
||||
|
||||
/*
|
||||
@@ -931,8 +961,8 @@ struct stasis_subscription *internal_stasis_subscribe(
|
||||
* acceptable. For a large number of subscribers, a thread
|
||||
* pool should be used.
|
||||
*/
|
||||
if (use_thread_pool) {
|
||||
sub->mailbox = ast_threadpool_serializer(tps_name, threadpool);
|
||||
if (use_taskpool) {
|
||||
sub->mailbox = ast_taskpool_serializer(tps_name, taskpool);
|
||||
} else {
|
||||
sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
|
||||
}
|
||||
@@ -2209,19 +2239,21 @@ struct stasis_declined_config {
|
||||
struct ao2_container *declined;
|
||||
};
|
||||
|
||||
/*! \brief Threadpool configuration options */
|
||||
struct stasis_threadpool_conf {
|
||||
/*! Initial size of the thread pool */
|
||||
/*! \brief Taskpool configuration options */
|
||||
struct stasis_taskpool_conf {
|
||||
/*! Minimum size of the taskpool */
|
||||
int minimum_size;
|
||||
/*! Initial size of the taskpool */
|
||||
int initial_size;
|
||||
/*! Time, in seconds, before we expire a thread */
|
||||
/*! Time, in seconds, before we expire a taskprocessor */
|
||||
int idle_timeout_sec;
|
||||
/*! Maximum number of thread to allow */
|
||||
/*! Maximum number of taskprocessors to allow */
|
||||
int max_size;
|
||||
};
|
||||
|
||||
struct stasis_config {
|
||||
/*! Thread pool configuration options */
|
||||
struct stasis_threadpool_conf *threadpool_options;
|
||||
/*! Taskpool configuration options */
|
||||
struct stasis_taskpool_conf *taskpool_options;
|
||||
/*! Declined message types */
|
||||
struct stasis_declined_config *declined_message_types;
|
||||
};
|
||||
@@ -2229,12 +2261,20 @@ struct stasis_config {
|
||||
static struct aco_type threadpool_option = {
|
||||
.type = ACO_GLOBAL,
|
||||
.name = "threadpool",
|
||||
.item_offset = offsetof(struct stasis_config, threadpool_options),
|
||||
.item_offset = offsetof(struct stasis_config, taskpool_options),
|
||||
.category = "threadpool",
|
||||
.category_match = ACO_WHITELIST_EXACT,
|
||||
};
|
||||
|
||||
static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
|
||||
static struct aco_type taskpool_option = {
|
||||
.type = ACO_GLOBAL,
|
||||
.name = "taskpool",
|
||||
.item_offset = offsetof(struct stasis_config, taskpool_options),
|
||||
.category = "taskpool",
|
||||
.category_match = ACO_WHITELIST_EXACT,
|
||||
};
|
||||
|
||||
static struct aco_type *taskpool_options[] = ACO_TYPES(&threadpool_option, &taskpool_option);
|
||||
|
||||
/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
|
||||
static struct aco_type declined_option = {
|
||||
@@ -2249,7 +2289,7 @@ struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
|
||||
|
||||
struct aco_file stasis_conf = {
|
||||
.filename = "stasis.conf",
|
||||
.types = ACO_TYPES(&declined_option, &threadpool_option),
|
||||
.types = ACO_TYPES(&declined_option, &threadpool_option, &taskpool_option),
|
||||
};
|
||||
|
||||
/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
|
||||
@@ -2274,7 +2314,7 @@ static void stasis_config_destructor(void *obj)
|
||||
struct stasis_config *cfg = obj;
|
||||
|
||||
ao2_cleanup(cfg->declined_message_types);
|
||||
ast_free(cfg->threadpool_options);
|
||||
ast_free(cfg->taskpool_options);
|
||||
}
|
||||
|
||||
static void *stasis_config_alloc(void)
|
||||
@@ -2285,8 +2325,8 @@ static void *stasis_config_alloc(void)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
|
||||
if (!cfg->threadpool_options) {
|
||||
cfg->taskpool_options = ast_calloc(1, sizeof(*cfg->taskpool_options));
|
||||
if (!cfg->taskpool_options) {
|
||||
ao2_ref(cfg, -1);
|
||||
return NULL;
|
||||
}
|
||||
@@ -2677,7 +2717,7 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru
|
||||
ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped);
|
||||
ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed);
|
||||
ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No");
|
||||
ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No");
|
||||
ast_cli(a->fd, "Using stasis taskpool for handling messages: %s\n", statistics->uses_taskpool ? "Yes" : "No");
|
||||
ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked);
|
||||
ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked);
|
||||
|
||||
@@ -3077,8 +3117,8 @@ static void stasis_cleanup(void)
|
||||
ast_cli_unregister_multiple(cli_stasis, ARRAY_LEN(cli_stasis));
|
||||
ao2_cleanup(topic_all);
|
||||
topic_all = NULL;
|
||||
ast_threadpool_shutdown(threadpool);
|
||||
threadpool = NULL;
|
||||
ast_taskpool_shutdown(taskpool);
|
||||
taskpool = NULL;
|
||||
STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
|
||||
STASIS_MESSAGE_TYPE_CLEANUP(ast_multi_user_event_type);
|
||||
aco_info_destroy(&cfg_info);
|
||||
@@ -3089,7 +3129,7 @@ int stasis_init(void)
|
||||
{
|
||||
struct stasis_config *cfg;
|
||||
int cache_init;
|
||||
struct ast_threadpool_options threadpool_opts = { 0, };
|
||||
struct ast_taskpool_options taskpool_opts = { 0, };
|
||||
#ifdef AST_DEVMODE
|
||||
struct ao2_container *subscription_stats;
|
||||
struct ao2_container *topic_stats;
|
||||
@@ -3104,17 +3144,21 @@ int stasis_init(void)
|
||||
|
||||
aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
|
||||
declined_options, "", declined_handler, 0);
|
||||
aco_option_register(&cfg_info, "minimum_size", ACO_EXACT,
|
||||
taskpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
|
||||
FLDSET(struct stasis_taskpool_conf, minimum_size), 0,
|
||||
INT_MAX);
|
||||
aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
|
||||
threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
|
||||
FLDSET(struct stasis_threadpool_conf, initial_size), 0,
|
||||
taskpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
|
||||
FLDSET(struct stasis_taskpool_conf, initial_size), 0,
|
||||
INT_MAX);
|
||||
aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
|
||||
threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
|
||||
FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
|
||||
taskpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
|
||||
FLDSET(struct stasis_taskpool_conf, idle_timeout_sec), 0,
|
||||
INT_MAX);
|
||||
aco_option_register(&cfg_info, "max_size", ACO_EXACT,
|
||||
threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
|
||||
FLDSET(struct stasis_threadpool_conf, max_size), 0,
|
||||
taskpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
|
||||
FLDSET(struct stasis_taskpool_conf, max_size), 0,
|
||||
INT_MAX);
|
||||
|
||||
if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
|
||||
@@ -3124,7 +3168,7 @@ int stasis_init(void)
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
|
||||
if (aco_set_defaults(&taskpool_option, "taskpool", default_cfg->taskpool_options)) {
|
||||
ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
|
||||
ao2_ref(default_cfg, -1);
|
||||
|
||||
@@ -3150,15 +3194,16 @@ int stasis_init(void)
|
||||
}
|
||||
}
|
||||
|
||||
threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
|
||||
threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
|
||||
threadpool_opts.auto_increment = 1;
|
||||
threadpool_opts.max_size = cfg->threadpool_options->max_size;
|
||||
threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
|
||||
threadpool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
|
||||
taskpool_opts.version = AST_TASKPOOL_OPTIONS_VERSION;
|
||||
taskpool_opts.minimum_size = cfg->taskpool_options->minimum_size;
|
||||
taskpool_opts.initial_size = cfg->taskpool_options->initial_size;
|
||||
taskpool_opts.auto_increment = 1;
|
||||
taskpool_opts.max_size = cfg->taskpool_options->max_size;
|
||||
taskpool_opts.idle_timeout = cfg->taskpool_options->idle_timeout_sec;
|
||||
taskpool = ast_taskpool_create("stasis", &taskpool_opts);
|
||||
ao2_ref(cfg, -1);
|
||||
if (!threadpool) {
|
||||
ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
|
||||
if (!taskpool) {
|
||||
ast_log(LOG_ERROR, "Failed to create 'stasis-core' taskpool\n");
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
Reference in New Issue
Block a user