Media over Websocket Channel Driver

* Created chan_websocket which can exchange media over both inbound and
outbound websockets which the driver will frame and time.
See http://s.asterisk.net/mow for more information.

* res_http_websocket: Made defines for max message size public and converted
a few nuisance verbose messages to debugs.

* main/channel.c: Changed an obsolete nuisance error to a debug.

* ARI channels: Updated externalMedia to include chan_websocket as a supported
transport.

UserNote: A new channel driver "chan_websocket" is now available. It can
exchange media over both inbound and outbound websockets and will both frame
and re-time the media it receives.
See http://s.asterisk.net/mow for more information.

UserNote: The ARI channels/externalMedia API now includes support for the
WebSocket transport provided by chan_websocket.
This commit is contained in:
George Joseph
2025-04-28 10:39:50 -06:00
committed by github-actions[bot]
parent d5bd2b3ce9
commit 5963e624e2
8 changed files with 1672 additions and 54 deletions

1517
channels/chan_websocket.c Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -77,6 +77,14 @@ enum ast_websocket_opcode {
AST_WEBSOCKET_OPCODE_CONTINUATION = 0x0, /*!< Continuation of a previous frame */ AST_WEBSOCKET_OPCODE_CONTINUATION = 0x0, /*!< Continuation of a previous frame */
}; };
#ifdef LOW_MEMORY
/*! \brief Size of the pre-determined buffer for WebSocket frames */
#define AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE 8192
#else
/*! \brief Size of the pre-determined buffer for WebSocket frames */
#define AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE 65535
#endif
/*! /*!
* \brief Opaque structure for WebSocket server. * \brief Opaque structure for WebSocket server.
* \since 12 * \since 12

View File

@@ -3527,16 +3527,12 @@ static struct ast_frame *__ast_read(struct ast_channel *chan, int dropaudio, int
* The ast_waitfor() code records which of the channel's file * The ast_waitfor() code records which of the channel's file
* descriptors reported that data is available. In theory, * descriptors reported that data is available. In theory,
* ast_read() should only be called after ast_waitfor() reports * ast_read() should only be called after ast_waitfor() reports
* that a channel has data available for reading. However, * that a channel has data available for reading but certain
* there still may be some edge cases throughout the code where * situations with stasis and ARI could give a false indication.
* ast_read() is called improperly. This can potentially cause * For this reason, we don't stop any processing.
* problems, so if this is a developer build, make a lot of
* noise if this happens so that it can be addressed.
*
* One of the potential problems is blocking on a dead channel.
*/ */
if (ast_channel_fdno(chan) == -1) { if (ast_channel_fdno(chan) == -1) {
ast_log(LOG_ERROR, ast_debug(3,
"ast_read() on chan '%s' called with no recorded file descriptor.\n", "ast_read() on chan '%s' called with no recorded file descriptor.\n",
ast_channel_name(chan)); ast_channel_name(chan));
} }

View File

@@ -44,6 +44,7 @@
#include "asterisk/dial.h" #include "asterisk/dial.h"
#include "asterisk/max_forwards.h" #include "asterisk/max_forwards.h"
#include "asterisk/rtp_engine.h" #include "asterisk/rtp_engine.h"
#include "asterisk/websocket_client.h"
#include "resource_channels.h" #include "resource_channels.h"
#include <limits.h> #include <limits.h>
@@ -2179,6 +2180,53 @@ static int external_media_audiosocket_tcp(struct ast_ari_channels_external_media
return 0; return 0;
} }
static int external_media_websocket(struct ast_ari_channels_external_media_args *args,
struct ast_variable *variables,
struct ast_ari_response *response)
{
char *endpoint;
struct ast_channel *chan;
struct varshead *vars;
if (ast_asprintf(&endpoint, "WebSocket/%s/c(%s)",
args->external_host,
args->format) == -1) {
return 1;
}
chan = ari_channels_handle_originate_with_id(
endpoint,
NULL,
NULL,
0,
NULL,
args->app,
args->data,
NULL,
0,
variables,
args->channel_id,
NULL,
NULL,
args->format,
response);
ast_free(endpoint);
if (!chan) {
return 1;
}
ast_channel_lock(chan);
vars = ast_channel_varshead(chan);
if (vars && !AST_LIST_EMPTY(vars)) {
ast_json_object_set(response->message, "channelvars", ast_json_channel_vars(vars));
}
ast_channel_unlock(chan);
ast_channel_unref(chan);
return 0;
}
#include "asterisk/config.h" #include "asterisk/config.h"
#include "asterisk/netsock2.h" #include "asterisk/netsock2.h"
@@ -2209,15 +2257,70 @@ void ast_ari_channels_external_media(struct ast_variable *headers,
return; return;
} }
if (ast_strlen_zero(args->external_host)) { if (ast_strlen_zero(args->transport)) {
ast_ari_response_error(response, 400, "Bad Request", "external_host cannot be empty"); args->transport = "udp";
return;
} }
external_host = ast_strdupa(args->external_host); if (ast_strlen_zero(args->encapsulation)) {
if (!ast_sockaddr_split_hostport(external_host, &host, &port, PARSE_PORT_REQUIRE)) { args->encapsulation = "rtp";
ast_ari_response_error(response, 400, "Bad Request", "external_host must be <host>:<port>"); }
return; if (ast_strings_equal(args->transport, "websocket")) {
if (!ast_strings_equal(args->encapsulation, "none")) {
ast_ari_response_error(response, 400, "Bad Request", "encapsulation must be 'none' for websocket transport");
return;
}
}
if (ast_strings_equal(args->encapsulation, "rtp")) {
if (!ast_strings_equal(args->transport, "udp")) {
ast_ari_response_error(response, 400, "Bad Request", "transport must be 'udp' for rtp encapsulation");
return;
}
}
if (ast_strings_equal(args->encapsulation, "audiosocket")) {
if (!ast_strings_equal(args->transport, "tcp")) {
ast_ari_response_error(response, 400, "Bad Request", "transport must be 'tcp' for audiosocket encapsulation");
return;
}
}
if (ast_strlen_zero(args->connection_type)) {
args->connection_type = "client";
}
if (!ast_strings_equal(args->transport, "websocket")) {
if (ast_strings_equal(args->connection_type, "server")) {
ast_ari_response_error(response, 400, "Bad Request", "'server' connection_type can only be used with the websocket transport");
return;
}
}
if (ast_strlen_zero(args->external_host)) {
if (ast_strings_equal(args->connection_type, "client")) {
ast_ari_response_error(response, 400, "Bad Request", "external_host is required for all but websocket server connections");
return;
} else {
/* server is only valid for websocket, enforced above */
args->external_host = "INCOMING";
}
}
if (ast_strings_equal(args->transport, "websocket")) {
if (ast_strings_equal(args->connection_type, "client")) {
struct ast_websocket_client *ws_client =
ast_websocket_client_retrieve_by_id(args->external_host);
ao2_cleanup(ws_client);
if (!ws_client) {
ast_ari_response_error(response, 400, "Bad Request", "external_host must be a valid websocket_client connection id.");
return;
}
}
} else {
external_host = ast_strdupa(args->external_host);
if (!ast_sockaddr_split_hostport(external_host, &host, &port, PARSE_PORT_REQUIRE)) {
ast_ari_response_error(response, 400, "Bad Request", "external_host must be <host>:<port> for all transports other than websocket");
return;
}
} }
if (ast_strlen_zero(args->format)) { if (ast_strlen_zero(args->format)) {
@@ -2225,15 +2328,6 @@ void ast_ari_channels_external_media(struct ast_variable *headers,
return; return;
} }
if (ast_strlen_zero(args->encapsulation)) {
args->encapsulation = "rtp";
}
if (ast_strlen_zero(args->transport)) {
args->transport = "udp";
}
if (ast_strlen_zero(args->connection_type)) {
args->connection_type = "client";
}
if (ast_strlen_zero(args->direction)) { if (ast_strlen_zero(args->direction)) {
args->direction = "both"; args->direction = "both";
} }
@@ -2250,6 +2344,12 @@ void ast_ari_channels_external_media(struct ast_variable *headers,
response, 500, "Internal Server Error", response, 500, "Internal Server Error",
"An internal error prevented this request from being handled"); "An internal error prevented this request from being handled");
} }
} else if (strcasecmp(args->encapsulation, "none") == 0 && strcasecmp(args->transport, "websocket") == 0) {
if (external_media_websocket(args, variables, response)) {
ast_ari_response_error(
response, 500, "Internal Server Error",
"An internal error prevented this request from being handled");
}
} else { } else {
ast_ari_response_error( ast_ari_response_error(
response, 501, "Not Implemented", response, 501, "Not Implemented",

View File

@@ -834,13 +834,13 @@ struct ast_ari_channels_external_media_args {
const char *app; const char *app;
/*! The "variables" key in the body object holds variable key/value pairs to set on the channel on creation. Other keys in the body object are interpreted as query parameters. Ex. { "endpoint": "SIP/Alice", "variables": { "CALLERID(name)": "Alice" } } */ /*! The "variables" key in the body object holds variable key/value pairs to set on the channel on creation. Other keys in the body object are interpreted as query parameters. Ex. { "endpoint": "SIP/Alice", "variables": { "CALLERID(name)": "Alice" } } */
struct ast_json *variables; struct ast_json *variables;
/*! Hostname/ip:port of external host */ /*! Hostname/ip:port or websocket_client connection ID of external host. May be empty for a websocket server connection. */
const char *external_host; const char *external_host;
/*! Payload encapsulation protocol */ /*! Payload encapsulation protocol. Must be 'none' for the websocket transport. */
const char *encapsulation; const char *encapsulation;
/*! Transport protocol */ /*! Transport protocol */
const char *transport; const char *transport;
/*! Connection type (client/server) */ /*! Connection type (client/server). 'server' is only valid for the websocket transport. */
const char *connection_type; const char *connection_type;
/*! Format to encode audio in */ /*! Format to encode audio in */
const char *format; const char *format;
@@ -863,7 +863,7 @@ int ast_ari_channels_external_media_parse_body(
/*! /*!
* \brief Start an External Media session. * \brief Start an External Media session.
* *
* Create a channel to an External Media source/sink. * Create a channel to an External Media source/sink. The combination of transport and encapsulation will select one of chan_rtp(udp/rtp), chan_audiosocket(tcp/audiosocket) or chan_websocket(websocket/none) channel drivers.
* *
* \param headers HTTP headers * \param headers HTTP headers
* \param args Swagger parameters * \param args Swagger parameters

View File

@@ -2299,7 +2299,7 @@ static void ast_ari_channels_record_cb(
case 501: /* Not Implemented */ case 501: /* Not Implemented */
case 400: /* Invalid parameters */ case 400: /* Invalid parameters */
case 404: /* Channel not found */ case 404: /* Channel not found */
case 409: /* Channel is not in a Stasis application; the channel is currently bridged with other hcannels; A recording with the same name already exists on the system and can not be overwritten because it is in progress or ifExists=fail */ case 409: /* Channel is not in a Stasis application; the channel is currently bridged with other channels; A recording with the same name already exists on the system and can not be overwritten because it is in progress or ifExists=fail */
case 422: /* The format specified is unknown on this system */ case 422: /* The format specified is unknown on this system */
is_valid = 1; is_valid = 1;
break; break;

View File

@@ -51,27 +51,21 @@
#define MAX_PROTOCOL_BUCKETS 7 #define MAX_PROTOCOL_BUCKETS 7
#ifdef LOW_MEMORY #ifdef LOW_MEMORY
/*! \brief Size of the pre-determined buffer for WebSocket frames */
#define MAXIMUM_FRAME_SIZE 8192
/*! \brief Default reconstruction size for multi-frame payload reconstruction. If exceeded the next frame will start a /*! \brief Default reconstruction size for multi-frame payload reconstruction. If exceeded the next frame will start a
* payload. * payload.
*/ */
#define DEFAULT_RECONSTRUCTION_CEILING 8192 #define DEFAULT_RECONSTRUCTION_CEILING AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE
/*! \brief Maximum reconstruction size for multi-frame payload reconstruction. */ /*! \brief Maximum reconstruction size for multi-frame payload reconstruction. */
#define MAXIMUM_RECONSTRUCTION_CEILING 8192 #define MAXIMUM_RECONSTRUCTION_CEILING AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE
#else #else
/*! \brief Size of the pre-determined buffer for WebSocket frames */
#define MAXIMUM_FRAME_SIZE 65535
/*! \brief Default reconstruction size for multi-frame payload reconstruction. If exceeded the next frame will start a /*! \brief Default reconstruction size for multi-frame payload reconstruction. If exceeded the next frame will start a
* payload. * payload.
*/ */
#define DEFAULT_RECONSTRUCTION_CEILING MAXIMUM_FRAME_SIZE #define DEFAULT_RECONSTRUCTION_CEILING AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE
/*! \brief Maximum reconstruction size for multi-frame payload reconstruction. */ /*! \brief Maximum reconstruction size for multi-frame payload reconstruction. */
#define MAXIMUM_RECONSTRUCTION_CEILING MAXIMUM_FRAME_SIZE #define MAXIMUM_RECONSTRUCTION_CEILING AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE
#endif #endif
/*! \brief Maximum size of a websocket frame header /*! \brief Maximum size of a websocket frame header
@@ -100,7 +94,7 @@ struct ast_websocket {
struct websocket_client *client; /*!< Client object when connected as a client websocket */ struct websocket_client *client; /*!< Client object when connected as a client websocket */
char session_id[AST_UUID_STR_LEN]; /*!< The identifier for the websocket session */ char session_id[AST_UUID_STR_LEN]; /*!< The identifier for the websocket session */
uint16_t close_status_code; /*!< Status code sent in a CLOSE frame upon shutdown */ uint16_t close_status_code; /*!< Status code sent in a CLOSE frame upon shutdown */
char buf[MAXIMUM_FRAME_SIZE]; /*!< Fixed buffer for reading data into */ char buf[AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE]; /*!< Fixed buffer for reading data into */
}; };
const char *ast_websocket_type_to_str(enum ast_websocket_type type) const char *ast_websocket_type_to_str(enum ast_websocket_type type)
@@ -201,7 +195,7 @@ static void session_destroy_fn(void *obj)
if (session->stream) { if (session->stream) {
ast_iostream_close(session->stream); ast_iostream_close(session->stream);
session->stream = NULL; session->stream = NULL;
ast_verb(2, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from", ast_debug(3, "WebSocket connection %s '%s' closed\n", session->client ? "to" : "from",
ast_sockaddr_stringify(&session->remote_address)); ast_sockaddr_stringify(&session->remote_address));
} }
} }
@@ -279,7 +273,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_server_add_protocol2)(struct ast_websock
ao2_link_flags(server->protocols, protocol, OBJ_NOLOCK); ao2_link_flags(server->protocols, protocol, OBJ_NOLOCK);
ao2_unlock(server->protocols); ao2_unlock(server->protocols);
ast_verb(5, "WebSocket registered sub-protocol '%s'\n", protocol->name); ast_debug(1, "WebSocket registered sub-protocol '%s'\n", protocol->name);
ao2_ref(protocol, -1); ao2_ref(protocol, -1);
return 0; return 0;
@@ -301,7 +295,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_server_remove_protocol)(struct ast_webso
ao2_unlink(server->protocols, protocol); ao2_unlink(server->protocols, protocol);
ao2_ref(protocol, -1); ao2_ref(protocol, -1);
ast_verb(5, "WebSocket unregistered sub-protocol '%s'\n", name); ast_debug(1, "WebSocket unregistered sub-protocol '%s'\n", name);
return 0; return 0;
} }
@@ -672,7 +666,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_read)(struct ast_websocket *session, cha
/* Now read the rest of the payload */ /* Now read the rest of the payload */
*payload = &session->buf[frame_size]; /* payload will start here, at the end of the options, if any */ *payload = &session->buf[frame_size]; /* payload will start here, at the end of the options, if any */
frame_size = frame_size + (*payload_len); /* final frame size is header + optional headers + payload data */ frame_size = frame_size + (*payload_len); /* final frame size is header + optional headers + payload data */
if (frame_size > MAXIMUM_FRAME_SIZE) { if (frame_size > AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE) {
ast_log(LOG_WARNING, "Cannot fit huge websocket frame of %zu bytes\n", frame_size); ast_log(LOG_WARNING, "Cannot fit huge websocket frame of %zu bytes\n", frame_size);
/* The frame won't fit :-( */ /* The frame won't fit :-( */
ast_websocket_close(session, 1009); ast_websocket_close(session, 1009);
@@ -992,7 +986,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan
return 0; return 0;
} }
ast_verb(2, "WebSocket connection from '%s' for protocol '%s' accepted using version '%d'\n", ast_sockaddr_stringify(&ser->remote_address), protocol ? : "", version); ast_debug(3, "WebSocket connection from '%s' for protocol '%s' accepted using version '%d'\n", ast_sockaddr_stringify(&ser->remote_address), protocol ? : "", version);
/* Populate the session with all the needed details */ /* Populate the session with all the needed details */
session->stream = ser->stream; session->stream = ser->stream;

View File

@@ -1473,7 +1473,7 @@
}, },
{ {
"code": 409, "code": 409,
"reason": "Channel is not in a Stasis application; the channel is currently bridged with other hcannels; A recording with the same name already exists on the system and can not be overwritten because it is in progress or ifExists=fail" "reason": "Channel is not in a Stasis application; the channel is currently bridged with other channels; A recording with the same name already exists on the system and can not be overwritten because it is in progress or ifExists=fail"
}, },
{ {
"code": 422, "code": 422,
@@ -1870,7 +1870,7 @@
"17.1.0" "17.1.0"
], ],
"summary": "Start an External Media session.", "summary": "Start an External Media session.",
"notes": "Create a channel to an External Media source/sink.", "notes": "Create a channel to an External Media source/sink. The combination of transport and encapsulation will select one of chan_rtp(udp/rtp), chan_audiosocket(tcp/audiosocket) or chan_websocket(websocket/none) channel drivers.",
"nickname": "externalMedia", "nickname": "externalMedia",
"responseClass": "Channel", "responseClass": "Channel",
"parameters": [ "parameters": [
@@ -1900,15 +1900,15 @@
}, },
{ {
"name": "external_host", "name": "external_host",
"description": "Hostname/ip:port of external host", "description": "Hostname/ip:port or websocket_client connection ID of external host. May be empty for a websocket server connection.",
"paramType": "query", "paramType": "query",
"required": true, "required": false,
"allowMultiple": false, "allowMultiple": false,
"dataType": "string" "dataType": "string"
}, },
{ {
"name": "encapsulation", "name": "encapsulation",
"description": "Payload encapsulation protocol", "description": "Payload encapsulation protocol. Must be 'none' for the websocket transport.",
"paramType": "query", "paramType": "query",
"required": false, "required": false,
"allowMultiple": false, "allowMultiple": false,
@@ -1918,7 +1918,8 @@
"valueType": "LIST", "valueType": "LIST",
"values": [ "values": [
"rtp", "rtp",
"audiosocket" "audiosocket",
"none"
] ]
} }
}, },
@@ -1934,13 +1935,14 @@
"valueType": "LIST", "valueType": "LIST",
"values": [ "values": [
"udp", "udp",
"tcp" "tcp",
"websocket"
] ]
} }
}, },
{ {
"name": "connection_type", "name": "connection_type",
"description": "Connection type (client/server)", "description": "Connection type (client/server). 'server' is only valid for the websocket transport.",
"paramType": "query", "paramType": "query",
"required": false, "required": false,
"allowMultiple": false, "allowMultiple": false,
@@ -1949,7 +1951,8 @@
"allowableValues": { "allowableValues": {
"valueType": "LIST", "valueType": "LIST",
"values": [ "values": [
"client" "client",
"server"
] ]
} }
}, },