Move presence state distribution to Stasis-core

Convert presence state events to Stasis-core messages and remove
redundant serializers where possible.

Review: https://reviewboard.asterisk.org/r/2410/
(closes issue ASTERISK-21102)
Patch-by: Kinsey Moore <kmoore@digium.com>


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@385862 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Kinsey Moore
2013-04-16 15:48:16 +00:00
parent 191cf99ae1
commit 71a01725b8
4 changed files with 239 additions and 256 deletions

View File

@@ -53,13 +53,9 @@ static const struct {
{ "dnd", AST_PRESENCE_DND},
};
/*! \brief Flag for the queue */
static ast_cond_t change_pending;
struct state_change {
AST_LIST_ENTRY(state_change) list;
char provider[1];
};
struct stasis_message_type *presence_state_type;
struct stasis_topic *presence_state_topic_all;
struct stasis_caching_topic *presence_state_topic_cached;
/*! \brief A presence state provider */
struct presence_state_provider {
@@ -71,13 +67,6 @@ struct presence_state_provider {
/*! \brief A list of providers */
static AST_RWLIST_HEAD_STATIC(presence_state_providers, presence_state_provider);
/*! \brief The state change queue. State changes are queued
for processing by a separate thread */
static AST_LIST_HEAD_STATIC(state_changes, state_change);
/*! \brief The presence state change notification thread */
static pthread_t change_thread = AST_PTHREADT_NULL;
const char *ast_presence_state2str(enum ast_presence_state state)
{
int i;
@@ -103,25 +92,20 @@ enum ast_presence_state ast_presence_state_val(const char *val)
static enum ast_presence_state presence_state_cached(const char *presence_provider, char **subtype, char **message)
{
enum ast_presence_state res = AST_PRESENCE_INVALID;
struct ast_event *event;
const char *_subtype;
const char *_message;
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct ast_presence_state_message *presence_state;
event = ast_event_get_cached(AST_EVENT_PRESENCE_STATE,
AST_EVENT_IE_PRESENCE_PROVIDER, AST_EVENT_IE_PLTYPE_STR, presence_provider,
AST_EVENT_IE_END);
msg = stasis_cache_get(ast_presence_state_topic_cached(), ast_presence_state_message_type(), presence_provider);
if (!event) {
if (!msg) {
return res;
}
res = ast_event_get_ie_uint(event, AST_EVENT_IE_PRESENCE_STATE);
_subtype = ast_event_get_ie_str(event, AST_EVENT_IE_PRESENCE_SUBTYPE);
_message = ast_event_get_ie_str(event, AST_EVENT_IE_PRESENCE_MESSAGE);
presence_state = stasis_message_data(msg);
res = presence_state->state;
*subtype = !ast_strlen_zero(_subtype) ? ast_strdup(_subtype) : NULL;
*message = !ast_strlen_zero(_message) ? ast_strdup(_message) : NULL;
ast_event_destroy(event);
*subtype = !ast_strlen_zero(presence_state->subtype) ? ast_strdup(presence_state->subtype) : NULL;
*message = !ast_strlen_zero(presence_state->message) ? ast_strdup(presence_state->message) : NULL;
return res;
}
@@ -213,23 +197,50 @@ int ast_presence_state_prov_del(const char *label)
return res;
}
static void presence_state_dtor(void *obj)
{
struct ast_presence_state_message *presence_state = obj;
ast_string_field_free_memory(presence_state);
}
static struct ast_presence_state_message *presence_state_alloc(const char *provider,
enum ast_presence_state state,
const char *subtype,
const char *message)
{
RAII_VAR(struct ast_presence_state_message *, presence_state, ao2_alloc(sizeof(*presence_state), presence_state_dtor), ao2_cleanup);
if (!presence_state || ast_string_field_init(presence_state, 256)) {
return NULL;
}
presence_state->state = state;
ast_string_field_set(presence_state, provider, provider);
ast_string_field_set(presence_state, subtype, S_OR(subtype, ""));
ast_string_field_set(presence_state, message, S_OR(message, ""));
ao2_ref(presence_state, +1);
return presence_state;
}
static void presence_state_event(const char *provider,
enum ast_presence_state state,
const char *subtype,
const char *message)
{
struct ast_event *event;
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
RAII_VAR(struct ast_presence_state_message *, presence_state, presence_state_alloc(provider, state, subtype, message), ao2_cleanup);
if (!(event = ast_event_new(AST_EVENT_PRESENCE_STATE,
AST_EVENT_IE_PRESENCE_PROVIDER, AST_EVENT_IE_PLTYPE_STR, provider,
AST_EVENT_IE_PRESENCE_STATE, AST_EVENT_IE_PLTYPE_UINT, state,
AST_EVENT_IE_PRESENCE_SUBTYPE, AST_EVENT_IE_PLTYPE_STR, S_OR(subtype, ""),
AST_EVENT_IE_PRESENCE_MESSAGE, AST_EVENT_IE_PLTYPE_STR, S_OR(message, ""),
AST_EVENT_IE_END))) {
if (!presence_state) {
return;
}
ast_event_queue_and_cache(event);
msg = stasis_message_create(ast_presence_state_message_type(), presence_state);
if (!msg) {
return;
}
stasis_publish(ast_presence_state_topic_all(), msg);
}
static void do_presence_state_change(const char *provider)
@@ -254,19 +265,10 @@ int ast_presence_state_changed_literal(enum ast_presence_state state,
const char *message,
const char *presence_provider)
{
struct state_change *change;
if (state != AST_PRESENCE_NOT_SET) {
presence_state_event(presence_provider, state, subtype, message);
} else if ((change_thread == AST_PTHREADT_NULL) ||
!(change = ast_calloc(1, sizeof(*change) + strlen(presence_provider)))) {
if (state == AST_PRESENCE_NOT_SET) {
do_presence_state_change(presence_provider);
} else {
strcpy(change->provider, presence_provider);
AST_LIST_LOCK(&state_changes);
AST_LIST_INSERT_TAIL(&state_changes, change, list);
ast_cond_signal(&change_pending);
AST_LIST_UNLOCK(&state_changes);
presence_state_event(presence_provider, state, subtype, message);
}
return 0;
@@ -287,39 +289,60 @@ int ast_presence_state_changed(enum ast_presence_state state,
return ast_presence_state_changed_literal(state, subtype, message, buf);
}
/*! \brief Go through the presence state change queue and update changes in the presence state thread */
static void *do_presence_changes(void *data)
struct stasis_message_type *ast_presence_state_message_type(void)
{
struct state_change *next, *current;
return presence_state_type;
}
for (;;) {
/* This basically pops off any state change entries, resets the list back to NULL, unlocks, and processes each state change */
AST_LIST_LOCK(&state_changes);
if (AST_LIST_EMPTY(&state_changes))
ast_cond_wait(&change_pending, &state_changes.lock);
next = AST_LIST_FIRST(&state_changes);
AST_LIST_HEAD_INIT_NOLOCK(&state_changes);
AST_LIST_UNLOCK(&state_changes);
struct stasis_topic *ast_presence_state_topic_all(void)
{
return presence_state_topic_all;
}
/* Process each state change */
while ((current = next)) {
next = AST_LIST_NEXT(current, list);
do_presence_state_change(current->provider);
ast_free(current);
}
struct stasis_caching_topic *ast_presence_state_topic_cached(void)
{
return presence_state_topic_cached;
}
static const char *presence_state_get_id(struct stasis_message *msg)
{
struct ast_presence_state_message *presence_state = stasis_message_data(msg);
if (stasis_message_type(msg) != ast_presence_state_message_type()) {
return NULL;
}
return NULL;
return presence_state->provider;
}
static void presence_state_engine_cleanup(void)
{
ao2_cleanup(presence_state_topic_all);
presence_state_topic_all = NULL;
ao2_cleanup(presence_state_topic_cached);
presence_state_topic_cached = NULL;
ao2_cleanup(presence_state_type);
presence_state_type = NULL;
}
int ast_presence_state_engine_init(void)
{
ast_cond_init(&change_pending, NULL);
if (ast_pthread_create_background(&change_thread, NULL, do_presence_changes, NULL) < 0) {
ast_log(LOG_ERROR, "Unable to start presence state change thread.\n");
presence_state_type = stasis_message_type_create("ast_presence_state_message");
if (!presence_state_type) {
return -1;
}
presence_state_topic_all = stasis_topic_create("ast_presence_state_topic_all");
if (!presence_state_topic_all) {
return -1;
}
presence_state_topic_cached = stasis_caching_topic_create(presence_state_topic_all, presence_state_get_id);
if (!presence_state_topic_cached) {
return -1;
}
ast_register_atexit(presence_state_engine_cleanup);
return 0;
}