mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-04 20:04:50 +00:00
Multiple revisions 400318-400319
........ r400318 | mmichelson | 2013-10-02 17:08:49 -0500 (Wed, 02 Oct 2013) | 12 lines Remove unnecessary waits from stasis. Since caches are updated on publisher threads, there is no need to wait for the cache updates to occur after a stasis message is published. In the case of chan_pjsip device state changes, this set of changes caused an improvement to performance. Review: https://reviewboard.asterisk.org/r/2890 ........ r400319 | mmichelson | 2013-10-02 17:10:54 -0500 (Wed, 02 Oct 2013) | 3 lines Remove svn:mergeinfo property. ........ Merged revisions 400318-400319 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@400335 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
@@ -896,7 +896,11 @@ static int chan_pjsip_devicestate(const char *data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
endpoint_snapshot = ast_endpoint_latest_snapshot(ast_endpoint_get_tech(endpoint->persistent),
|
endpoint_snapshot = ast_endpoint_latest_snapshot(ast_endpoint_get_tech(endpoint->persistent),
|
||||||
ast_endpoint_get_resource(endpoint->persistent), 1);
|
ast_endpoint_get_resource(endpoint->persistent));
|
||||||
|
|
||||||
|
if (!endpoint_snapshot) {
|
||||||
|
return AST_DEVICE_INVALID;
|
||||||
|
}
|
||||||
|
|
||||||
if (endpoint_snapshot->state == AST_ENDPOINT_OFFLINE) {
|
if (endpoint_snapshot->state == AST_ENDPOINT_OFFLINE) {
|
||||||
state = AST_DEVICE_UNAVAILABLE;
|
state = AST_DEVICE_UNAVAILABLE;
|
||||||
@@ -916,7 +920,6 @@ static int chan_pjsip_devicestate(const char *data)
|
|||||||
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
|
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
|
||||||
struct ast_channel_snapshot *snapshot;
|
struct ast_channel_snapshot *snapshot;
|
||||||
|
|
||||||
stasis_topic_wait(ast_channel_topic_all_cached());
|
|
||||||
msg = stasis_cache_get(cache, ast_channel_snapshot_type(),
|
msg = stasis_cache_get(cache, ast_channel_snapshot_type(),
|
||||||
endpoint_snapshot->channel_ids[num]);
|
endpoint_snapshot->channel_ids[num]);
|
||||||
|
|
||||||
|
@@ -347,15 +347,6 @@ const char *stasis_topic_name(const struct stasis_topic *topic);
|
|||||||
*/
|
*/
|
||||||
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
|
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
|
||||||
|
|
||||||
/*!
|
|
||||||
* \brief Wait for all pending messages on a given topic to be processed.
|
|
||||||
* \param topic Topic to await pending messages on.
|
|
||||||
* \return 0 on success.
|
|
||||||
* \return Non-zero on error.
|
|
||||||
* \since 12
|
|
||||||
*/
|
|
||||||
int stasis_topic_wait(struct stasis_topic *topic);
|
|
||||||
|
|
||||||
/*! @} */
|
/*! @} */
|
||||||
|
|
||||||
/*! @{ */
|
/*! @{ */
|
||||||
@@ -868,11 +859,6 @@ int stasis_cache_init(void);
|
|||||||
*/
|
*/
|
||||||
int stasis_config_init(void);
|
int stasis_config_init(void);
|
||||||
|
|
||||||
/*!
|
|
||||||
* \internal
|
|
||||||
*/
|
|
||||||
int stasis_wait_init(void);
|
|
||||||
|
|
||||||
/*! @} */
|
/*! @} */
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
|
@@ -194,14 +194,12 @@ struct stasis_cache *ast_endpoint_cache(void);
|
|||||||
*
|
*
|
||||||
* \param tech Name of the endpoint's technology.
|
* \param tech Name of the endpoint's technology.
|
||||||
* \param resource Resource name of the endpoint.
|
* \param resource Resource name of the endpoint.
|
||||||
* \param guaranteed Whether to require all pending messages to have been processed or not.
|
|
||||||
* \return Snapshot of the endpoint with the given name.
|
* \return Snapshot of the endpoint with the given name.
|
||||||
* \return \c NULL if endpoint is not found, or on error.
|
* \return \c NULL if endpoint is not found, or on error.
|
||||||
* \since 12
|
* \since 12
|
||||||
*/
|
*/
|
||||||
struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
|
struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
|
||||||
const char *resource,
|
const char *resource
|
||||||
unsigned int guaranteed
|
|
||||||
);
|
);
|
||||||
|
|
||||||
/*! @} */
|
/*! @} */
|
||||||
|
@@ -814,11 +814,6 @@ int stasis_init(void)
|
|||||||
/* Be sure the types are cleaned up after the message bus */
|
/* Be sure the types are cleaned up after the message bus */
|
||||||
ast_register_cleanup(stasis_cleanup);
|
ast_register_cleanup(stasis_cleanup);
|
||||||
|
|
||||||
if (stasis_wait_init() != 0) {
|
|
||||||
ast_log(LOG_ERROR, "Stasis initialization failed\n");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
cache_init = stasis_cache_init();
|
cache_init = stasis_cache_init();
|
||||||
if (cache_init != 0) {
|
if (cache_init != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@@ -187,7 +187,7 @@ void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_mess
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
|
struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
|
||||||
const char *name, unsigned int guaranteed)
|
const char *name)
|
||||||
{
|
{
|
||||||
RAII_VAR(char *, id, NULL, ast_free);
|
RAII_VAR(char *, id, NULL, ast_free);
|
||||||
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
|
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
|
||||||
@@ -198,10 +198,6 @@ struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (guaranteed) {
|
|
||||||
stasis_topic_wait(ast_endpoint_topic_all_cached());
|
|
||||||
}
|
|
||||||
|
|
||||||
msg = stasis_cache_get(ast_endpoint_cache(),
|
msg = stasis_cache_get(ast_endpoint_cache(),
|
||||||
ast_endpoint_snapshot_type(), id);
|
ast_endpoint_snapshot_type(), id);
|
||||||
if (!msg) {
|
if (!msg) {
|
||||||
|
@@ -1,133 +0,0 @@
|
|||||||
/*
|
|
||||||
* Asterisk -- An open source telephony toolkit.
|
|
||||||
*
|
|
||||||
* Copyright (C) 2013, Digium, Inc.
|
|
||||||
*
|
|
||||||
* Joshua Colp <jcolp@digium.com>
|
|
||||||
*
|
|
||||||
* See http://www.asterisk.org for more information about
|
|
||||||
* the Asterisk project. Please do not directly contact
|
|
||||||
* any of the maintainers of this project for assistance;
|
|
||||||
* the project provides a web site, mailing lists and IRC
|
|
||||||
* channels for your use.
|
|
||||||
*
|
|
||||||
* This program is free software, distributed under the terms of
|
|
||||||
* the GNU General Public License Version 2. See the LICENSE file
|
|
||||||
* at the top of the source tree.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/*! \file
|
|
||||||
*
|
|
||||||
* \brief Wait support for Stasis topics.
|
|
||||||
*
|
|
||||||
* \author Joshua Colp <jcolp@digium.com>
|
|
||||||
*/
|
|
||||||
|
|
||||||
/*** MODULEINFO
|
|
||||||
<support_level>core</support_level>
|
|
||||||
***/
|
|
||||||
|
|
||||||
#include "asterisk.h"
|
|
||||||
|
|
||||||
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
|
|
||||||
|
|
||||||
#include "asterisk/astobj2.h"
|
|
||||||
#include "asterisk/stasis.h"
|
|
||||||
|
|
||||||
static struct stasis_message_type *cache_guarantee_type(void);
|
|
||||||
STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type);
|
|
||||||
|
|
||||||
/*! \internal */
|
|
||||||
struct caching_guarantee {
|
|
||||||
ast_mutex_t lock;
|
|
||||||
ast_cond_t cond;
|
|
||||||
unsigned int done:1;
|
|
||||||
};
|
|
||||||
|
|
||||||
static void caching_guarantee_dtor(void *obj)
|
|
||||||
{
|
|
||||||
struct caching_guarantee *guarantee = obj;
|
|
||||||
|
|
||||||
ast_assert(guarantee->done == 1);
|
|
||||||
|
|
||||||
ast_mutex_destroy(&guarantee->lock);
|
|
||||||
ast_cond_destroy(&guarantee->cond);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void guarantee_handler(void *data, struct stasis_subscription *sub,
|
|
||||||
struct stasis_message *message)
|
|
||||||
{
|
|
||||||
/* Wait for our particular message */
|
|
||||||
if (data == message) {
|
|
||||||
struct caching_guarantee *guarantee;
|
|
||||||
ast_assert(cache_guarantee_type() == stasis_message_type(message));
|
|
||||||
guarantee = stasis_message_data(message);
|
|
||||||
|
|
||||||
ast_mutex_lock(&guarantee->lock);
|
|
||||||
guarantee->done = 1;
|
|
||||||
ast_cond_signal(&guarantee->cond);
|
|
||||||
ast_mutex_unlock(&guarantee->lock);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static struct stasis_message *caching_guarantee_create(void)
|
|
||||||
{
|
|
||||||
RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup);
|
|
||||||
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
|
|
||||||
|
|
||||||
if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
ast_mutex_init(&guarantee->lock);
|
|
||||||
ast_cond_init(&guarantee->cond, NULL);
|
|
||||||
|
|
||||||
if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
ao2_ref(msg, +1);
|
|
||||||
return msg;
|
|
||||||
}
|
|
||||||
|
|
||||||
int stasis_topic_wait(struct stasis_topic *topic)
|
|
||||||
{
|
|
||||||
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
|
|
||||||
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
|
|
||||||
struct caching_guarantee *guarantee;
|
|
||||||
|
|
||||||
msg = caching_guarantee_create();
|
|
||||||
if (!msg) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
sub = stasis_subscribe(topic, guarantee_handler, msg);
|
|
||||||
if (!sub) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
guarantee = stasis_message_data(msg);
|
|
||||||
|
|
||||||
ast_mutex_lock(&guarantee->lock);
|
|
||||||
stasis_publish(topic, msg);
|
|
||||||
while (!guarantee->done) {
|
|
||||||
ast_cond_wait(&guarantee->cond, &guarantee->lock);
|
|
||||||
}
|
|
||||||
ast_mutex_unlock(&guarantee->lock);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void wait_cleanup(void)
|
|
||||||
{
|
|
||||||
STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type);
|
|
||||||
}
|
|
||||||
|
|
||||||
int stasis_wait_init(void)
|
|
||||||
{
|
|
||||||
ast_register_cleanup(wait_cleanup);
|
|
||||||
|
|
||||||
if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
@@ -140,7 +140,7 @@ void ast_ari_get_endpoint(struct ast_variable *headers,
|
|||||||
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
|
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
|
||||||
RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
|
RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
|
||||||
|
|
||||||
snapshot = ast_endpoint_latest_snapshot(args->tech, args->resource, 0);
|
snapshot = ast_endpoint_latest_snapshot(args->tech, args->resource);
|
||||||
if (!snapshot) {
|
if (!snapshot) {
|
||||||
ast_ari_response_error(response, 404, "Not Found",
|
ast_ari_response_error(response, 404, "Not Found",
|
||||||
"Endpoint not found");
|
"Endpoint not found");
|
||||||
|
@@ -254,7 +254,6 @@ static void do_sleep(void)
|
|||||||
ast_hangup((channel)); \
|
ast_hangup((channel)); \
|
||||||
HANGUP_EVENT(channel, cause, dialstatus); \
|
HANGUP_EVENT(channel, cause, dialstatus); \
|
||||||
APPEND_EVENT(channel, AST_CEL_CHANNEL_END, NULL, NULL); \
|
APPEND_EVENT(channel, AST_CEL_CHANNEL_END, NULL, NULL); \
|
||||||
stasis_topic_wait(ast_channel_topic_all_cached()); \
|
|
||||||
ao2_cleanup(stasis_cache_get(ast_channel_cache(), \
|
ao2_cleanup(stasis_cache_get(ast_channel_cache(), \
|
||||||
ast_channel_snapshot_type(), ast_channel_uniqueid(channel))); \
|
ast_channel_snapshot_type(), ast_channel_uniqueid(channel))); \
|
||||||
ao2_cleanup(channel); \
|
ao2_cleanup(channel); \
|
||||||
|
Reference in New Issue
Block a user