mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-02 19:16:15 +00:00
stasis_state: Create internal stasis_state_proxy object.
This improves the way which stasis_state reference counting works. Since manager->states holds onto the proxy object instead of the real object this allows stasis_state objects to be freed when appropriate without use of a special state_remove function. Additionally each distinct eid associated with the state holds a reference to the state to prevent early release and potentially allow easier debug of leaks. Change-Id: I400e0db4b9afa3d5cb4ac7dad60907897e73f9a9
This commit is contained in:
committed by
George Joseph
parent
57fa604571
commit
25fbe79793
@@ -24,6 +24,18 @@
|
||||
|
||||
#include "asterisk/stasis_state.h"
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Used to link a stasis_state to it's manager
|
||||
*/
|
||||
struct stasis_state_proxy {
|
||||
AO2_WEAKPROXY();
|
||||
/*! The manager that owns and handles this state */
|
||||
struct stasis_state_manager *manager;
|
||||
/*! A unique id for this state object. */
|
||||
char id[0];
|
||||
};
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Associates a stasis topic to its last known published message
|
||||
@@ -38,7 +50,10 @@
|
||||
struct stasis_state {
|
||||
/*! The number of state subscribers */
|
||||
unsigned int num_subscribers;
|
||||
/*! The manager that owns and handles this state */
|
||||
/*!
|
||||
* \brief The manager that owns and handles this state
|
||||
* \note This reference is owned by stasis_state_proxy
|
||||
*/
|
||||
struct stasis_state_manager *manager;
|
||||
/*! Forwarding information, i.e. this topic to manager's topic */
|
||||
struct stasis_forward *forward;
|
||||
@@ -52,11 +67,11 @@ struct stasis_state {
|
||||
*/
|
||||
AST_VECTOR(, struct ast_eid) eids;
|
||||
/*! A unique id for this state object. */
|
||||
char id[0];
|
||||
char *id;
|
||||
};
|
||||
|
||||
AO2_STRING_FIELD_HASH_FN(stasis_state, id);
|
||||
AO2_STRING_FIELD_CMP_FN(stasis_state, id);
|
||||
AO2_STRING_FIELD_HASH_FN(stasis_state_proxy, id);
|
||||
AO2_STRING_FIELD_CMP_FN(stasis_state_proxy, id);
|
||||
|
||||
/*! The number of buckets to use for managed states */
|
||||
#define STATE_BUCKETS 57
|
||||
@@ -112,17 +127,28 @@ static void state_dtor(void *obj)
|
||||
state->topic = NULL;
|
||||
ao2_cleanup(state->msg);
|
||||
state->msg = NULL;
|
||||
ao2_cleanup(state->manager);
|
||||
state->manager = NULL;
|
||||
|
||||
/* All eids should have been removed */
|
||||
ast_assert(AST_VECTOR_SIZE(&state->eids) == 0);
|
||||
AST_VECTOR_FREE(&state->eids);
|
||||
}
|
||||
|
||||
static void state_proxy_dtor(void *obj) {
|
||||
struct stasis_state_proxy *proxy = obj;
|
||||
|
||||
ao2_cleanup(proxy->manager);
|
||||
}
|
||||
|
||||
static void state_proxy_sub_cb(void *obj, void *data)
|
||||
{
|
||||
struct stasis_state_proxy *proxy = obj;
|
||||
|
||||
ao2_unlink(proxy->manager->states, proxy);
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Allocate a stasis state object.
|
||||
* \brief Allocate a stasis state object and add it to the manager.
|
||||
*
|
||||
* Create and initialize a state structure. It's required that either a state
|
||||
* topic, or an id is specified. If a state topic is not given then one will be
|
||||
@@ -134,45 +160,16 @@ static void state_dtor(void *obj)
|
||||
*
|
||||
* \return A stasis_state object or NULL
|
||||
* \return NULL on error
|
||||
*
|
||||
* \pre manager->states must be locked.
|
||||
* \pre manager->states does not contain an object matching key \a id.
|
||||
*/
|
||||
static struct stasis_state *state_alloc(struct stasis_state_manager *manager,
|
||||
struct stasis_topic *state_topic, const char *id)
|
||||
struct stasis_topic *state_topic, const char *id,
|
||||
const char *file, int line, const char *func)
|
||||
{
|
||||
struct stasis_state *state;
|
||||
|
||||
if (!state_topic) {
|
||||
char *name;
|
||||
|
||||
/* If not given a state topic, then an id is required */
|
||||
ast_assert(id != NULL);
|
||||
|
||||
/*
|
||||
* To provide further detail and to ensure that the topic is unique within the
|
||||
* scope of the system we prefix it with the manager's topic name, which should
|
||||
* itself already be unique.
|
||||
*/
|
||||
if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) {
|
||||
ast_log(LOG_ERROR, "Unable to create state topic name '%s/%s'\n",
|
||||
stasis_topic_name(manager->all_topic), id);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
state_topic = stasis_topic_create(name);
|
||||
|
||||
if (!state_topic) {
|
||||
ast_log(LOG_ERROR, "Unable to create state topic '%s'\n", name);
|
||||
ast_free(name);
|
||||
return NULL;
|
||||
}
|
||||
ast_free(name);
|
||||
} else {
|
||||
/*
|
||||
* Since the state topic was passed in, go ahead and bump its reference.
|
||||
* By doing this here first, it allows us to consistently decrease the reference on
|
||||
* state allocation error.
|
||||
*/
|
||||
ao2_ref(state_topic, +1);
|
||||
}
|
||||
struct stasis_state_proxy *proxy = NULL;
|
||||
struct stasis_state *state = NULL;
|
||||
|
||||
if (!id) {
|
||||
/* If not given an id, then a state topic is required */
|
||||
@@ -182,77 +179,87 @@ static struct stasis_state *state_alloc(struct stasis_state_manager *manager,
|
||||
id = state_id_by_topic(manager->all_topic, state_topic);
|
||||
}
|
||||
|
||||
state = ao2_alloc(sizeof(*state) + strlen(id) + 1, state_dtor);
|
||||
state = __ao2_alloc(sizeof(*state), state_dtor, AO2_ALLOC_OPT_LOCK_MUTEX, id, file, line, func);
|
||||
if (!state) {
|
||||
ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
|
||||
id, stasis_topic_name(manager->all_topic));
|
||||
ao2_ref(state_topic, -1);
|
||||
return NULL;
|
||||
goto error_return;
|
||||
}
|
||||
|
||||
strcpy(state->id, id); /* Safe */
|
||||
state->topic = state_topic; /* ref already bumped above */
|
||||
if (!state_topic) {
|
||||
char *name;
|
||||
|
||||
/*
|
||||
* To provide further detail and to ensure that the topic is unique within the
|
||||
* scope of the system we prefix it with the manager's topic name, which should
|
||||
* itself already be unique.
|
||||
*/
|
||||
if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) {
|
||||
goto error_return;
|
||||
}
|
||||
|
||||
state->topic = stasis_topic_create(name);
|
||||
|
||||
ast_free(name);
|
||||
if (!state->topic) {
|
||||
goto error_return;
|
||||
}
|
||||
} else {
|
||||
/*
|
||||
* Since the state topic was passed in, go ahead and bump its reference.
|
||||
* By doing this here first, it allows us to consistently decrease the reference on
|
||||
* state allocation error.
|
||||
*/
|
||||
ao2_ref(state_topic, +1);
|
||||
state->topic = state_topic;
|
||||
}
|
||||
|
||||
proxy = ao2_t_weakproxy_alloc(sizeof(*proxy) + strlen(id) + 1, state_proxy_dtor, id);
|
||||
if (!proxy) {
|
||||
goto error_return;
|
||||
}
|
||||
|
||||
strcpy(proxy->id, id); /* Safe */
|
||||
|
||||
state->id = proxy->id;
|
||||
proxy->manager = ao2_bump(manager);
|
||||
state->manager = proxy->manager; /* state->manager is owned by the proxy */
|
||||
|
||||
state->forward = stasis_forward_all(state->topic, manager->all_topic);
|
||||
if (!state->forward) {
|
||||
ast_log(LOG_ERROR, "Unable to add state '%s' forward in manager '%s'\n",
|
||||
id, stasis_topic_name(manager->all_topic));
|
||||
ao2_ref(state, -1);
|
||||
return NULL;
|
||||
goto error_return;
|
||||
}
|
||||
|
||||
if (AST_VECTOR_INIT(&state->eids, 2)) {
|
||||
ast_log(LOG_ERROR, "Unable to initialize eids for state '%s' in manager '%s'\n",
|
||||
id, stasis_topic_name(manager->all_topic));
|
||||
ao2_ref(state, -1);
|
||||
return NULL;
|
||||
goto error_return;
|
||||
}
|
||||
|
||||
state->manager = ao2_bump(manager);
|
||||
if (ao2_t_weakproxy_set_object(proxy, state, OBJ_NOLOCK, "weakproxy link")) {
|
||||
goto error_return;
|
||||
}
|
||||
|
||||
if (ao2_weakproxy_subscribe(proxy, state_proxy_sub_cb, NULL, OBJ_NOLOCK)) {
|
||||
goto error_return;
|
||||
}
|
||||
|
||||
if (!ao2_link_flags(manager->states, proxy, OBJ_NOLOCK)) {
|
||||
goto error_return;
|
||||
}
|
||||
|
||||
ao2_ref(proxy, -1);
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Create a state object, and add it to the manager.
|
||||
*
|
||||
* \note Locking on the states container is specifically not done here, thus
|
||||
* appropriate locks should be applied prior to this function being called.
|
||||
*
|
||||
* \param manager The manager to be added to
|
||||
* \param state_topic A state topic to be managed (if NULL id is required)
|
||||
* \param id The unique id for the state (if NULL state_topic is required)
|
||||
*
|
||||
* \return The added state object
|
||||
* \return NULL on error
|
||||
*/
|
||||
static struct stasis_state *state_add(struct stasis_state_manager *manager,
|
||||
struct stasis_topic *state_topic, const char *id)
|
||||
{
|
||||
struct stasis_state *state = state_alloc(manager, state_topic, id);
|
||||
|
||||
if (!state) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (!ao2_link_flags(manager->states, state, OBJ_NOLOCK)) {
|
||||
ast_log(LOG_ERROR, "Unable to add state '%s' to manager '%s'\n",
|
||||
state->id ? state->id : "", stasis_topic_name(manager->all_topic));
|
||||
ao2_ref(state, -1);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return state;
|
||||
error_return:
|
||||
ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
|
||||
id, stasis_topic_name(manager->all_topic));
|
||||
ao2_cleanup(state);
|
||||
ao2_cleanup(proxy);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Find a state by id, or create one if not found and add it to the manager.
|
||||
*
|
||||
* \note Locking on the states container is specifically not done here, thus
|
||||
* appropriate locks should be applied prior to this function being called.
|
||||
*
|
||||
* \param manager The manager to be added to
|
||||
* \param state_topic A state topic to be managed (if NULL id is required)
|
||||
* \param id The unique id for the state (if NULL state_topic is required)
|
||||
@@ -260,18 +267,26 @@ static struct stasis_state *state_add(struct stasis_state_manager *manager,
|
||||
* \return The added state object
|
||||
* \return NULL on error
|
||||
*/
|
||||
static struct stasis_state *state_find_or_add(struct stasis_state_manager *manager,
|
||||
struct stasis_topic *state_topic, const char *id)
|
||||
#define state_find_or_add(mgr, top, id) __state_find_or_add(mgr, top, id, __FILE__, __LINE__, __PRETTY_FUNCTION__)
|
||||
static struct stasis_state *__state_find_or_add(struct stasis_state_manager *manager,
|
||||
struct stasis_topic *state_topic, const char *id,
|
||||
const char *file, int line, const char *func)
|
||||
{
|
||||
struct stasis_state *state;
|
||||
|
||||
ao2_lock(manager->states);
|
||||
if (ast_strlen_zero(id)) {
|
||||
id = state_id_by_topic(manager->all_topic, state_topic);
|
||||
}
|
||||
|
||||
state = ao2_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
|
||||
state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
|
||||
if (!state) {
|
||||
state = state_alloc(manager, state_topic, id, file, line, func);
|
||||
}
|
||||
|
||||
return state ? state : state_add(manager, state_topic, id);
|
||||
ao2_unlock(manager->states);
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
static void state_manager_dtor(void *obj)
|
||||
@@ -317,7 +332,7 @@ struct stasis_state_manager *stasis_state_manager_create(const char *topic_name)
|
||||
}
|
||||
|
||||
manager->states = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
|
||||
STATE_BUCKETS, stasis_state_hash_fn, NULL, stasis_state_cmp_fn);
|
||||
STATE_BUCKETS, stasis_state_proxy_hash_fn, NULL, stasis_state_proxy_cmp_fn);
|
||||
if (!manager->states) {
|
||||
ao2_ref(manager, -1);
|
||||
return NULL;
|
||||
@@ -356,10 +371,7 @@ struct stasis_topic *stasis_state_topic(struct stasis_state_manager *manager, co
|
||||
struct stasis_topic *topic;
|
||||
struct stasis_state *state;
|
||||
|
||||
ao2_lock(manager->states);
|
||||
state = state_find_or_add(manager, NULL, id);
|
||||
ao2_unlock(manager->states);
|
||||
|
||||
if (!state) {
|
||||
return NULL;
|
||||
}
|
||||
@@ -369,53 +381,6 @@ struct stasis_topic *stasis_state_topic(struct stasis_state_manager *manager, co
|
||||
return topic;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Remove a state from the stasis manager
|
||||
*
|
||||
* State should only be removed from the manager under the following conditions:
|
||||
*
|
||||
* There are no more subscribers to it
|
||||
* There are no more explicit publishers publishing to it
|
||||
* There are no more implicit publishers publishing to it
|
||||
*
|
||||
* Subscribers and explicit publishers hold a reference to the state object itself, so
|
||||
* once a state's reference count drops to 2 (1 for the manager, 1 passed in) then we
|
||||
* know there are no more subscribers or explicit publishers. Implicit publishers are
|
||||
* tracked by eids, so once that container is empty no more implicit publishers exist
|
||||
* for the state either. Only then can a state be removed.
|
||||
*
|
||||
* \param state The state to remove
|
||||
*/
|
||||
static void state_remove(struct stasis_state *state)
|
||||
{
|
||||
ao2_lock(state);
|
||||
|
||||
/*
|
||||
* The manager's state container also needs to be locked here prior to checking
|
||||
* the state's reference count, and potentially removing since we don't want its
|
||||
* count to possibly increase between the check and unlinking.
|
||||
*/
|
||||
ao2_lock(state->manager->states);
|
||||
|
||||
/*
|
||||
* If there are only 2 references left then it's the one owned by the manager,
|
||||
* and the one passed in to this function. However, before removing it from the
|
||||
* manager we need to also check that no eid is associated with the given state.
|
||||
* If an eid still remains then this means that an implicit publisher is still
|
||||
* publishing to this state.
|
||||
*/
|
||||
if (ao2_ref(state, 0) == 2 && AST_VECTOR_SIZE(&state->eids) == 0) {
|
||||
ao2_unlink_flags(state->manager->states, state, 0);
|
||||
}
|
||||
|
||||
ao2_unlock(state->manager->states);
|
||||
ao2_unlock(state);
|
||||
|
||||
/* Now it's safe to remove the reference that is held on the given state */
|
||||
ao2_ref(state, -1);
|
||||
}
|
||||
|
||||
struct stasis_state_subscriber {
|
||||
/*! The stasis state subscribed to */
|
||||
struct stasis_state *state;
|
||||
@@ -441,7 +406,7 @@ static void subscriber_dtor(void *obj)
|
||||
--sub->state->num_subscribers;
|
||||
ao2_unlock(sub->state);
|
||||
|
||||
state_remove(sub->state);
|
||||
ao2_ref(sub->state, -1);
|
||||
}
|
||||
|
||||
struct stasis_state_subscriber *stasis_state_add_subscriber(
|
||||
@@ -457,14 +422,11 @@ struct stasis_state_subscriber *stasis_state_add_subscriber(
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ao2_lock(manager->states);
|
||||
sub->state = state_find_or_add(manager, NULL, id);
|
||||
if (!sub->state) {
|
||||
ao2_unlock(manager->states);
|
||||
ao2_ref(sub, -1);
|
||||
return NULL;
|
||||
}
|
||||
ao2_unlock(manager->states);
|
||||
|
||||
ao2_lock(sub->state);
|
||||
++sub->state->num_subscribers;
|
||||
@@ -563,7 +525,7 @@ static void publisher_dtor(void *obj)
|
||||
{
|
||||
struct stasis_state_publisher *pub = obj;
|
||||
|
||||
state_remove(pub->state);
|
||||
ao2_ref(pub->state, -1);
|
||||
}
|
||||
|
||||
struct stasis_state_publisher *stasis_state_add_publisher(
|
||||
@@ -578,14 +540,11 @@ struct stasis_state_publisher *stasis_state_add_publisher(
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ao2_lock(manager->states);
|
||||
pub->state = state_find_or_add(manager, NULL, id);
|
||||
if (!pub->state) {
|
||||
ao2_unlock(manager->states);
|
||||
ao2_ref(pub, -1);
|
||||
return NULL;
|
||||
}
|
||||
ao2_unlock(manager->states);
|
||||
|
||||
return pub;
|
||||
}
|
||||
@@ -639,7 +598,10 @@ static void state_find_or_add_eid(struct stasis_state *state, const struct ast_e
|
||||
}
|
||||
|
||||
if (i == AST_VECTOR_SIZE(&state->eids)) {
|
||||
AST_VECTOR_APPEND(&state->eids, *eid);
|
||||
if (!AST_VECTOR_APPEND(&state->eids, *eid)) {
|
||||
/* This ensures state cannot be freed if it has any eids */
|
||||
ao2_ref(state, +1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -666,6 +628,8 @@ static void state_find_and_remove_eid(struct stasis_state *state, const struct a
|
||||
for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
|
||||
if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
|
||||
AST_VECTOR_REMOVE_UNORDERED(&state->eids, i);
|
||||
/* Balance the reference from state_find_or_add_eid */
|
||||
ao2_ref(state, -1);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -676,10 +640,7 @@ void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char
|
||||
{
|
||||
struct stasis_state *state;
|
||||
|
||||
ao2_lock(manager->states);
|
||||
state = state_find_or_add(manager, NULL, id);
|
||||
ao2_unlock(manager->states);
|
||||
|
||||
if (!state) {
|
||||
return;
|
||||
}
|
||||
@@ -697,7 +658,7 @@ void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char
|
||||
void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager,
|
||||
const char *id, const struct ast_eid *eid, struct stasis_message *msg)
|
||||
{
|
||||
struct stasis_state *state = ao2_find(manager->states, id, OBJ_SEARCH_KEY);
|
||||
struct stasis_state *state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY, "");
|
||||
|
||||
if (!state) {
|
||||
/*
|
||||
@@ -721,7 +682,7 @@ void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager,
|
||||
state_find_and_remove_eid(state, eid);
|
||||
ao2_unlock(state);
|
||||
|
||||
state_remove(state);
|
||||
ao2_ref(state, -1);
|
||||
}
|
||||
|
||||
int stasis_state_add_observer(struct stasis_state_manager *manager,
|
||||
@@ -744,10 +705,8 @@ void stasis_state_remove_observer(struct stasis_state_manager *manager,
|
||||
AST_VECTOR_RW_UNLOCK(&manager->observers);
|
||||
}
|
||||
|
||||
static int handle_stasis_state(void *obj, void *arg, void *data, int flags)
|
||||
static int handle_stasis_state(struct stasis_state *state, on_stasis_state handler, void *data)
|
||||
{
|
||||
struct stasis_state *state = obj;
|
||||
on_stasis_state handler = arg;
|
||||
struct stasis_message *msg;
|
||||
int res;
|
||||
|
||||
@@ -764,24 +723,41 @@ static int handle_stasis_state(void *obj, void *arg, void *data, int flags)
|
||||
return res;
|
||||
}
|
||||
|
||||
static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
|
||||
{
|
||||
struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
|
||||
|
||||
if (state) {
|
||||
int res;
|
||||
res = handle_stasis_state(state, arg, data);
|
||||
ao2_ref(state, -1);
|
||||
return res;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler,
|
||||
void *data)
|
||||
{
|
||||
ast_assert(handler != NULL);
|
||||
|
||||
ao2_callback_data(manager->states, OBJ_MULTIPLE | OBJ_NODATA,
|
||||
handle_stasis_state, handler, data);
|
||||
handle_stasis_state_proxy, handler, data);
|
||||
}
|
||||
|
||||
static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
|
||||
{
|
||||
struct stasis_state *state = obj;
|
||||
struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
|
||||
int res = 0;
|
||||
|
||||
if (state->num_subscribers) {
|
||||
return handle_stasis_state(obj, arg, data, flags);
|
||||
if (state && state->num_subscribers) {
|
||||
res = handle_stasis_state(state, arg, data);
|
||||
}
|
||||
|
||||
return 0;
|
||||
ao2_cleanup(state);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
void stasis_state_callback_subscribed(struct stasis_state_manager *manager, on_stasis_state handler,
|
||||
|
Reference in New Issue
Block a user