CDRs: Synchronize dialplan applications that manipulate CDRs with the engine

In https://reviewboard.asterisk.org/r/3057/, applications and functions that
manipulate CDRs were made to interact over Stasis. This was done to
synchronize manipulations of CDRs from the dialplan with the updates the
engine itself receives over the message bus.

This change rested on a faulty premise: that messages published to the CDR
topic or to a topic that forwards to the CDR topic are synchronized with the
messages handled by the CDR topic subscription in the CDR engine. This is not
the case. There is no ordering guaranteed for two messages published to the
same topic; ordering is only guaranteed if a message is published to the same
subscriber.

Stasis was modified in r405311 to allow a publisher to synchronize on the
subscriber. This patch uses that API to synchronize the CDR publishers with
the CDR engine message router, which maintains the overall topic subscription.

(closes issue ASTERISK-22884)
Reported by: Matt Jordan

Review: https://reviewboard.asterisk.org/r/3099/
........

Merged revisions 405312 from http://svn.asterisk.org/svn/asterisk/branches/12


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@405314 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Matthew Jordan
2014-01-12 22:13:12 +00:00
parent f8aaf585a3
commit 373965dbff
5 changed files with 148 additions and 87 deletions

View File

@@ -40,6 +40,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/app.h"
#include "asterisk/cdr.h"
#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
/*** DOCUMENTATION
<function name="CDR" language="en_US">
@@ -207,6 +208,7 @@ struct cdr_func_payload {
const char *cmd;
const char *arguments;
const char *value;
void *data;
};
struct cdr_func_data {
@@ -220,8 +222,8 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(cdr_prop_write_message_type);
static void cdr_read_callback(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct cdr_func_data *output = data;
struct cdr_func_payload *payload = stasis_message_data(message);
struct cdr_func_data *output;
char *info;
char *value = NULL;
struct ast_flags flags = { 0 };
@@ -235,9 +237,9 @@ static void cdr_read_callback(void *data, struct stasis_subscription *sub, struc
return;
}
if (!payload || !output) {
return;
}
ast_assert(payload != NULL);
output = payload->data;
ast_assert(output != NULL);
if (ast_strlen_zero(payload->arguments)) {
ast_log(AST_LOG_WARNING, "%s requires a variable (%s(variable[,option]))\n)",
@@ -441,6 +443,7 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
payload->chan = chan;
payload->cmd = cmd;
payload->arguments = parse;
payload->data = &output;
buf[0] = '\0';/* Ensure the buffer is initialized. */
output.buf = buf;
@@ -460,18 +463,14 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
if (ast_strlen_zero(ast_channel_name(chan))) {
cdr_read_callback(NULL, NULL, message);
} else {
RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
subscription = stasis_subscribe(ast_channel_topic(chan), cdr_read_callback, &output);
if (!subscription) {
ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
if (!router) {
ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
ast_channel_name(chan));
return -1;
}
stasis_publish(ast_channel_topic(chan), message);
subscription = stasis_unsubscribe_and_join(subscription);
stasis_message_router_publish_sync(router, message);
}
return 0;
@@ -482,8 +481,15 @@ static int cdr_write(struct ast_channel *chan, const char *cmd, char *parse,
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct cdr_func_payload *, payload,
ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
RAII_VAR(struct stasis_message_router *, router,
ast_cdr_message_router(), ao2_cleanup);
if (!router) {
ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
ast_channel_name(chan));
return -1;
}
if (!payload) {
return -1;
@@ -499,17 +505,7 @@ static int cdr_write(struct ast_channel *chan, const char *cmd, char *parse,
ast_channel_name(chan));
return -1;
}
subscription = stasis_subscribe(ast_channel_topic(chan), cdr_write_callback, NULL);
if (!subscription) {
ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
ast_channel_name(chan));
return -1;
}
stasis_publish(ast_channel_topic(chan), message);
subscription = stasis_unsubscribe_and_join(subscription);
stasis_message_router_publish_sync(router, message);
return 0;
}
@@ -520,7 +516,13 @@ static int cdr_prop_write(struct ast_channel *chan, const char *cmd, char *parse
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct cdr_func_payload *, payload,
ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
if (!router) {
ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
ast_channel_name(chan));
return -1;
}
if (!payload) {
return -1;
@@ -536,17 +538,7 @@ static int cdr_prop_write(struct ast_channel *chan, const char *cmd, char *parse
ast_channel_name(chan));
return -1;
}
subscription = stasis_subscribe(ast_channel_topic(chan), cdr_prop_write_callback, NULL);
if (!subscription) {
ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
ast_channel_name(chan));
return -1;
}
stasis_publish(ast_channel_topic(chan), message);
subscription = stasis_unsubscribe_and_join(subscription);
stasis_message_router_publish_sync(router, message);
return 0;
}
@@ -565,8 +557,14 @@ static struct ast_custom_function cdr_prop_function = {
static int unload_module(void)
{
RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
if (router) {
stasis_message_router_remove(router, cdr_prop_write_message_type());
stasis_message_router_remove(router, cdr_write_message_type());
stasis_message_router_remove(router, cdr_read_message_type());
}
STASIS_MESSAGE_TYPE_CLEANUP(cdr_read_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(cdr_write_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(cdr_prop_write_message_type);
@@ -578,15 +576,29 @@ static int unload_module(void)
static int load_module(void)
{
RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
if (!router) {
return AST_MODULE_LOAD_FAILURE;
}
res |= STASIS_MESSAGE_TYPE_INIT(cdr_read_message_type);
res |= STASIS_MESSAGE_TYPE_INIT(cdr_write_message_type);
res |= STASIS_MESSAGE_TYPE_INIT(cdr_prop_write_message_type);
res |= ast_custom_function_register(&cdr_function);
res |= ast_custom_function_register(&cdr_prop_function);
res |= stasis_message_router_add(router, cdr_prop_write_message_type(),
cdr_prop_write_callback, NULL);
res |= stasis_message_router_add(router, cdr_write_message_type(),
cdr_write_callback, NULL);
res |= stasis_message_router_add(router, cdr_read_message_type(),
cdr_read_callback, NULL);
return res;
if (res) {
return AST_MODULE_LOAD_FAILURE;
}
return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Call Detail Record (CDR) dialplan functions");