mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-04 20:04:50 +00:00
Transition MWI to Stasis-core
Remove MWI's dependency on the event system by moving it to Stasis-core. This also introduces forwarding topic pools in Stasis-core which aggregate many dynamically allocated topics into a single primary topic. Review: https://reviewboard.asterisk.org/r/2368/ (closes issue ASTERISK-21097) Patch-by: Kinsey Moore git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@383284 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
@@ -41,6 +41,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
|
||||
/*! Initial size of the subscribers list. */
|
||||
#define INITIAL_SUBSCRIBERS_MAX 4
|
||||
|
||||
/*! The number of buckets to use for topic pools */
|
||||
#define TOPIC_POOL_BUCKETS 57
|
||||
|
||||
/*! Threadpool for dispatching notifications to subscribers */
|
||||
static struct ast_threadpool *pool;
|
||||
|
||||
@@ -470,6 +473,96 @@ static void send_subscription_change_message(struct stasis_topic *topic, char *u
|
||||
stasis_publish(topic, msg);
|
||||
}
|
||||
|
||||
struct topic_pool_entry {
|
||||
struct stasis_subscription *forward;
|
||||
struct stasis_topic *topic;
|
||||
};
|
||||
|
||||
static void topic_pool_entry_dtor(void *obj)
|
||||
{
|
||||
struct topic_pool_entry *entry = obj;
|
||||
entry->forward = stasis_unsubscribe(entry->forward);
|
||||
ao2_cleanup(entry->topic);
|
||||
entry->topic = NULL;
|
||||
}
|
||||
|
||||
static struct topic_pool_entry *topic_pool_entry_alloc(void)
|
||||
{
|
||||
return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
|
||||
}
|
||||
|
||||
struct stasis_topic_pool {
|
||||
struct ao2_container *pool_container;
|
||||
struct stasis_topic *pool_topic;
|
||||
};
|
||||
|
||||
static void topic_pool_dtor(void *obj)
|
||||
{
|
||||
struct stasis_topic_pool *pool = obj;
|
||||
ao2_cleanup(pool->pool_container);
|
||||
pool->pool_container = NULL;
|
||||
ao2_cleanup(pool->pool_topic);
|
||||
pool->pool_topic = NULL;
|
||||
}
|
||||
|
||||
static int topic_pool_entry_hash(const void *obj, const int flags)
|
||||
{
|
||||
const char *topic_name= (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
|
||||
return ast_str_case_hash(topic_name);
|
||||
}
|
||||
|
||||
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
|
||||
{
|
||||
struct topic_pool_entry *opt1 = obj, *opt2 = arg;
|
||||
const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
|
||||
return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
|
||||
}
|
||||
|
||||
struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
|
||||
{
|
||||
RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
|
||||
if (!pool) {
|
||||
return NULL;
|
||||
}
|
||||
pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
|
||||
ao2_ref(pooled_topic, +1);
|
||||
pool->pool_topic = pooled_topic;
|
||||
|
||||
ao2_ref(pool, +1);
|
||||
return pool;
|
||||
}
|
||||
|
||||
struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
|
||||
{
|
||||
RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
|
||||
SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
|
||||
topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
|
||||
|
||||
if (topic_pool_entry) {
|
||||
return topic_pool_entry->topic;
|
||||
}
|
||||
|
||||
topic_pool_entry = topic_pool_entry_alloc();
|
||||
|
||||
if (!topic_pool_entry) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
topic_pool_entry->topic = stasis_topic_create(topic_name);
|
||||
if (!topic_pool_entry->topic) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
|
||||
if (!topic_pool_entry->forward) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
|
||||
|
||||
return topic_pool_entry->topic;
|
||||
}
|
||||
|
||||
/*! \brief Cleanup function */
|
||||
static void stasis_exit(void)
|
||||
{
|
||||
|
Reference in New Issue
Block a user