Prequisites for ARI Outbound Websockets

stasis:
* Added stasis_app_is_registered().
* Added stasis_app_control_mark_failed().
* Added stasis_app_control_is_failed().
* Fixed res_stasis_device_state so unsubscribe all works properly.
* Modified stasis_app_unregister() to unsubscribe from all event sources.
* Modified stasis_app_exec to return -1 if stasis_app_control_is_failed()
  returns true.

http:
* Added ast_http_create_basic_auth_header().

md5:
* Added define for MD5_DIGEST_LENGTH.

tcptls:
* Added flag to ast_tcptls_session_args to suppress connection log messages
  to give callers more control over logging.

http_websocket:
* Add flag to ast_websocket_client_options to suppress connection log messages
  to give callers more control over logging.
* Added username and password to ast_websocket_client_options to support
  outbound basic authentication.
* Added ast_websocket_result_to_str().

(cherry picked from commit cc92adc5fb)
This commit is contained in:
George Joseph
2025-04-16 13:40:52 -06:00
committed by Asterisk Development Team
parent 26d6a3da6a
commit a6dca5bf3a
12 changed files with 285 additions and 41 deletions

View File

@@ -1091,7 +1091,8 @@ int AST_OPTIONAL_API_NAME(ast_websocket_remove_protocol)(const char *name, ast_w
* The returned host will contain the address and optional port while
* path will contain everything after the address/port if included.
*/
static int websocket_client_parse_uri(const char *uri, char **host, struct ast_str **path)
static int websocket_client_parse_uri(const char *uri, char **host,
struct ast_str **path, char **userinfo)
{
struct ast_uri *parsed_uri = ast_uri_parse_websocket(uri);
@@ -1100,6 +1101,7 @@ static int websocket_client_parse_uri(const char *uri, char **host, struct ast_s
}
*host = ast_uri_make_host_with_port(parsed_uri);
*userinfo = ast_strdup(ast_uri_user_info(parsed_uri));
if (ast_uri_path(parsed_uri) || ast_uri_query(parsed_uri)) {
*path = ast_str_create(64);
@@ -1212,6 +1214,10 @@ struct websocket_client {
struct ast_tcptls_session_args *args;
/*! tcptls connection instance */
struct ast_tcptls_session_instance *ser;
/*! Authentication userid:password */
char *userinfo;
/*! Suppress connection log messages */
int suppress_connection_msgs;
};
static void websocket_client_destroy(void *obj)
@@ -1226,6 +1232,7 @@ static void websocket_client_destroy(void *obj)
ast_free(client->key);
ast_free(client->resource_name);
ast_free(client->host);
ast_free(client->userinfo);
}
static struct ast_websocket * websocket_client_create(
@@ -1239,9 +1246,17 @@ static struct ast_websocket * websocket_client_create(
return NULL;
}
if (!ast_uuid_generate_str(ws->session_id, sizeof(ws->session_id))) {
ast_log(LOG_ERROR, "Unable to allocate websocket session_id\n");
ao2_ref(ws, -1);
*result = WS_ALLOCATE_ERROR;
return NULL;
}
if (!(ws->client = ao2_alloc(
sizeof(*ws->client), websocket_client_destroy))) {
ast_log(LOG_ERROR, "Unable to allocate websocket client\n");
ao2_ref(ws, -1);
*result = WS_ALLOCATE_ERROR;
return NULL;
}
@@ -1253,22 +1268,34 @@ static struct ast_websocket * websocket_client_create(
}
if (websocket_client_parse_uri(
options->uri, &ws->client->host, &ws->client->resource_name)) {
options->uri, &ws->client->host, &ws->client->resource_name,
&ws->client->userinfo)) {
ao2_ref(ws, -1);
*result = WS_URI_PARSE_ERROR;
return NULL;
}
if (ast_strlen_zero(ws->client->userinfo)
&& !ast_strlen_zero(options->username)
&& !ast_strlen_zero(options->password)) {
ast_asprintf(&ws->client->userinfo, "%s:%s", options->username,
options->password);
}
if (!(ws->client->args = websocket_client_args_create(
ws->client->host, options->tls_cfg, result))) {
ao2_ref(ws, -1);
return NULL;
}
ws->client->protocols = ast_strdup(options->protocols);
ws->client->suppress_connection_msgs = options->suppress_connection_msgs;
ws->client->args->suppress_connection_msgs = options->suppress_connection_msgs;
ws->client->protocols = ast_strdup(options->protocols);
ws->client->version = 13;
ws->opcode = -1;
ws->reconstruct = DEFAULT_RECONSTRUCTION_CEILING;
ws->timeout = options->timeout;
return ws;
}
@@ -1289,17 +1316,29 @@ static enum ast_websocket_result websocket_client_handle_response_code(
case 101:
return 0;
case 400:
ast_log(LOG_ERROR, "Received response 400 - Bad Request "
"- from %s\n", client->host);
if (!client->suppress_connection_msgs) {
ast_log(LOG_ERROR, "Received response 400 - Bad Request "
"- from %s\n", client->host);
}
return WS_BAD_REQUEST;
case 401:
if (!client->suppress_connection_msgs) {
ast_log(LOG_ERROR, "Received response 401 - Unauthorized "
"- from %s\n", client->host);
}
return WS_UNAUTHORIZED;
case 404:
ast_log(LOG_ERROR, "Received response 404 - Request URL not "
"found - from %s\n", client->host);
if (!client->suppress_connection_msgs) {
ast_log(LOG_ERROR, "Received response 404 - Request URL not "
"found - from %s\n", client->host);
}
return WS_URL_NOT_FOUND;
}
ast_log(LOG_ERROR, "Invalid HTTP response code %d from %s\n",
response_code, client->host);
if (!client->suppress_connection_msgs) {
ast_log(LOG_ERROR, "Invalid HTTP response code %d from %s\n",
response_code, client->host);
}
return WS_INVALID_RESPONSE;
}
@@ -1384,29 +1423,49 @@ static enum ast_websocket_result websocket_client_handshake_get_response(
WS_OK : WS_HEADER_MISSING;
}
#define optional_header_spec "%s%s%s"
#define print_optional_header(test, name, value) \
test ? name : "", \
test ? value : "", \
test ? "\r\n" : ""
static enum ast_websocket_result websocket_client_handshake(
struct websocket_client *client)
{
char protocols[100] = "";
size_t protocols_len = 0;
struct ast_variable *auth_header = NULL;
size_t res;
if (!ast_strlen_zero(client->protocols)) {
sprintf(protocols, "Sec-WebSocket-Protocol: %s\r\n",
client->protocols);
if (!ast_strlen_zero(client->userinfo)) {
auth_header = ast_http_create_basic_auth_header(client->userinfo, NULL);
if (!auth_header) {
ast_log(LOG_ERROR, "Unable to allocate client websocket userinfo\n");
return WS_ALLOCATE_ERROR;
}
}
if (ast_iostream_printf(client->ser->stream,
"GET /%s HTTP/1.1\r\n"
"Sec-WebSocket-Version: %d\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Host: %s\r\n"
"Sec-WebSocket-Key: %s\r\n"
"%s\r\n",
client->resource_name ? ast_str_buffer(client->resource_name) : "",
client->version,
client->host,
client->key,
protocols) < 0) {
protocols_len = client->protocols ? strlen(client->protocols) : 0;
res = ast_iostream_printf(client->ser->stream,
"GET /%s HTTP/1.1\r\n"
"Sec-WebSocket-Version: %d\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Host: %s\r\n"
optional_header_spec
optional_header_spec
"Sec-WebSocket-Key: %s\r\n"
"\r\n",
client->resource_name ? ast_str_buffer(client->resource_name) : "",
client->version,
client->host,
print_optional_header(auth_header, "Authorization: ", auth_header->value),
print_optional_header(protocols_len, "Sec-WebSocket-Protocol: ", client->protocols),
client->key
);
ast_variables_destroy(auth_header);
if (res < 0) {
ast_log(LOG_ERROR, "Failed to send handshake.\n");
return WS_WRITE_ERROR;
}
@@ -1530,6 +1589,33 @@ int AST_OPTIONAL_API_NAME(ast_websocket_write_string)
(char *)buf, len);
}
const char *websocket_result_string_map[] = {
[WS_OK] = "OK",
[WS_ALLOCATE_ERROR] = "Allocation error",
[WS_KEY_ERROR] = "Key error",
[WS_URI_PARSE_ERROR] = "URI parse error",
[WS_URI_RESOLVE_ERROR] = "URI resolve error",
[WS_BAD_STATUS] = "Bad status line",
[WS_INVALID_RESPONSE] = "Invalid response code",
[WS_BAD_REQUEST] = "Bad request",
[WS_URL_NOT_FOUND] = "URL not found",
[WS_HEADER_MISMATCH] = "Header mismatch",
[WS_HEADER_MISSING] = "Header missing",
[WS_NOT_SUPPORTED] = "Not supported",
[WS_WRITE_ERROR] = "Write error",
[WS_CLIENT_START_ERROR] = "Client start error",
[WS_UNAUTHORIZED] = "Unauthorized"
};
const char *AST_OPTIONAL_API_NAME(ast_websocket_result_to_str)
(enum ast_websocket_result result)
{
if (!ARRAY_IN_BOUNDS(result, websocket_result_string_map)) {
return "unknown";
}
return websocket_result_string_map[result];
}
static int load_module(void)
{
websocketuri.data = websocket_server_internal_create();

View File

@@ -1667,6 +1667,9 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
*/
cleanup();
if (stasis_app_control_is_failed(control)) {
res = -1;
}
/* The control needs to be removed from the controls container in
* case a new PBX is started and ends up coming back into Stasis.
*/
@@ -1741,6 +1744,19 @@ struct stasis_app *stasis_app_get_by_name(const char *name)
return find_app_by_name(name);
}
int stasis_app_is_registered(const char *name)
{
struct stasis_app *app = find_app_by_name(name);
/*
* It's safe to unref app here because we're not actually
* using it or returning it.
*/
ao2_cleanup(app);
return app != NULL;
}
static int append_name(void *obj, void *arg, int flags)
{
struct stasis_app *app = obj;
@@ -1832,6 +1848,8 @@ int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *d
void stasis_app_unregister(const char *app_name)
{
struct stasis_app *app;
struct stasis_app_event_source *source;
int res;
if (!app_name) {
return;
@@ -1848,6 +1866,22 @@ void stasis_app_unregister(const char *app_name)
return;
}
/* Unsubscribe from all event sources. */
AST_RWLIST_RDLOCK(&event_sources);
AST_LIST_TRAVERSE(&event_sources, source, next) {
if (!source->unsubscribe || !source->is_subscribed
|| !source->is_subscribed(app, NULL)) {
continue;
}
res = source->unsubscribe(app, NULL);
if (res) {
ast_log(LOG_WARNING, "%s: Error unsubscribing from event source '%s'\n",
app_name, source->scheme);
}
}
AST_RWLIST_UNLOCK(&event_sources);
app_deactivate(app);
/* There's a decent chance that app is ready for cleanup. Go ahead

View File

@@ -351,7 +351,7 @@ static int is_subscribed_device_state_lock(struct stasis_app *app, const char *n
int is_subscribed;
ao2_lock(device_state_subscriptions);
is_subscribed = is_subscribed_device_state(app, name);
is_subscribed = is_subscribed_device_state(app, S_OR(name, DEVICE_STATE_ALL));
ao2_unlock(device_state_subscriptions);
return is_subscribed;
@@ -409,7 +409,7 @@ static int unsubscribe_device_state(struct stasis_app *app, const char *name)
struct device_state_subscription *sub;
ao2_lock(device_state_subscriptions);
sub = find_device_state_subscription(app, name);
sub = find_device_state_subscription(app, S_OR(name, DEVICE_STATE_ALL));
if (sub) {
remove_device_state_subscription(sub);
}

View File

@@ -102,6 +102,11 @@ struct stasis_app_control {
* When set, /c app_stasis should exit and continue in the dialplan.
*/
unsigned int is_done:1;
/*!
* When set, /c app_stasis should exit indicating failure and continue
* in the dialplan.
*/
unsigned int failed:1;
};
static void control_dtor(void *obj)
@@ -368,6 +373,17 @@ void control_mark_done(struct stasis_app_control *control)
ao2_unlock(control->command_queue);
}
void stasis_app_control_mark_failed(struct stasis_app_control *control)
{
control->failed = 1;
}
int stasis_app_control_is_failed(const struct stasis_app_control *control)
{
return control->failed;
}
struct stasis_app_control_continue_data {
char context[AST_MAX_CONTEXT];
char extension[AST_MAX_EXTENSION];