mwi: Update the MWI core to use stasis_state API

** Note **

This patch is meant to be the minimum needed in order for the MWI core to use
the now underlying stasis_state module. As such it does not completely remove
its reliance on the stasis_cache. Doing so has allowed current consumers to
not have to change, and update those code paths for this patch. When time
allows, subsequent patches can/will be made to those consumers to take advantage
of some of the new MWI API included here. Thus, eventually and ultimately
removing MWI dependency on the stasis_cache.

** End Note **

This patch makes it so the MWI core now takes advantage of the new stasis_state
API. Consumers of MWI should no longer need to depend upon stasis topic pooling,
and the stasis cache directly. Similar functionality and implementation details
have now been pushed into the stasis_state module. However, all MWI state should
be accessed via the MWI API itself.

As such a few new methods, and constructs have been added to the MWI core that
facilitate consumer publishing, subscribing, and iterating over MWI state data.

* ast_mwi_subscriber *

Created via ast_mwi_add_subscriber, a subscriber subscribes to a given mailbox
in order to receive updates about the given mailbox. Adding a subscriber will
create the underlying topic, and associated state data if those do not already
exist for it. The topic, and last known state data is guaranteed to exist for
the lifetime of the subscriber.

* ast_mwi_publisher *

Before publishing to a particular topic a publisher should be created. This can
be achieved by using ast_mwi_add_publisher. Publishing to a mailbox should then
be done using one of the MWI publish functions. This ensures the message is
published to the appropriate topic, and the last known state is maintained.

* ast_mwi_observer *

Add an observer in order to watch for particular MWI module related events. For
instance if a submodule needs to know when a subscription is added to any
mailbox an observer can be added to watch for that.

* other *

Urgent message count is now part of the published MWI state object. Also state
can be iterated over using defined callbacks.

ASTERISK-28442

Change-Id: I93f935f9090cd5ddff6d4bc80ff90703c05cf776
This commit is contained in:
Kevin Harwell
2019-06-11 14:12:12 -05:00
parent 83c6ebbae8
commit b31ac83900
3 changed files with 965 additions and 52 deletions

View File

@@ -22,16 +22,16 @@
#include "asterisk.h"
#include "asterisk/app.h"
#include "asterisk/mwi.h"
#include "asterisk/stasis_channels.h"
/*
* @{ \brief Define \ref stasis topic objects
*/
static struct stasis_topic *mwi_topic_all;
static struct stasis_state_manager *mwi_state_manager;
static struct stasis_cache *mwi_state_cache;
static struct stasis_caching_topic *mwi_topic_cached;
static struct stasis_topic_pool *mwi_topic_pool;
/* @} */
/*! \brief Convert a MWI \ref stasis_message to a \ref ast_event */
@@ -84,7 +84,7 @@ static void mwi_state_dtor(void *obj)
struct stasis_topic *ast_mwi_topic_all(void)
{
return mwi_topic_all;
return stasis_state_all_topic(mwi_state_manager);
}
struct stasis_cache *ast_mwi_state_cache(void)
@@ -99,10 +99,11 @@ struct stasis_topic *ast_mwi_topic_cached(void)
struct stasis_topic *ast_mwi_topic(const char *uniqueid)
{
return stasis_topic_pool_get_topic(mwi_topic_pool, uniqueid);
return stasis_state_topic(mwi_state_manager, uniqueid);
}
struct ast_mwi_state *ast_mwi_create(const char *mailbox, const char *context)
static struct ast_mwi_state *mwi_create_state(const char *mailbox, const char *context,
int urgent_msgs, int new_msgs, int old_msgs)
{
struct ast_mwi_state *mwi_state;
@@ -110,10 +111,14 @@ struct ast_mwi_state *ast_mwi_create(const char *mailbox, const char *context)
mwi_state = ao2_alloc(sizeof(*mwi_state), mwi_state_dtor);
if (!mwi_state) {
ast_log(LOG_ERROR, "Unable to create MWI state for mailbox '%s@%s'\n",
mailbox, ast_strlen_zero(context) ? "" : context);
return NULL;
}
if (ast_string_field_init(mwi_state, 256)) {
ast_log(LOG_ERROR, "Unable to initialize MWI state for mailbox '%s@%s'\n",
mailbox, ast_strlen_zero(context) ? "" : context);
ao2_ref(mwi_state, -1);
return NULL;
}
@@ -123,9 +128,28 @@ struct ast_mwi_state *ast_mwi_create(const char *mailbox, const char *context)
ast_string_field_set(mwi_state, uniqueid, mailbox);
}
mwi_state->urgent_msgs = urgent_msgs;
mwi_state->new_msgs = new_msgs;
mwi_state->old_msgs = old_msgs;
return mwi_state;
}
static struct ast_mwi_state *mwi_retrieve_then_create_state(const char *mailbox)
{
int urgent_msgs;
int new_msgs;
int old_msgs;
ast_app_inboxcount2(mailbox, &urgent_msgs, &new_msgs, &old_msgs);
return mwi_create_state(mailbox, NULL, urgent_msgs, new_msgs, old_msgs);
}
struct ast_mwi_state *ast_mwi_create(const char *mailbox, const char *context)
{
return mwi_create_state(mailbox, context, 0, 0, 0);
}
/*!
* \internal
* \brief Create a MWI state snapshot message.
@@ -145,6 +169,7 @@ struct ast_mwi_state *ast_mwi_create(const char *mailbox, const char *context)
static struct stasis_message *mwi_state_create_message(
const char *mailbox,
const char *context,
int urgent_msgs,
int new_msgs,
int old_msgs,
const char *channel_id,
@@ -157,14 +182,11 @@ static struct stasis_message *mwi_state_create_message(
return NULL;
}
mwi_state = ast_mwi_create(mailbox, context);
mwi_state = mwi_create_state(mailbox, context, urgent_msgs, new_msgs, old_msgs);
if (!mwi_state) {
return NULL;
}
mwi_state->new_msgs = new_msgs;
mwi_state->old_msgs = old_msgs;
if (!ast_strlen_zero(channel_id)) {
mwi_state->snapshot = ast_channel_snapshot_get_latest(channel_id);
}
@@ -186,6 +208,183 @@ static struct stasis_message *mwi_state_create_message(
return message;
}
/*!
* \internal
*
* This object currently acts as a typedef, but can also be thought of as a "child" object
* of the stasis_state_subscriber type. As such the "base" pointer should always be the
* first object attribute. Doing so allows this object to be easily type cast and used by
* the stasis_state code.
*/
struct ast_mwi_subscriber {
/*! The "base" state subscriber. (Must be first object attribute) */
struct stasis_state_subscriber *base;
};
struct ast_mwi_subscriber *ast_mwi_add_subscriber(const char *mailbox)
{
return (struct ast_mwi_subscriber *)stasis_state_add_subscriber(
mwi_state_manager, mailbox);
}
struct ast_mwi_subscriber *ast_mwi_subscribe_pool(const char *mailbox,
stasis_subscription_cb callback, void *data)
{
struct stasis_subscription *stasis_sub;
struct ast_mwi_subscriber *sub = (struct ast_mwi_subscriber *)stasis_state_subscribe_pool(
mwi_state_manager, mailbox, callback, data);
if (!sub) {
return NULL;
}
stasis_sub = ast_mwi_subscriber_subscription(sub);
stasis_subscription_accept_message_type(stasis_sub, ast_mwi_state_type());
stasis_subscription_set_filter(stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
return sub;
}
void *ast_mwi_unsubscribe(struct ast_mwi_subscriber *sub)
{
return stasis_state_unsubscribe((struct stasis_state_subscriber *)sub);
}
void *ast_mwi_unsubscribe_and_join(struct ast_mwi_subscriber *sub)
{
return stasis_state_unsubscribe_and_join((struct stasis_state_subscriber *)sub);
}
struct stasis_topic *ast_mwi_subscriber_topic(struct ast_mwi_subscriber *sub)
{
return stasis_state_subscriber_topic((struct stasis_state_subscriber *)sub);
}
struct ast_mwi_state *ast_mwi_subscriber_data(struct ast_mwi_subscriber *sub)
{
struct stasis_state_subscriber *s = (struct stasis_state_subscriber *)sub;
struct ast_mwi_state *mwi_state = stasis_state_subscriber_data(s);
return mwi_state ?: mwi_retrieve_then_create_state(stasis_state_subscriber_id(s));
}
struct stasis_subscription *ast_mwi_subscriber_subscription(struct ast_mwi_subscriber *sub)
{
return stasis_state_subscriber_subscription((struct stasis_state_subscriber *)sub);
}
/*!
* \internal
*
* This object currently acts as a typedef, but can also be thought of as a "child" object
* of the stasis_state_publisher type. As such the "base" pointer should always be the
* first object attribute. Doing so allows this object to be easily type cast and used by
* the stasis_state code.
*/
struct ast_mwi_publisher {
/*! The "base" state publisher. (Must be first object attribute) */
struct stasis_state_publisher *base;
};
struct ast_mwi_publisher *ast_mwi_add_publisher(const char *mailbox)
{
return (struct ast_mwi_publisher *)stasis_state_add_publisher(
mwi_state_manager, mailbox);
}
int ast_mwi_add_observer(struct ast_mwi_observer *observer)
{
return stasis_state_add_observer(mwi_state_manager,
(struct stasis_state_observer *)observer);
}
void ast_mwi_remove_observer(struct ast_mwi_observer *observer)
{
stasis_state_remove_observer(mwi_state_manager,
(struct stasis_state_observer *)observer);
}
struct mwi_handler_data {
on_mwi_state handler;
void *data;
};
static int handle_mwi_state(const char *id, struct stasis_message *msg, void *user_data)
{
struct mwi_handler_data *d = user_data;
struct ast_mwi_state *mwi_state = stasis_message_data(msg);
int res;
if (mwi_state) {
return d->handler(mwi_state, d->data);
}
mwi_state = mwi_create_state(id, NULL, 0, 0, 0);
if (!mwi_state) {
return 0;
}
res = d->handler(mwi_state, d->data);
ao2_ref(mwi_state, -1);
return res;
}
void ast_mwi_state_callback_all(on_mwi_state handler, void *data)
{
struct mwi_handler_data d = {
.handler = handler,
.data = data
};
stasis_state_callback_all(mwi_state_manager, handle_mwi_state, &d);
}
void ast_mwi_state_callback_subscribed(on_mwi_state handler, void *data)
{
struct mwi_handler_data d = {
.handler = handler,
.data = data
};
stasis_state_callback_subscribed(mwi_state_manager, handle_mwi_state, &d);
}
int ast_mwi_publish(struct ast_mwi_publisher *pub, int urgent_msgs,
int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
{
struct stasis_state_publisher *p = (struct stasis_state_publisher *)pub;
struct stasis_message *msg = mwi_state_create_message(stasis_state_publisher_id(p),
NULL, urgent_msgs, new_msgs, old_msgs, channel_id, eid);
if (!msg) {
return -1;
}
stasis_state_publish(p, msg);
ao2_ref(msg, -1);
return 0;
}
int ast_mwi_publish_by_mailbox(const char *mailbox, const char *context, int urgent_msgs,
int new_msgs, int old_msgs, const char *channel_id, struct ast_eid *eid)
{
struct ast_mwi_state *mwi_state;
struct stasis_message *msg = mwi_state_create_message(
mailbox, context, urgent_msgs, new_msgs, old_msgs, channel_id, eid);
if (!msg) {
return -1;
}
mwi_state = stasis_message_data(msg);
stasis_state_publish_by_id(mwi_state_manager, mwi_state->uniqueid, NULL, msg);
ao2_ref(msg, -1);
return 0;
}
int ast_publish_mwi_state_full(
const char *mailbox,
const char *context,
@@ -194,24 +393,7 @@ int ast_publish_mwi_state_full(
const char *channel_id,
struct ast_eid *eid)
{
struct ast_mwi_state *mwi_state;
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
struct stasis_topic *mailbox_specific_topic;
message = mwi_state_create_message(mailbox, context, new_msgs, old_msgs, channel_id, eid);
if (!message) {
return -1;
}
mwi_state = stasis_message_data(message);
mailbox_specific_topic = ast_mwi_topic(mwi_state->uniqueid);
if (!mailbox_specific_topic) {
return -1;
}
stasis_publish(mailbox_specific_topic, message);
return 0;
return ast_mwi_publish_by_mailbox(mailbox, context, 0, new_msgs, old_msgs, channel_id, eid);
}
int ast_delete_mwi_state_full(const char *mailbox, const char *context, struct ast_eid *eid)
@@ -220,9 +402,8 @@ int ast_delete_mwi_state_full(const char *mailbox, const char *context, struct a
struct stasis_message *cached_msg;
struct stasis_message *clear_msg;
struct ast_mwi_state *mwi_state;
struct stasis_topic *mailbox_specific_topic;
msg = mwi_state_create_message(mailbox, context, 0, 0, NULL, eid);
msg = mwi_state_create_message(mailbox, context, 0, 0, 0, NULL, eid);
if (!msg) {
return -1;
}
@@ -244,22 +425,16 @@ int ast_delete_mwi_state_full(const char *mailbox, const char *context, struct a
cached_msg = stasis_cache_get_by_eid(ast_mwi_state_cache(),
ast_mwi_state_type(), mwi_state->uniqueid, &ast_eid_default);
if (!cached_msg) {
/* Nothing to clear */
/* Nothing to clear from the cache, but still need to remove state */
stasis_state_remove_publish_by_id(mwi_state_manager, mwi_state->uniqueid, eid, NULL);
return -1;
}
ao2_cleanup(cached_msg);
mailbox_specific_topic = ast_mwi_topic(mwi_state->uniqueid);
if (!mailbox_specific_topic) {
return -1;
}
clear_msg = stasis_cache_clear_create(msg);
if (clear_msg) {
stasis_publish(mailbox_specific_topic, clear_msg);
}
stasis_state_remove_publish_by_id(mwi_state_manager, mwi_state->uniqueid, eid, clear_msg);
ao2_cleanup(clear_msg);
return 0;
}
@@ -315,13 +490,11 @@ struct stasis_message *ast_mwi_blob_create(struct ast_mwi_state *mwi_state,
static void mwi_cleanup(void)
{
ao2_cleanup(mwi_topic_pool);
mwi_topic_pool = NULL;
ao2_cleanup(mwi_topic_all);
mwi_topic_all = NULL;
ao2_cleanup(mwi_state_cache);
mwi_state_cache = NULL;
mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached);
ao2_cleanup(mwi_state_manager);
mwi_state_manager = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(ast_mwi_state_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_mwi_vm_app_type);
}
@@ -338,8 +511,8 @@ int mwi_init(void)
return -1;
}
mwi_topic_all = stasis_topic_create("mwi:all");
if (!mwi_topic_all) {
mwi_state_manager = stasis_state_manager_create("mwi:all");
if (!mwi_state_manager) {
return -1;
}
@@ -348,15 +521,10 @@ int mwi_init(void)
return -1;
}
mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_cache);
mwi_topic_cached = stasis_caching_topic_create(ast_mwi_topic_all(), mwi_state_cache);
if (!mwi_topic_cached) {
return -1;
}
mwi_topic_pool = stasis_topic_pool_create(mwi_topic_all);
if (!mwi_topic_pool) {
return -1;
}
return 0;
}