app_cdr,app_forkcdr,func_cdr: Synchronize with engine when manipulating state

When doing the rework of the CDR engine that pushed all of the logic into cdr.c
and made it respond to changes in channel state over Stasis, we knew that
accessing the CDR engine from the dialplan would be "slightly"
non-deterministic. Dialplan threads would be accessing CDRs while Stasis
threads would be updating the state of said CDRs - whereas in the past,
everything happened on the dialplan threads. Tests have shown that "slightly"
is in reality "very".

This patch synchronizes things by making the dialplan applications/functions
that manipulate CDRs do so over Stasis. ForkCDR, NoCDR, ResetCDR, CDR, and
CDR_PROP now all use Stasis to send their requests over to the CDR engine,
and synchronize on the channel Stasis topic via a subscription so that they
return their values/control to the dialplan at the appropriate time.

While going through this, the following changes were also made:
 * DISA, which can reset the CDR when a user successfully authenticates, now
   just uses the ResetCDR app to do this. This prevents having to duplicate
   the same Stasis synchronization logic in that application.
 * Answer no longer disables CDRs. It actually didn't work anyway - calling
   DISABLE on the channel's CDR doesn't stop the CDR from getting the Answer
   time - it just kills all CDRs on that channel, which isn't what the caller
   would intend.

(closes issue ASTERISK-22884)
(closes issue ASTERISK-22886)

Review: https://reviewboard.asterisk.org/r/3057/
........

Merged revisions 404294 from http://svn.asterisk.org/svn/asterisk/branches/12


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@404295 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Matthew Jordan
2013-12-19 00:50:01 +00:00
parent af723c6572
commit 7e9febbf86
9 changed files with 429 additions and 91 deletions

View File

@@ -40,6 +40,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/cdr.h"
#include "asterisk/app.h"
#include "asterisk/module.h"
#include "asterisk/stasis.h"
/*** DOCUMENTATION
<application name="ForkCDR" language="en_US">
@@ -102,8 +103,41 @@ AST_APP_OPTIONS(forkcdr_exec_options, {
AST_APP_OPTION('v', AST_CDR_FLAG_KEEP_VARS),
});
STASIS_MESSAGE_TYPE_DEFN_LOCAL(forkcdr_message_type);
/*! \internal \brief Message payload for the Stasis message sent to fork the CDR */
struct fork_cdr_message_payload {
/*! The name of the channel whose CDR will be forked */
const char *channel_name;
/*! Option flags that control how the CDR will be forked */
struct ast_flags *flags;
};
static void forkcdr_callback(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct fork_cdr_message_payload *payload;
if (stasis_message_type(message) != forkcdr_message_type()) {
return;
}
payload = stasis_message_data(message);
if (!payload) {
return;
}
if (ast_cdr_fork(payload->channel_name, payload->flags)) {
ast_log(AST_LOG_WARNING, "Failed to fork CDR for channel %s\n",
payload->channel_name);
}
}
static int forkcdr_exec(struct ast_channel *chan, const char *data)
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct fork_cdr_message_payload *, payload, ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
char *parse;
struct ast_flags flags = { 0, };
AST_DECLARE_APP_ARGS(args,
@@ -118,21 +152,48 @@ static int forkcdr_exec(struct ast_channel *chan, const char *data)
ast_app_parse_options(forkcdr_exec_options, &flags, NULL, args.options);
}
if (ast_cdr_fork(ast_channel_name(chan), &flags)) {
ast_log(AST_LOG_WARNING, "Failed to fork CDR for channel %s\n", ast_channel_name(chan));
if (!payload) {
return -1;
}
payload->channel_name = ast_channel_name(chan);
payload->flags = &flags;
message = stasis_message_create(forkcdr_message_type(), payload);
if (!message) {
ast_log(AST_LOG_WARNING, "Failed to fork CDR for channel %s: unable to create message\n",
ast_channel_name(chan));
return -1;
}
subscription = stasis_subscribe(ast_channel_topic(chan), forkcdr_callback, NULL);
if (!subscription) {
ast_log(AST_LOG_WARNING, "Failed to fork CDR for channel %s: unable to create subscription\n",
payload->channel_name);
return -1;
}
stasis_publish(ast_channel_topic(chan), message);
subscription = stasis_unsubscribe_and_join(subscription);
return 0;
}
static int unload_module(void)
{
STASIS_MESSAGE_TYPE_CLEANUP(forkcdr_message_type);
return ast_unregister_application(app);
}
static int load_module(void)
{
return ast_register_application_xml(app, forkcdr_exec);
int res = 0;
res |= STASIS_MESSAGE_TYPE_INIT(forkcdr_message_type);
res |= ast_register_application_xml(app, forkcdr_exec);
return res;
}
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Fork The CDR into 2 separate entities");