stasis / manager / ari: Better filter messages.

Previously both AMI and ARI used a default route on
their stasis message router to handle some of the
messages for publishing out their respective
connection. This caused messages to be given to
their subscription that could not be formatted
into AMI or JSON.

This change adds an API call to the stasis message
router which allows a default route to be set as well
as formatters that the default route is expecting.
This allows both AMI and ARI to specify that their
default route only wants messages of their given
formatter. By doing so stasis can more intelligently
filter at publishing time so that they do not receive
messages which will not be turned into AMI or JSON.

ASTERISK-28244

Change-Id: I65272819a53ce99f869181d1d370da559a7d1703
This commit is contained in:
Joshua C. Colp
2019-01-10 15:34:32 -04:00
parent 2b8602e8cf
commit 1323730f6c
5 changed files with 68 additions and 16 deletions

View File

@@ -317,16 +317,25 @@ static void call_forwarded_handler(struct stasis_app *app, struct stasis_message
ast_channel_unref(chan);
}
static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_app *app = data;
if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(app);
}
}
static void sub_default_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_app *app = data;
struct ast_json *json;
if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(app);
}
/* The dial type can be converted to JSON so it will always be passed
* here.
*/
if (stasis_message_type(message) == ast_channel_dial_type()) {
call_forwarded_handler(app, message);
}
@@ -803,7 +812,7 @@ static void bridge_attended_transfer_handler(void *data, struct stasis_subscript
}
}
static void bridge_default_handler(void *data, struct stasis_subscription *sub,
static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_app *app = data;
@@ -930,8 +939,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
res |= stasis_message_router_add(app->bridge_router,
ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
res |= stasis_message_router_set_default(app->bridge_router,
bridge_default_handler, app);
res |= stasis_message_router_add(app->bridge_router,
stasis_subscription_change_type(), bridge_subscription_change_handler, app);
if (res != 0) {
return NULL;
@@ -953,8 +962,11 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
res |= stasis_message_router_add_cache_update(app->router,
ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
res |= stasis_message_router_set_default(app->router,
sub_default_handler, app);
res |= stasis_message_router_add(app->router,
stasis_subscription_change_type(), sub_subscription_change_handler, app);
stasis_message_router_set_formatters_default(app->router,
sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON);
if (res != 0) {
return NULL;