|
|
|
@@ -72,6 +72,44 @@
|
|
|
|
|
</para>
|
|
|
|
|
</description>
|
|
|
|
|
</manager>
|
|
|
|
|
<configInfo name="res_pjsip_pubsub" language="en_US">
|
|
|
|
|
<synopsis>Module that implements publish and subscribe support.</synopsis>
|
|
|
|
|
<configFile name="pjsip.conf">
|
|
|
|
|
<configObject name="subscription_persistence">
|
|
|
|
|
<synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
|
|
|
|
|
<configOption name="packet">
|
|
|
|
|
<synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
|
|
|
|
|
</configOption>
|
|
|
|
|
<configOption name="src_name">
|
|
|
|
|
<synopsis>The source address of the subscription</synopsis>
|
|
|
|
|
</configOption>
|
|
|
|
|
<configOption name="src_port">
|
|
|
|
|
<synopsis>The source port of the subscription</synopsis>
|
|
|
|
|
</configOption>
|
|
|
|
|
<configOption name="transport_key">
|
|
|
|
|
<synopsis>The type of transport the subscription was received on</synopsis>
|
|
|
|
|
</configOption>
|
|
|
|
|
<configOption name="local_name">
|
|
|
|
|
<synopsis>The local address the subscription was received on</synopsis>
|
|
|
|
|
</configOption>
|
|
|
|
|
<configOption name="local_port">
|
|
|
|
|
<synopsis>The local port the subscription was received on</synopsis>
|
|
|
|
|
</configOption>
|
|
|
|
|
<configOption name="cseq">
|
|
|
|
|
<synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
|
|
|
|
|
</configOption>
|
|
|
|
|
<configOption name="tag">
|
|
|
|
|
<synopsis>The local tag of the dialog for the subscription</synopsis>
|
|
|
|
|
</configOption>
|
|
|
|
|
<configOption name="endpoint">
|
|
|
|
|
<synopsis>The name of the endpoint that subscribed</synopsis>
|
|
|
|
|
</configOption>
|
|
|
|
|
<configOption name="expires">
|
|
|
|
|
<synopsis>The time at which the subscription expires</synopsis>
|
|
|
|
|
</configOption>
|
|
|
|
|
</configObject>
|
|
|
|
|
</configFile>
|
|
|
|
|
</configInfo>
|
|
|
|
|
***/
|
|
|
|
|
|
|
|
|
|
static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
|
|
|
|
@@ -83,6 +121,7 @@ static struct pjsip_module pubsub_module = {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
#define MOD_DATA_BODY_GENERATOR "sub_body_generator"
|
|
|
|
|
#define MOD_DATA_PERSISTENCE "sub_persistence"
|
|
|
|
|
|
|
|
|
|
static const pj_str_t str_event_name = { "Event", 5 };
|
|
|
|
|
|
|
|
|
@@ -180,6 +219,35 @@ struct ast_sip_publication {
|
|
|
|
|
int sched_id;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \brief Structure used for persisting an inbound subscription
|
|
|
|
|
*/
|
|
|
|
|
struct subscription_persistence {
|
|
|
|
|
/*! Sorcery object details */
|
|
|
|
|
SORCERY_OBJECT(details);
|
|
|
|
|
/*! The name of the endpoint involved in the subscrption */
|
|
|
|
|
char *endpoint;
|
|
|
|
|
/*! SIP message that creates the subscription */
|
|
|
|
|
char packet[PJSIP_MAX_PKT_LEN];
|
|
|
|
|
/*! Source address of the message */
|
|
|
|
|
char src_name[PJ_INET6_ADDRSTRLEN];
|
|
|
|
|
/*! Source port of the message */
|
|
|
|
|
int src_port;
|
|
|
|
|
/*! Local transport key type */
|
|
|
|
|
char transport_key[32];
|
|
|
|
|
/*! Local transport address */
|
|
|
|
|
char local_name[PJ_INET6_ADDRSTRLEN];
|
|
|
|
|
/*! Local transport port */
|
|
|
|
|
int local_port;
|
|
|
|
|
/*! Next CSeq to use for message */
|
|
|
|
|
unsigned int cseq;
|
|
|
|
|
/*! Local tag of the dialog */
|
|
|
|
|
char *tag;
|
|
|
|
|
/*! When this subscription expires */
|
|
|
|
|
struct timeval expires;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \brief Structure representing a SIP subscription
|
|
|
|
|
*/
|
|
|
|
@@ -200,6 +268,8 @@ struct ast_sip_subscription {
|
|
|
|
|
pjsip_dialog *dlg;
|
|
|
|
|
/*! Body generaator for NOTIFYs */
|
|
|
|
|
struct ast_sip_pubsub_body_generator *body_generator;
|
|
|
|
|
/*! Persistence information */
|
|
|
|
|
struct subscription_persistence *persistence;
|
|
|
|
|
/*! Next item in the list */
|
|
|
|
|
AST_LIST_ENTRY(ast_sip_subscription) next;
|
|
|
|
|
};
|
|
|
|
@@ -214,6 +284,265 @@ AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);
|
|
|
|
|
AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
|
|
|
|
|
AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
|
|
|
|
|
|
|
|
|
|
/*! \brief Destructor for subscription persistence */
|
|
|
|
|
static void subscription_persistence_destroy(void *obj)
|
|
|
|
|
{
|
|
|
|
|
struct subscription_persistence *persistence = obj;
|
|
|
|
|
|
|
|
|
|
ast_free(persistence->endpoint);
|
|
|
|
|
ast_free(persistence->tag);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*! \brief Allocator for subscription persistence */
|
|
|
|
|
static void *subscription_persistence_alloc(const char *name)
|
|
|
|
|
{
|
|
|
|
|
return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*! \brief Function which creates initial persistence information of a subscription in sorcery */
|
|
|
|
|
static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub)
|
|
|
|
|
{
|
|
|
|
|
char tag[PJ_GUID_STRING_LENGTH + 1];
|
|
|
|
|
|
|
|
|
|
/* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
|
|
|
|
|
* look it up by id at all.
|
|
|
|
|
*/
|
|
|
|
|
struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
|
|
|
|
|
"subscription_persistence", NULL);
|
|
|
|
|
|
|
|
|
|
if (!persistence) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint));
|
|
|
|
|
ast_copy_pj_str(tag, &sub->dlg->local.info->tag, sizeof(tag));
|
|
|
|
|
persistence->tag = ast_strdup(tag);
|
|
|
|
|
|
|
|
|
|
ast_sorcery_create(ast_sip_get_sorcery(), persistence);
|
|
|
|
|
return persistence;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*! \brief Function which updates persistence information of a subscription in sorcery */
|
|
|
|
|
static void subscription_persistence_update(struct ast_sip_subscription *sub,
|
|
|
|
|
pjsip_rx_data *rdata)
|
|
|
|
|
{
|
|
|
|
|
if (!sub->persistence) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sub->persistence->cseq = sub->dlg->local.cseq;
|
|
|
|
|
|
|
|
|
|
if (rdata) {
|
|
|
|
|
int expires;
|
|
|
|
|
pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
|
|
|
|
|
|
|
|
|
|
expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
|
|
|
|
|
sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
|
|
|
|
|
|
|
|
|
|
ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet));
|
|
|
|
|
ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name));
|
|
|
|
|
sub->persistence->src_port = rdata->pkt_info.src_port;
|
|
|
|
|
ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name,
|
|
|
|
|
sizeof(sub->persistence->transport_key));
|
|
|
|
|
ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host,
|
|
|
|
|
sizeof(sub->persistence->local_name));
|
|
|
|
|
sub->persistence->local_port = rdata->tp_info.transport->local_name.port;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*! \brief Function which removes persistence of a subscription from sorcery */
|
|
|
|
|
static void subscription_persistence_remove(struct ast_sip_subscription *sub)
|
|
|
|
|
{
|
|
|
|
|
if (!sub->persistence) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence);
|
|
|
|
|
ao2_ref(sub->persistence, -1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
|
|
|
|
|
static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
|
|
|
|
|
size_t num_accept);
|
|
|
|
|
|
|
|
|
|
/*! \brief Retrieve a handler using the Event header of an rdata message */
|
|
|
|
|
static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
|
|
|
|
|
{
|
|
|
|
|
pjsip_event_hdr *event_header;
|
|
|
|
|
char event[32];
|
|
|
|
|
struct ast_sip_subscription_handler *handler;
|
|
|
|
|
|
|
|
|
|
event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
|
|
|
|
|
if (!event_header) {
|
|
|
|
|
ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
|
|
|
|
|
|
|
|
|
|
handler = find_sub_handler_for_event_name(event);
|
|
|
|
|
if (!handler) {
|
|
|
|
|
ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return handler;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*! \brief Retrieve a body generator using the Accept header of an rdata message */
|
|
|
|
|
static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
|
|
|
|
|
const struct ast_sip_subscription_handler *handler)
|
|
|
|
|
{
|
|
|
|
|
pjsip_accept_hdr *accept_header;
|
|
|
|
|
char accept[AST_SIP_MAX_ACCEPT][64];
|
|
|
|
|
size_t num_accept_headers;
|
|
|
|
|
|
|
|
|
|
accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
|
|
|
|
|
if (accept_header) {
|
|
|
|
|
int i;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < accept_header->count; ++i) {
|
|
|
|
|
ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
|
|
|
|
|
}
|
|
|
|
|
num_accept_headers = accept_header->count;
|
|
|
|
|
} else {
|
|
|
|
|
/* If a SUBSCRIBE contains no Accept headers, then we must assume that
|
|
|
|
|
* the default accept type for the event package is to be used.
|
|
|
|
|
*/
|
|
|
|
|
ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0]));
|
|
|
|
|
num_accept_headers = 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return find_body_generator(accept, num_accept_headers);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*! \brief Callback function to perform the actual recreation of a subscription */
|
|
|
|
|
static int subscription_persistence_recreate(void *obj, void *arg, int flags)
|
|
|
|
|
{
|
|
|
|
|
struct subscription_persistence *persistence = obj;
|
|
|
|
|
pj_pool_t *pool = arg;
|
|
|
|
|
pjsip_rx_data rdata = { { 0, }, };
|
|
|
|
|
pjsip_expires_hdr *expires_header;
|
|
|
|
|
struct ast_sip_subscription_handler *handler;
|
|
|
|
|
RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
|
|
|
|
|
struct ast_sip_subscription *sub;
|
|
|
|
|
struct ast_sip_pubsub_body_generator *generator;
|
|
|
|
|
|
|
|
|
|
/* If this subscription has already expired remove it */
|
|
|
|
|
if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
|
|
|
|
|
ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
|
|
|
|
|
if (!endpoint) {
|
|
|
|
|
ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
|
|
|
|
|
persistence->endpoint);
|
|
|
|
|
ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pj_pool_reset(pool);
|
|
|
|
|
rdata.tp_info.pool = pool;
|
|
|
|
|
|
|
|
|
|
if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
|
|
|
|
|
persistence->transport_key, persistence->local_name, persistence->local_port)) {
|
|
|
|
|
ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
|
|
|
|
|
persistence->endpoint);
|
|
|
|
|
ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Update the expiration header with the new expiration */
|
|
|
|
|
expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
|
|
|
|
|
if (!expires_header) {
|
|
|
|
|
expires_header = pjsip_expires_hdr_create(pool, 0);
|
|
|
|
|
if (!expires_header) {
|
|
|
|
|
ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
|
|
|
|
|
}
|
|
|
|
|
expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
|
|
|
|
|
|
|
|
|
|
handler = subscription_get_handler_from_rdata(&rdata);
|
|
|
|
|
if (!handler) {
|
|
|
|
|
ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
generator = subscription_get_generator_from_rdata(&rdata, handler);
|
|
|
|
|
if (!generator) {
|
|
|
|
|
ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
|
|
|
|
|
pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator);
|
|
|
|
|
ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
|
|
|
|
|
pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
|
|
|
|
|
|
|
|
|
|
sub = handler->new_subscribe(endpoint, &rdata);
|
|
|
|
|
if (sub) {
|
|
|
|
|
sub->persistence = ao2_bump(persistence);
|
|
|
|
|
subscription_persistence_update(sub, &rdata);
|
|
|
|
|
} else {
|
|
|
|
|
ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
|
|
|
|
|
static int subscription_persistence_load(void *data)
|
|
|
|
|
{
|
|
|
|
|
struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
|
|
|
|
|
"subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
|
|
|
|
|
pj_pool_t *pool;
|
|
|
|
|
|
|
|
|
|
pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
|
|
|
|
|
PJSIP_POOL_RDATA_INC);
|
|
|
|
|
if (!pool) {
|
|
|
|
|
ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
|
|
|
|
|
|
|
|
|
|
pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
|
|
|
|
|
|
|
|
|
|
ao2_ref(persisted_subscriptions, -1);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
|
|
|
|
|
static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
|
|
|
|
|
{
|
|
|
|
|
struct ast_json_payload *payload;
|
|
|
|
|
const char *type;
|
|
|
|
|
|
|
|
|
|
if (stasis_message_type(message) != ast_manager_get_generic_type()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
payload = stasis_message_data(message);
|
|
|
|
|
type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
|
|
|
|
|
|
|
|
|
|
/* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
|
|
|
|
|
* recreate SIP subscriptions.
|
|
|
|
|
*/
|
|
|
|
|
if (strcmp(type, "FullyBooted")) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* This has to be here so the subscription is recreated when the body generator is available */
|
|
|
|
|
ast_sip_push_task(NULL, subscription_persistence_load, NULL);
|
|
|
|
|
|
|
|
|
|
/* Once the system is fully booted we don't care anymore */
|
|
|
|
|
stasis_unsubscribe(sub);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void add_subscription(struct ast_sip_subscription *obj)
|
|
|
|
|
{
|
|
|
|
|
SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
|
|
|
|
@@ -335,6 +664,9 @@ static void subscription_destructor(void *obj)
|
|
|
|
|
struct ast_sip_subscription *sub = obj;
|
|
|
|
|
|
|
|
|
|
ast_debug(3, "Destroying SIP subscription\n");
|
|
|
|
|
|
|
|
|
|
subscription_persistence_remove(sub);
|
|
|
|
|
|
|
|
|
|
remove_subscription(sub);
|
|
|
|
|
|
|
|
|
|
ao2_cleanup(sub->datastores);
|
|
|
|
@@ -388,6 +720,7 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su
|
|
|
|
|
{
|
|
|
|
|
struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
|
|
|
|
|
pjsip_dialog *dlg;
|
|
|
|
|
struct subscription_persistence *persistence;
|
|
|
|
|
|
|
|
|
|
if (!sub) {
|
|
|
|
|
return NULL;
|
|
|
|
@@ -424,6 +757,17 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su
|
|
|
|
|
ao2_ref(sub, -1);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
|
|
|
|
|
pubsub_module.id, MOD_DATA_PERSISTENCE);
|
|
|
|
|
if (persistence) {
|
|
|
|
|
/* Update the created dialog with the persisted information */
|
|
|
|
|
pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
|
|
|
|
|
pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
|
|
|
|
|
dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
|
|
|
|
|
pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
|
|
|
|
|
dlg->local.cseq = persistence->cseq;
|
|
|
|
|
dlg->remote.cseq = persistence->cseq;
|
|
|
|
|
}
|
|
|
|
|
sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
|
|
|
|
|
/* We keep a reference to the dialog until our subscription is destroyed. See
|
|
|
|
|
* the subscription_destructor for more details
|
|
|
|
@@ -463,6 +807,16 @@ pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub)
|
|
|
|
|
return sub->dlg;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
|
|
|
|
|
{
|
|
|
|
|
/* If this is a persistence recreation the subscription has already been accepted */
|
|
|
|
|
if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return pjsip_evsub_accept(ast_sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
|
|
|
|
|
{
|
|
|
|
|
struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
|
|
|
|
@@ -472,6 +826,8 @@ int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx
|
|
|
|
|
res = pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
|
|
|
|
|
tdata) == PJ_SUCCESS ? 0 : -1;
|
|
|
|
|
|
|
|
|
|
subscription_persistence_update(sub, NULL);
|
|
|
|
|
|
|
|
|
|
ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
|
|
|
|
|
"StateText: %s\r\n"
|
|
|
|
|
"Endpoint: %s\r\n",
|
|
|
|
@@ -688,6 +1044,7 @@ int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *h
|
|
|
|
|
pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
|
|
|
|
|
|
|
|
|
|
sub_add_handler(handler);
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -755,15 +1112,10 @@ static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST
|
|
|
|
|
|
|
|
|
|
static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
|
|
|
|
|
{
|
|
|
|
|
char event[32];
|
|
|
|
|
char accept[AST_SIP_MAX_ACCEPT][64];
|
|
|
|
|
pjsip_accept_hdr *accept_header;
|
|
|
|
|
pjsip_event_hdr *event_header;
|
|
|
|
|
pjsip_expires_hdr *expires_header;
|
|
|
|
|
struct ast_sip_subscription_handler *handler;
|
|
|
|
|
RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
|
|
|
|
|
struct ast_sip_subscription *sub;
|
|
|
|
|
size_t num_accept_headers;
|
|
|
|
|
struct ast_sip_pubsub_body_generator *generator;
|
|
|
|
|
|
|
|
|
|
endpoint = ast_pjsip_rdata_get_endpoint(rdata);
|
|
|
|
@@ -784,38 +1136,13 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
|
|
|
|
|
return PJ_TRUE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
|
|
|
|
|
if (!event_header) {
|
|
|
|
|
ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
|
|
|
|
|
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
|
|
|
|
|
return PJ_TRUE;
|
|
|
|
|
}
|
|
|
|
|
ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
|
|
|
|
|
|
|
|
|
|
handler = find_sub_handler_for_event_name(event);
|
|
|
|
|
handler = subscription_get_handler_from_rdata(rdata);
|
|
|
|
|
if (!handler) {
|
|
|
|
|
ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
|
|
|
|
|
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
|
|
|
|
|
return PJ_TRUE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
|
|
|
|
|
if (accept_header) {
|
|
|
|
|
int i;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < accept_header->count; ++i) {
|
|
|
|
|
ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
|
|
|
|
|
}
|
|
|
|
|
num_accept_headers = accept_header->count;
|
|
|
|
|
} else {
|
|
|
|
|
/* If a SUBSCRIBE contains no Accept headers, then we must assume that
|
|
|
|
|
* the default accept type for the event package is to be used.
|
|
|
|
|
*/
|
|
|
|
|
ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0]));
|
|
|
|
|
num_accept_headers = 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
generator = find_body_generator(accept, num_accept_headers);
|
|
|
|
|
generator = subscription_get_generator_from_rdata(rdata, handler);
|
|
|
|
|
if (!generator) {
|
|
|
|
|
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
|
|
|
|
|
return PJ_TRUE;
|
|
|
|
@@ -839,7 +1166,11 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
|
|
|
|
|
} else {
|
|
|
|
|
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
sub->persistence = subscription_persistence_create(sub);
|
|
|
|
|
subscription_persistence_update(sub, rdata);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return PJ_TRUE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -1471,9 +1802,54 @@ static int ami_show_subscriptions_outbound(struct mansession *s, const struct me
|
|
|
|
|
#define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
|
|
|
|
|
#define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
|
|
|
|
|
|
|
|
|
|
static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
|
|
|
|
|
{
|
|
|
|
|
struct subscription_persistence *persistence = obj;
|
|
|
|
|
|
|
|
|
|
persistence->endpoint = ast_strdup(var->value);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf)
|
|
|
|
|
{
|
|
|
|
|
const struct subscription_persistence *persistence = obj;
|
|
|
|
|
|
|
|
|
|
*buf = ast_strdup(persistence->endpoint);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
|
|
|
|
|
{
|
|
|
|
|
struct subscription_persistence *persistence = obj;
|
|
|
|
|
|
|
|
|
|
persistence->tag = ast_strdup(var->value);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf)
|
|
|
|
|
{
|
|
|
|
|
const struct subscription_persistence *persistence = obj;
|
|
|
|
|
|
|
|
|
|
*buf = ast_strdup(persistence->tag);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
|
|
|
|
|
{
|
|
|
|
|
struct subscription_persistence *persistence = obj;
|
|
|
|
|
return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf)
|
|
|
|
|
{
|
|
|
|
|
const struct subscription_persistence *persistence = obj;
|
|
|
|
|
return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int load_module(void)
|
|
|
|
|
{
|
|
|
|
|
static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
|
|
|
|
|
struct ast_sorcery *sorcery = ast_sip_get_sorcery();
|
|
|
|
|
|
|
|
|
|
pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
|
|
|
|
|
|
|
|
|
@@ -1496,6 +1872,42 @@ static int load_module(void)
|
|
|
|
|
return AST_MODULE_LOAD_FAILURE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub");
|
|
|
|
|
ast_sorcery_apply_default(sorcery, "subscription_persistence", "astdb", "subscription_persistence");
|
|
|
|
|
if (ast_sorcery_object_register(sorcery, "subscription_persistence", subscription_persistence_alloc,
|
|
|
|
|
NULL, NULL)) {
|
|
|
|
|
ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
|
|
|
|
|
ast_sip_unregister_service(&pubsub_module);
|
|
|
|
|
ast_sched_context_destroy(sched);
|
|
|
|
|
return AST_MODULE_LOAD_FAILURE;
|
|
|
|
|
}
|
|
|
|
|
ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0,
|
|
|
|
|
CHARFLDSET(struct subscription_persistence, packet));
|
|
|
|
|
ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_name", "", OPT_CHAR_ARRAY_T, 0,
|
|
|
|
|
CHARFLDSET(struct subscription_persistence, src_name));
|
|
|
|
|
ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_port", "0", OPT_UINT_T, 0,
|
|
|
|
|
FLDSET(struct subscription_persistence, src_port));
|
|
|
|
|
ast_sorcery_object_field_register(sorcery, "subscription_persistence", "transport_key", "0", OPT_CHAR_ARRAY_T, 0,
|
|
|
|
|
CHARFLDSET(struct subscription_persistence, transport_key));
|
|
|
|
|
ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_name", "", OPT_CHAR_ARRAY_T, 0,
|
|
|
|
|
CHARFLDSET(struct subscription_persistence, local_name));
|
|
|
|
|
ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_port", "0", OPT_UINT_T, 0,
|
|
|
|
|
FLDSET(struct subscription_persistence, local_port));
|
|
|
|
|
ast_sorcery_object_field_register(sorcery, "subscription_persistence", "cseq", "0", OPT_UINT_T, 0,
|
|
|
|
|
FLDSET(struct subscription_persistence, cseq));
|
|
|
|
|
ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "endpoint", "",
|
|
|
|
|
persistence_endpoint_str2struct, persistence_endpoint_struct2str, NULL, 0, 0);
|
|
|
|
|
ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "tag", "",
|
|
|
|
|
persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0);
|
|
|
|
|
ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "",
|
|
|
|
|
persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);
|
|
|
|
|
|
|
|
|
|
if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
|
|
|
|
|
ast_sip_push_task(NULL, subscription_persistence_load, NULL);
|
|
|
|
|
} else {
|
|
|
|
|
stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
|
|
|
|
|
ami_show_subscriptions_inbound);
|
|
|
|
|
ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM,
|
|
|
|
|