Move device state distribution to Stasis-core

In the move from Asterisk's event system to Stasis, this makes
distributed device state aggregation always-on, removes unnecessary
task processors where possible, and collapses aggregate and
non-aggregate states into a single cache for ease of retrieval. This
also removes an intermediary step in device state aggregation.

Review: https://reviewboard.asterisk.org/r/2389/
(closes issue ASTERISK-21101)
Patch-by: Kinsey Moore <kmoore@digium.com>


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@385860 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
Kinsey Moore
2013-04-16 15:33:59 +00:00
parent c1ae5dc49b
commit 191cf99ae1
10 changed files with 712 additions and 448 deletions

View File

@@ -129,7 +129,12 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/devicestate.h"
#include "asterisk/pbx.h"
#include "asterisk/app.h"
#include "asterisk/astobj2.h"
#include "asterisk/stasis.h"
#include "asterisk/event.h"
#include "asterisk/devicestate.h"
#define DEVSTATE_TOPIC_BUCKETS 57
/*! \brief Device state strings for printing */
static const char * const devstatestring[][2] = {
@@ -188,25 +193,12 @@ static pthread_t change_thread = AST_PTHREADT_NULL;
/*! \brief Flag for the queue */
static ast_cond_t change_pending;
struct devstate_change {
AST_LIST_ENTRY(devstate_change) entry;
uint32_t state;
struct ast_eid eid;
enum ast_devstate_cache cachable;
char device[1];
};
struct stasis_subscription *devstate_message_sub;
static struct {
pthread_t thread;
struct ast_event_sub *event_sub;
ast_cond_t cond;
ast_mutex_t lock;
AST_LIST_HEAD_NOLOCK(, devstate_change) devstate_change_q;
unsigned int enabled:1;
} devstate_collector = {
.thread = AST_PTHREADT_NULL,
.enabled = 0,
};
static struct stasis_topic *device_state_topic_all;
static struct stasis_caching_topic *device_state_topic_cached;
static struct stasis_message_type *device_state_message_type;
static struct stasis_topic_pool *device_state_topic_pool;
/* Forward declarations */
static int getproviderstate(const char *provider, const char *address);
@@ -289,21 +281,16 @@ enum ast_device_state ast_parse_device_state(const char *device)
static enum ast_device_state devstate_cached(const char *device)
{
enum ast_device_state res = AST_DEVICE_UNKNOWN;
struct ast_event *event;
RAII_VAR(struct stasis_message *, cached_msg, NULL, ao2_cleanup);
struct ast_device_state_message *device_state;
event = ast_event_get_cached(AST_EVENT_DEVICE_STATE,
AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
AST_EVENT_IE_END);
cached_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device);
if (!cached_msg) {
return AST_DEVICE_UNKNOWN;
}
device_state = stasis_message_data(cached_msg);
if (!event)
return res;
res = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
ast_event_destroy(event);
return res;
return device_state->state;
}
/*! \brief Check device state through channel specific function or generic function */
@@ -426,39 +413,9 @@ static int getproviderstate(const char *provider, const char *address)
return res;
}
static void devstate_event(const char *device, enum ast_device_state state, int cachable)
{
struct ast_event *event;
enum ast_event_type event_type;
if (devstate_collector.enabled) {
/* Distributed device state is enabled, so this state change is a change
* for a single server, not the real state. */
event_type = AST_EVENT_DEVICE_STATE_CHANGE;
} else {
event_type = AST_EVENT_DEVICE_STATE;
}
ast_debug(3, "device '%s' state '%d'\n", device, state);
if (!(event = ast_event_new(event_type,
AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state,
AST_EVENT_IE_CACHABLE, AST_EVENT_IE_PLTYPE_UINT, cachable,
AST_EVENT_IE_END))) {
return;
}
if (cachable) {
ast_event_queue_and_cache(event);
} else {
ast_event_queue(event);
}
}
/*! Called by the state change thread to find out what the state is, and then
* to queue up the state change event */
static void do_state_change(const char *device, int cachable)
static void do_state_change(const char *device, enum ast_devstate_cache cachable)
{
enum ast_device_state state;
@@ -466,7 +423,7 @@ static void do_state_change(const char *device, int cachable)
ast_debug(3, "Changing state for %s - state %d (%s)\n", device, state, ast_devstate2str(state));
devstate_event(device, state, cachable);
ast_publish_device_state(device, state, cachable);
}
int ast_devstate_changed_literal(enum ast_device_state state, enum ast_devstate_cache cachable, const char *device)
@@ -490,7 +447,7 @@ int ast_devstate_changed_literal(enum ast_device_state state, enum ast_devstate_
*/
if (state != AST_DEVICE_UNKNOWN) {
devstate_event(device, state, cachable);
ast_publish_device_state(device, state, cachable);
} else if (change_thread == AST_PTHREADT_NULL || !(change = ast_calloc(1, sizeof(*change) + strlen(device)))) {
/* we could not allocate a change struct, or */
/* there is no background thread, so process the change now */
@@ -562,176 +519,148 @@ static void *do_devstate_changes(void *data)
return NULL;
}
static void destroy_devstate_change(struct devstate_change *sc)
{
ast_free(sc);
}
#define MAX_SERVERS 64
struct change_collection {
struct devstate_change states[MAX_SERVERS];
size_t num_states;
};
static void devstate_cache_cb(const struct ast_event *event, void *data)
static int devstate_change_aggregator_cb(void *obj, void *arg, void *data, int flags)
{
struct change_collection *collection = data;
int i;
const struct ast_eid *eid;
struct stasis_message *msg = obj;
struct ast_devstate_aggregate *aggregate = arg;
char *device = data;
struct ast_device_state_message *device_state = stasis_message_data(msg);
if (collection->num_states == ARRAY_LEN(collection->states)) {
ast_log(LOG_ERROR, "More per-server state values than we have room for (MAX_SERVERS is %d)\n",
MAX_SERVERS);
return;
if (!device_state->eid || strcmp(device, device_state->device)) {
/* ignore aggregate states and devices that don't match */
return 0;
}
if (!(eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
ast_log(LOG_ERROR, "Device state change event with no EID\n");
return;
}
i = collection->num_states;
collection->states[i].state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
collection->states[i].eid = *eid;
collection->num_states++;
ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
ast_devstate2str(device_state->state), device);
ast_devstate_aggregate_add(aggregate, device_state->state);
return 0;
}
static void process_collection(const char *device, enum ast_devstate_cache cachable, struct change_collection *collection)
static void device_state_dtor(void *obj)
{
int i;
struct ast_devstate_aggregate agg;
enum ast_device_state state;
struct ast_event *event;
struct ast_device_state_message *device_state = obj;
ast_string_field_free_memory(device_state);
ast_free(device_state->eid);
}
ast_devstate_aggregate_init(&agg);
static struct ast_device_state_message *device_state_alloc(const char *device, enum ast_device_state state, enum ast_devstate_cache cachable, const struct ast_eid *eid)
{
RAII_VAR(struct ast_device_state_message *, new_device_state, ao2_alloc(sizeof(*new_device_state), device_state_dtor), ao2_cleanup);
for (i = 0; i < collection->num_states; i++) {
ast_debug(1, "Adding per-server state of '%s' for '%s'\n",
ast_devstate2str(collection->states[i].state), device);
ast_devstate_aggregate_add(&agg, collection->states[i].state);
if (!new_device_state || ast_string_field_init(new_device_state, 256)) {
return NULL;
}
state = ast_devstate_aggregate_result(&agg);
ast_string_field_set(new_device_state, device, device);
new_device_state->state = state;
new_device_state->cachable = cachable;
ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
ast_devstate2str(state), device);
if (eid) {
char eid_str[20];
struct ast_str *cache_id = ast_str_alloca(256);
event = ast_event_get_cached(AST_EVENT_DEVICE_STATE,
AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
AST_EVENT_IE_END);
new_device_state->eid = ast_malloc(sizeof(*eid));
if (!new_device_state->eid) {
return NULL;
}
if (event) {
enum ast_device_state old_state;
*new_device_state->eid = *eid;
ast_eid_to_str(eid_str, sizeof(eid_str), new_device_state->eid);
ast_str_set(&cache_id, 0, "%s%s", eid_str, device);
ast_string_field_set(new_device_state, cache_id, ast_str_buffer(cache_id));
} else {
/* no EID makes this an aggregate state */
ast_string_field_set(new_device_state, cache_id, device);
}
old_state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
ao2_ref(new_device_state, +1);
return new_device_state;
}
ast_event_destroy(event);
static enum ast_device_state get_aggregate_state(char *device)
{
RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
struct ast_devstate_aggregate aggregate;
if (state == old_state) {
ast_devstate_aggregate_init(&aggregate);
cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL);
ao2_callback_data(cached, OBJ_NODATA, devstate_change_aggregator_cb, &aggregate, device);
return ast_devstate_aggregate_result(&aggregate);
}
static int aggregate_state_changed(char *device, enum ast_device_state new_aggregate_state)
{
RAII_VAR(struct stasis_message *, cached_aggregate_msg, NULL, ao2_cleanup);
struct ast_device_state_message *cached_aggregate_device_state;
cached_aggregate_msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), device);
if (!cached_aggregate_msg) {
return 1;
}
cached_aggregate_device_state = stasis_message_data(cached_aggregate_msg);
if (cached_aggregate_device_state->state == new_aggregate_state) {
return 0;
}
return 1;
}
static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
{
enum ast_device_state aggregate_state;
char *device;
struct ast_device_state_message *device_state;
RAII_VAR(struct stasis_message *, new_aggregate_msg, NULL, ao2_cleanup);
RAII_VAR(struct ast_device_state_message *, new_aggregate_state, NULL, ao2_cleanup);
if (stasis_cache_update_type() == stasis_message_type(msg)) {
struct stasis_cache_update *update = stasis_message_data(msg);
if (!update->new_snapshot) {
return;
}
msg = update->new_snapshot;
}
if (ast_device_state_message_type() != stasis_message_type(msg)) {
return;
}
device_state = stasis_message_data(msg);
if (!device_state->eid) {
/* ignore aggregate messages */
return;
}
device = ast_strdupa(device_state->device);
ast_debug(1, "Processing device state change for '%s'\n", device);
if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
/* if it's not cachable, there will be no aggregate state to get
* and this should be passed through */
aggregate_state = device_state->state;
} else {
aggregate_state = get_aggregate_state(device);
ast_debug(1, "Aggregate devstate result is '%s' for '%s'\n",
ast_devstate2str(aggregate_state), device);
if (!aggregate_state_changed(device, aggregate_state)) {
/* No change since last reported device state */
ast_debug(1, "Aggregate state for device '%s' has not changed from '%s'\n",
device, ast_devstate2str(state));
device, ast_devstate2str(aggregate_state));
return;
}
}
ast_debug(1, "Aggregate state for device '%s' has changed to '%s'\n",
device, ast_devstate2str(state));
device, ast_devstate2str(aggregate_state));
event = ast_event_new(AST_EVENT_DEVICE_STATE,
AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device,
AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state,
AST_EVENT_IE_END);
if (!event) {
return;
}
if (cachable) {
ast_event_queue_and_cache(event);
} else {
ast_event_queue(event);
}
}
static void handle_devstate_change(struct devstate_change *sc)
{
struct ast_event_sub *tmp_sub;
struct change_collection collection = {
.num_states = 0,
};
ast_debug(1, "Processing device state change for '%s'\n", sc->device);
if (!(tmp_sub = ast_event_subscribe_new(AST_EVENT_DEVICE_STATE_CHANGE, devstate_cache_cb, &collection))) {
ast_log(LOG_ERROR, "Failed to create subscription\n");
return;
}
if (ast_event_sub_append_ie_str(tmp_sub, AST_EVENT_IE_DEVICE, sc->device)) {
ast_log(LOG_ERROR, "Failed to append device IE\n");
ast_event_sub_destroy(tmp_sub);
return;
}
/* Populate the collection of device states from the cache */
ast_event_dump_cache(tmp_sub);
process_collection(sc->device, sc->cachable, &collection);
ast_event_sub_destroy(tmp_sub);
}
static void *run_devstate_collector(void *data)
{
for (;;) {
struct devstate_change *sc;
ast_mutex_lock(&devstate_collector.lock);
while (!(sc = AST_LIST_REMOVE_HEAD(&devstate_collector.devstate_change_q, entry)))
ast_cond_wait(&devstate_collector.cond, &devstate_collector.lock);
ast_mutex_unlock(&devstate_collector.lock);
handle_devstate_change(sc);
destroy_devstate_change(sc);
}
return NULL;
}
static void devstate_change_collector_cb(const struct ast_event *event, void *data)
{
struct devstate_change *sc;
const char *device;
const struct ast_eid *eid;
uint32_t state;
enum ast_devstate_cache cachable = AST_DEVSTATE_CACHABLE;
device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE);
if (ast_strlen_zero(device) || !eid) {
ast_log(LOG_ERROR, "Invalid device state change event received\n");
return;
}
if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device))))
return;
strcpy(sc->device, device);
sc->eid = *eid;
sc->state = state;
sc->cachable = cachable;
ast_mutex_lock(&devstate_collector.lock);
AST_LIST_INSERT_TAIL(&devstate_collector.devstate_change_q, sc, entry);
ast_cond_signal(&devstate_collector.cond);
ast_mutex_unlock(&devstate_collector.lock);
ast_publish_device_state_full(device, aggregate_state, device_state->cachable, NULL);
}
/*! \brief Initialize the device state engine in separate thread */
@@ -784,28 +713,106 @@ enum ast_device_state ast_devstate_aggregate_result(struct ast_devstate_aggregat
return agg->state;
}
int ast_enable_distributed_devstate(void)
struct stasis_topic *ast_device_state_topic_all(void)
{
if (devstate_collector.enabled) {
return 0;
return device_state_topic_all;
}
struct stasis_caching_topic *ast_device_state_topic_cached(void)
{
return device_state_topic_cached;
}
struct stasis_message_type *ast_device_state_message_type(void)
{
return device_state_message_type;
}
struct stasis_topic *ast_device_state_topic(const char *device)
{
return stasis_topic_pool_get_topic(device_state_topic_pool, device);
}
int ast_publish_device_state_full(
const char *device,
enum ast_device_state state,
enum ast_devstate_cache cachable,
struct ast_eid *eid)
{
RAII_VAR(struct ast_device_state_message *, device_state, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
struct stasis_topic *device_specific_topic;
ast_assert(!ast_strlen_zero(device));
device_state = device_state_alloc(device, state, cachable, eid);
if (!device_state) {
return -1;
}
devstate_collector.event_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE,
devstate_change_collector_cb, "devicestate_engine_enable_distributed", NULL, AST_EVENT_IE_END);
message = stasis_message_create(ast_device_state_message_type(), device_state);
if (!devstate_collector.event_sub) {
device_specific_topic = ast_device_state_topic(device);
if (!device_specific_topic) {
return -1;
}
stasis_publish(device_specific_topic, message);
return 0;
}
static const char *device_state_get_id(struct stasis_message *message)
{
struct ast_device_state_message *device_state;
if (ast_device_state_message_type() != stasis_message_type(message)) {
return NULL;
}
device_state = stasis_message_data(message);
if (device_state->cachable == AST_DEVSTATE_NOT_CACHABLE) {
return NULL;
}
return device_state->cache_id;
}
static void devstate_exit(void)
{
ao2_cleanup(device_state_topic_all);
device_state_topic_all = NULL;
device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached);
ao2_cleanup(device_state_message_type);
device_state_message_type = NULL;
ao2_cleanup(device_state_topic_pool);
device_state_topic_pool = NULL;
}
int devstate_init(void)
{
device_state_topic_all = stasis_topic_create("ast_device_state_topic");
if (!device_state_topic_all) {
return -1;
}
device_state_topic_cached = stasis_caching_topic_create(device_state_topic_all, device_state_get_id);
if (!device_state_topic_cached) {
return -1;
}
device_state_message_type = stasis_message_type_create("ast_device_state_message");
if (!device_state_message_type) {
return -1;
}
device_state_topic_pool = stasis_topic_pool_create(ast_device_state_topic_all());
if (!device_state_topic_pool) {
return -1;
}
devstate_message_sub = stasis_subscribe(stasis_caching_get_topic(ast_device_state_topic_cached()), devstate_change_collector_cb, NULL);
if (!devstate_message_sub) {
ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n");
return -1;
}
ast_mutex_init(&devstate_collector.lock);
ast_cond_init(&devstate_collector.cond, NULL);
if (ast_pthread_create_background(&devstate_collector.thread, NULL, run_devstate_collector, NULL) < 0) {
ast_log(LOG_ERROR, "Unable to start device state collector thread.\n");
return -1;
}
devstate_collector.enabled = 1;
ast_register_atexit(devstate_exit);
return 0;
}