ARI Outbound Websockets

Asterisk can now establish websocket sessions _to_ your ARI applications
as well as accepting websocket sessions _from_ them.
Full details: http://s.asterisk.net/ari-outbound-ws

Code change summary:
* Added an ast_vector_string_join() function,
* Added ApplicationRegistered and ApplicationUnregistered ARI events.
* Converted res/ari/config.c to use sorcery to process ari.conf.
* Added the "outbound-websocket" ARI config object.
* Refactored res/ari/ari_websockets.c to handle outbound websockets.
* Refactored res/ari/cli.c for the sorcery changeover.
* Updated res/res_stasis.c for the sorcery changeover.
* Updated apps/app_stasis.c to allow initiating per-call outbound websockets.
* Added CLI commands to manage ARI websockets.
* Added the new "outbound-websocket" object to ari.conf.sample.
* Moved the ARI XML documentation out of res_ari.c into res/ari/ari_doc.xml

UserNote: Asterisk can now establish websocket sessions _to_ your ARI applications
as well as accepting websocket sessions _from_ them.
Full details: http://s.asterisk.net/ari-outbound-ws

(cherry picked from commit 1c0d552155)
This commit is contained in:
George Joseph
2025-03-28 06:54:21 -06:00
parent 1aea2d50ae
commit d9c6ab1c99
15 changed files with 2948 additions and 963 deletions

View File

@@ -27,11 +27,13 @@
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/stasis_app.h"
#include "asterisk/uuid.h"
#include "internal.h"
#include "ari_websockets.h"
static char *ari_show(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
RAII_VAR(struct ast_ari_conf *, conf, NULL, ao2_cleanup);
RAII_VAR(struct ari_conf_general *, general, NULL, ao2_cleanup);
switch (cmd) {
case CLI_INIT:
@@ -50,43 +52,42 @@ static char *ari_show(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
return CLI_SHOWUSAGE;
}
conf = ast_ari_config_get();
general = ari_conf_get_general();
if (!conf) {
if (!general) {
ast_cli(a->fd, "Error getting ARI configuration\n");
return CLI_FAILURE;
}
ast_cli(a->fd, "ARI Status:\n");
ast_cli(a->fd, "Enabled: %s\n", AST_CLI_YESNO(conf->general->enabled));
ast_cli(a->fd, "Enabled: %s\n", AST_CLI_YESNO(general->enabled));
ast_cli(a->fd, "Output format: ");
if (conf->general->format & AST_JSON_PRETTY) {
if (general->format & AST_JSON_PRETTY) {
ast_cli(a->fd, "pretty");
} else {
ast_cli(a->fd, "compact");
}
ast_cli(a->fd, "\n");
ast_cli(a->fd, "Auth realm: %s\n", conf->general->auth_realm);
ast_cli(a->fd, "Allowed Origins: %s\n", conf->general->allowed_origins);
ast_cli(a->fd, "User count: %d\n", ao2_container_count(conf->users));
ast_cli(a->fd, "Auth realm: %s\n", general->auth_realm);
ast_cli(a->fd, "Allowed Origins: %s\n", general->allowed_origins);
return CLI_SUCCESS;
}
static int show_users_cb(void *obj, void *arg, int flags)
{
struct ast_ari_conf_user *user = obj;
struct ari_conf_user *user = obj;
struct ast_cli_args *a = arg;
ast_cli(a->fd, "%-4s %s\n",
AST_CLI_YESNO(user->read_only),
user->username);
ast_sorcery_object_get_id(user));
return 0;
}
static char *ari_show_users(struct ast_cli_entry *e, int cmd,
struct ast_cli_args *a)
{
RAII_VAR(struct ast_ari_conf *, conf, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, users, NULL, ao2_cleanup);
switch (cmd) {
case CLI_INIT:
@@ -105,8 +106,8 @@ static char *ari_show_users(struct ast_cli_entry *e, int cmd,
return CLI_SHOWUSAGE;
}
conf = ast_ari_config_get();
if (!conf) {
users = ari_conf_get_users();
if (!users) {
ast_cli(a->fd, "Error getting ARI configuration\n");
return CLI_FAILURE;
}
@@ -114,63 +115,37 @@ static char *ari_show_users(struct ast_cli_entry *e, int cmd,
ast_cli(a->fd, "r/o? Username\n");
ast_cli(a->fd, "---- --------\n");
ao2_callback(conf->users, OBJ_NODATA, show_users_cb, a);
ao2_callback(users, OBJ_NODATA, show_users_cb, a);
return CLI_SUCCESS;
}
struct user_complete {
/*! Nth user to search for */
int state;
/*! Which user currently on */
int which;
};
static int complete_ari_user_search(void *obj, void *arg, void *data, int flags)
static void complete_sorcery_object(struct ao2_container *container,
const char *word)
{
struct user_complete *search = data;
size_t wordlen = strlen(word);
void *object;
struct ao2_iterator i = ao2_iterator_init(container, 0);
if (++search->which > search->state) {
return CMP_MATCH;
while ((object = ao2_iterator_next(&i))) {
const char *id = ast_sorcery_object_get_id(object);
if (!strncasecmp(word, id, wordlen)) {
ast_cli_completion_add(ast_strdup(id));
}
ao2_ref(object, -1);
}
return 0;
}
static char *complete_ari_user(struct ast_cli_args *a)
{
RAII_VAR(struct ast_ari_conf *, conf, NULL, ao2_cleanup);
RAII_VAR(struct ast_ari_conf_user *, user, NULL, ao2_cleanup);
struct user_complete search = {
.state = a->n,
};
conf = ast_ari_config_get();
if (!conf) {
ast_cli(a->fd, "Error getting ARI configuration\n");
return CLI_FAILURE;
}
user = ao2_callback_data(conf->users,
ast_strlen_zero(a->word) ? 0 : OBJ_PARTIAL_KEY,
complete_ari_user_search, (char*)a->word, &search);
return user ? ast_strdup(user->username) : NULL;
}
static char *complete_ari_show_user(struct ast_cli_args *a)
{
if (a->pos == 3) {
return complete_ari_user(a);
}
return NULL;
ao2_iterator_destroy(&i);
}
static char *ari_show_user(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
RAII_VAR(struct ast_ari_conf *, conf, NULL, ao2_cleanup);
RAII_VAR(struct ast_ari_conf_user *, user, NULL, ao2_cleanup);
RAII_VAR(struct ari_conf_user *, user, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, users, ari_conf_get_users(), ao2_cleanup);
if (!users) {
ast_cli(a->fd, "Error getting ARI configuration\n");
return CLI_FAILURE;
}
switch (cmd) {
case CLI_INIT:
@@ -180,7 +155,8 @@ static char *ari_show_user(struct ast_cli_entry *e, int cmd, struct ast_cli_args
" Shows a specific ARI user\n";
return NULL;
case CLI_GENERATE:
return complete_ari_show_user(a);
complete_sorcery_object(users, a->word);
return NULL;
default:
break;
}
@@ -189,20 +165,13 @@ static char *ari_show_user(struct ast_cli_entry *e, int cmd, struct ast_cli_args
return CLI_SHOWUSAGE;
}
conf = ast_ari_config_get();
if (!conf) {
ast_cli(a->fd, "Error getting ARI configuration\n");
return CLI_FAILURE;
}
user = ao2_find(conf->users, a->argv[3], OBJ_KEY);
user = ari_conf_get_user(a->argv[3]);
if (!user) {
ast_cli(a->fd, "User '%s' not found\n", a->argv[3]);
return CLI_SUCCESS;
}
ast_cli(a->fd, "Username: %s\n", user->username);
ast_cli(a->fd, "Username: %s\n", ast_sorcery_object_get_id(user));
ast_cli(a->fd, "Read only?: %s\n", AST_CLI_YESNO(user->read_only));
return CLI_SUCCESS;
@@ -281,7 +250,7 @@ static char *ari_show_apps(struct ast_cli_entry *e, int cmd, struct ast_cli_args
ast_cli(a->fd, "=========================\n");
it_apps = ao2_iterator_init(apps, 0);
while ((app = ao2_iterator_next(&it_apps))) {
ast_cli(a->fd, "%-25.25s\n", app);
ast_cli(a->fd, "%s\n", app);
ao2_ref(app, -1);
}
@@ -291,55 +260,31 @@ static char *ari_show_apps(struct ast_cli_entry *e, int cmd, struct ast_cli_args
return CLI_SUCCESS;
}
struct app_complete {
/*! Nth app to search for */
int state;
/*! Which app currently on */
int which;
};
static int complete_ari_app_search(void *obj, void *arg, void *data, int flags)
static void complete_app(struct ao2_container *container,
const char *word)
{
struct app_complete *search = data;
size_t wordlen = strlen(word);
void *object;
struct ao2_iterator i = ao2_iterator_init(container, 0);
if (++search->which > search->state) {
return CMP_MATCH;
while ((object = ao2_iterator_next(&i))) {
if (!strncasecmp(word, object, wordlen)) {
ast_cli_completion_add(ast_strdup(object));
}
ao2_ref(object, -1);
}
return 0;
}
static char *complete_ari_app(struct ast_cli_args *a, int include_all)
{
RAII_VAR(struct ao2_container *, apps, stasis_app_get_all(), ao2_cleanup);
RAII_VAR(char *, app, NULL, ao2_cleanup);
struct app_complete search = {
.state = a->n,
};
if (a->pos != 3) {
return NULL;
}
if (!apps) {
ast_cli(a->fd, "Error getting ARI applications\n");
return CLI_FAILURE;
}
if (include_all && ast_strlen_zero(a->word)) {
ast_str_container_add(apps, " all");
}
app = ao2_callback_data(apps,
ast_strlen_zero(a->word) ? 0 : OBJ_SEARCH_PARTIAL_KEY,
complete_ari_app_search, (char*)a->word, &search);
return app ? ast_strdup(app) : NULL;
ao2_iterator_destroy(&i);
}
static char *ari_show_app(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
void *app;
RAII_VAR(struct ao2_container *, apps, stasis_app_get_all(), ao2_cleanup);
if (!apps) {
ast_cli(a->fd, "Error getting ARI applications\n");
return CLI_FAILURE;
}
switch (cmd) {
case CLI_INIT:
@@ -350,7 +295,8 @@ static char *ari_show_app(struct ast_cli_entry *e, int cmd, struct ast_cli_args
;
return NULL;
case CLI_GENERATE:
return complete_ari_app(a, 0);
complete_app(apps, a->word);
return NULL;
default:
break;
}
@@ -373,9 +319,15 @@ static char *ari_show_app(struct ast_cli_entry *e, int cmd, struct ast_cli_args
static char *ari_set_debug(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
RAII_VAR(struct ao2_container *, apps, stasis_app_get_all(), ao2_cleanup);
void *app;
int debug;
if (!apps) {
ast_cli(a->fd, "Error getting ARI applications\n");
return CLI_FAILURE;
}
switch (cmd) {
case CLI_INIT:
e->command = "ari set debug";
@@ -385,7 +337,14 @@ static char *ari_set_debug(struct ast_cli_entry *e, int cmd, struct ast_cli_args
;
return NULL;
case CLI_GENERATE:
return complete_ari_app(a, 1);
if (a->argc == 3) {
ast_cli_completion_add(ast_strdup("all"));
complete_app(apps, a->word);
} else if (a->argc == 4) {
ast_cli_completion_add(ast_strdup("on"));
ast_cli_completion_add(ast_strdup("off"));
}
return NULL;
default:
break;
}
@@ -418,6 +377,309 @@ static char *ari_set_debug(struct ast_cli_entry *e, int cmd, struct ast_cli_args
return CLI_SUCCESS;
}
static int show_owc_cb(void *obj, void *arg, int flags)
{
struct ari_conf_outbound_websocket *owc = obj;
const char *id = ast_sorcery_object_get_id(owc);
enum ari_conf_owc_fields invalid_fields = ari_conf_owc_get_invalid_fields(id);
struct ast_cli_args *a = arg;
ast_cli(a->fd, "%-32s %-15s %-32s %-7s %s\n",
id,
ari_websocket_type_to_str(owc->websocket_client->connection_type),
owc->apps,
invalid_fields == ARI_OWC_FIELD_NONE ? "valid" : "INVALID",
owc->websocket_client->uri);
return 0;
}
#define DASHES "----------------------------------------------------------------------"
static char *ari_show_owcs(struct ast_cli_entry *e, int cmd,
struct ast_cli_args *a)
{
RAII_VAR(struct ao2_container *, owcs, NULL, ao2_cleanup);
switch (cmd) {
case CLI_INIT:
e->command = "ari show outbound-websockets";
e->usage =
"Usage: ari show outbound-websockets\n"
" Shows all ARI outbound-websockets\n";
return NULL;
case CLI_GENERATE:
return NULL;
default:
break;
}
if (a->argc != 3) {
return CLI_SHOWUSAGE;
}
owcs = ari_conf_get_owcs();
if (!owcs) {
ast_cli(a->fd, "Error getting ARI configuration\n");
return CLI_FAILURE;
}
ast_cli(a->fd, "%-32s %-15s %-32s %-7s %s\n", "Name", "Type", "Apps", "Status", "URI");
ast_cli(a->fd, "%.*s %.*s %.*s %.*s %.*s\n", 32, DASHES, 15, DASHES, 32, DASHES, 7, DASHES, 64, DASHES);
ao2_callback(owcs, OBJ_NODATA, show_owc_cb, a);
return CLI_SUCCESS;
}
static char *ari_show_owc(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
RAII_VAR(struct ari_conf_outbound_websocket *, owc, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, owcs, ari_conf_get_owcs(), ao2_cleanup);
const char *id = NULL;
enum ari_conf_owc_fields invalid_fields;
switch (cmd) {
case CLI_INIT:
e->command = "ari show outbound-websocket";
e->usage =
"Usage: ari show outbound-websocket <connection id>\n"
" Shows a specific ARI outbound websocket\n";
return NULL;
case CLI_GENERATE:
complete_sorcery_object(owcs, a->word);
return NULL;
default:
break;
}
if (a->argc != 4) {
return CLI_SHOWUSAGE;
}
owc = ari_conf_get_owc(a->argv[3]);
if (!owc) {
ast_cli(a->fd, "Error getting ARI configuration\n");
return CLI_FAILURE;
}
id = ast_sorcery_object_get_id(owc);
invalid_fields = ari_conf_owc_get_invalid_fields(id);
ast_cli(a->fd, "[%s] %s\n", id,
invalid_fields == ARI_OWC_FIELD_NONE ? "" : "**INVALID**");
ast_cli(a->fd, "uri = %s\n", owc->websocket_client->uri);
ast_cli(a->fd, "protocols = %s\n", owc->websocket_client->protocols);
ast_cli(a->fd, "apps = %s%s\n", owc->apps,
invalid_fields & ARI_OWC_FIELD_APPS ? " (invalid)" : "");
ast_cli(a->fd, "username = %s\n", owc->websocket_client->username);
ast_cli(a->fd, "password = %s\n", S_COR(owc->websocket_client->password, "********", ""));
ast_cli(a->fd, "local_ari_user = %s%s\n", owc->local_ari_user,
invalid_fields & ARI_OWC_FIELD_LOCAL_ARI_USER ? " (invalid)" : "");
ast_cli(a->fd, "connection_type = %s\n", ari_websocket_type_to_str(owc->websocket_client->connection_type));
ast_cli(a->fd, "subscribe_all = %s\n", AST_CLI_YESNO(owc->subscribe_all));
ast_cli(a->fd, "connec_timeout = %d\n", owc->websocket_client->connect_timeout);
ast_cli(a->fd, "reconnect_attempts = %d\n", owc->websocket_client->reconnect_attempts);
ast_cli(a->fd, "reconnect_interval = %d\n", owc->websocket_client->reconnect_interval);
ast_cli(a->fd, "tls_enabled = %s\n", AST_CLI_YESNO(owc->websocket_client->tls_enabled));
ast_cli(a->fd, "ca_list_file = %s\n", owc->websocket_client->ca_list_file);
ast_cli(a->fd, "ca_list_path = %s\n", owc->websocket_client->ca_list_path);
ast_cli(a->fd, "cert_file = %s\n", owc->websocket_client->cert_file);
ast_cli(a->fd, "priv_key_file = %s\n", owc->websocket_client->priv_key_file);
ast_cli(a->fd, "verify_server = %s\n", AST_CLI_YESNO(owc->websocket_client->verify_server_cert));
ast_cli(a->fd, "verify_server_hostname = %s\n", AST_CLI_YESNO(owc->websocket_client->verify_server_hostname));
ast_cli(a->fd, "\n");
return CLI_SUCCESS;
}
static char *ari_start_owc(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
RAII_VAR(struct ari_conf_outbound_websocket *, owc, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, owcs, ari_conf_get_owcs(), ao2_cleanup);
if (!owcs) {
ast_cli(a->fd, "Error getting ARI configuration\n");
return CLI_FAILURE ;
}
switch (cmd) {
case CLI_INIT:
e->command = "ari start outbound-websocket";
e->usage =
"Usage: ari start outbound-websocket <connection id>\n"
" Starts a specific ARI outbound websocket\n";
return NULL;
case CLI_GENERATE:
complete_sorcery_object(owcs, a->word);
return NULL;
default:
break;
}
if (a->argc != 4) {
return CLI_SHOWUSAGE;
}
owc = ari_conf_get_owc(a->argv[3]);
if (!owc) {
ast_cli(a->fd, "Error getting ARI configuration\n");
return CLI_FAILURE;
}
ast_cli(a->fd, "Starting websocket session for outbound-websocket '%s'\n", a->argv[3]);
if (ari_outbound_websocket_start(owc) != 0) {
ast_cli(a->fd, "Error starting outbound websocket\n");
return CLI_FAILURE ;
}
return CLI_SUCCESS;
}
static int show_sessions_cb(void *obj, void *arg, int flags)
{
struct ari_ws_session *session = obj;
struct ast_cli_args *a = arg;
char *apps = ast_vector_string_join(&session->websocket_apps, ",");
ast_cli(a->fd, "%-*s %-15s %-32s %-5s %s\n",
AST_UUID_STR_LEN,
session->session_id,
ari_websocket_type_to_str(session->type),
S_OR(session->remote_addr, "N/A"),
session->type == AST_WS_TYPE_CLIENT_PER_CALL_CONFIG
? "N/A" : (session->connected ? "Up" : "Down"),
S_OR(apps, ""));
ast_free(apps);
return 0;
}
#define DASHES "----------------------------------------------------------------------"
static char *ari_show_sessions(struct ast_cli_entry *e, int cmd,
struct ast_cli_args *a)
{
RAII_VAR(struct ao2_container *, sessions, NULL, ao2_cleanup);
switch (cmd) {
case CLI_INIT:
e->command = "ari show websocket sessions";
e->usage =
"Usage: ari show websocket sessions\n"
" Shows all ARI websocket sessions\n";
return NULL;
case CLI_GENERATE:
return NULL;
default:
break;
}
if (a->argc != 4) {
return CLI_SHOWUSAGE;
}
sessions = ari_websocket_get_sessions();
if (!sessions) {
ast_cli(a->fd, "Error getting websocket sessions\n");
return CLI_FAILURE;
}
ast_cli(a->fd, "%-*.*s %-15.15s %-32.32s %-5.5s %-16.16s\n",
AST_UUID_STR_LEN, AST_UUID_STR_LEN,
"Connection ID",
"Type",
"RemoteAddr",
"State",
"Apps"
);
ast_cli(a->fd, "%-*.*s %-15.15s %-32.32s %-5.5s %-16.16s\n",
AST_UUID_STR_LEN, AST_UUID_STR_LEN, DASHES, DASHES, DASHES, DASHES, DASHES);
ao2_callback(sessions, OBJ_NODATA, show_sessions_cb, a);
return CLI_SUCCESS;
}
static char *ari_shut_sessions(struct ast_cli_entry *e, int cmd,
struct ast_cli_args *a)
{
switch (cmd) {
case CLI_INIT:
e->command = "ari shutdown websocket sessions";
e->usage =
"Usage: ari shutdown websocket sessions\n"
" Shuts down all ARI websocket sessions\n";
return NULL;
case CLI_GENERATE:
return NULL;
default:
break;
}
if (a->argc != 4) {
return CLI_SHOWUSAGE;
}
ast_cli(a->fd, "Shutting down all websocket sessions\n");
ari_websocket_shutdown_all();
return CLI_SUCCESS;
}
static void complete_session(struct ao2_container *container,
const char *word)
{
size_t wordlen = strlen(word);
struct ari_ws_session *session;
struct ao2_iterator i = ao2_iterator_init(container, 0);
while ((session = ao2_iterator_next(&i))) {
if (!strncasecmp(word, session->session_id, wordlen)) {
ast_cli_completion_add(ast_strdup(session->session_id));
}
ao2_ref(session, -1);
}
ao2_iterator_destroy(&i);
}
static char *ari_shut_session(struct ast_cli_entry *e, int cmd,
struct ast_cli_args *a)
{
RAII_VAR(struct ari_ws_session *, session, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, sessions, ari_websocket_get_sessions(), ao2_cleanup);
if (!sessions) {
ast_cli(a->fd, "Error getting ARI configuration\n");
return CLI_FAILURE ;
}
switch (cmd) {
case CLI_INIT:
e->command = "ari shutdown websocket session";
e->usage =
"Usage: ari shutdown websocket session <id>\n"
" Shuts down ARI websocket session\n";
return NULL;
case CLI_GENERATE:
complete_session(sessions, a->word);
return NULL;
default:
break;
}
if (a->argc != 5) {
return CLI_SHOWUSAGE;
}
session = ari_websocket_get_session(a->argv[4]);
if (!session) {
ast_cli(a->fd, "Websocket session '%s' not found\n", a->argv[4]);
return CLI_FAILURE ;
}
ast_cli(a->fd, "Shutting down websocket session '%s'\n", a->argv[4]);
ari_websocket_shutdown(session);
return CLI_SUCCESS;
}
static struct ast_cli_entry cli_ari[] = {
AST_CLI_DEFINE(ari_show, "Show ARI settings"),
AST_CLI_DEFINE(ari_show_users, "List ARI users"),
@@ -426,12 +688,18 @@ static struct ast_cli_entry cli_ari[] = {
AST_CLI_DEFINE(ari_show_apps, "List registered ARI applications"),
AST_CLI_DEFINE(ari_show_app, "Display details of a registered ARI application"),
AST_CLI_DEFINE(ari_set_debug, "Enable/disable debugging of an ARI application"),
AST_CLI_DEFINE(ari_show_owcs, "List outbound websocket connections"),
AST_CLI_DEFINE(ari_show_owc, "Show outbound websocket connection"),
AST_CLI_DEFINE(ari_start_owc, "Start outbound websocket connection"),
AST_CLI_DEFINE(ari_show_sessions, "Show websocket sessions"),
AST_CLI_DEFINE(ari_shut_session, "Shutdown websocket session"),
AST_CLI_DEFINE(ari_shut_sessions, "Shutdown websocket sessions"),
};
int ast_ari_cli_register(void) {
int ari_cli_register(void) {
return ast_cli_register_multiple(cli_ari, ARRAY_LEN(cli_ari));
}
void ast_ari_cli_unregister(void) {
void ari_cli_unregister(void) {
ast_cli_unregister_multiple(cli_ari, ARRAY_LEN(cli_ari));
}