mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-06 12:36:58 +00:00
stasis_cache_pattern: Backport to 13
Somehow stasis_cache_pattern got out of sync between 13 and master and it was causing duplicate channel message issues in 13 when related to a specific endpoint. I.E. from statsd, 'endpoints.PJSIP.1174.channels 0|g' was being emitted twice. Backporting stasis_cache_pattern from master to 13 solved the issue and running the unit and testsuite tests confirmed that no new ones were created. ASTERISK-25317 #close Change-Id: Ia8707462f62d15eed14541c37f332a7bbbceb548
This commit is contained in:
@@ -121,9 +121,12 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
|
|||||||
const char *name);
|
const char *name);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Create the 'one' side of the cache pattern.
|
* \brief Create a sink in the cache pattern
|
||||||
*
|
*
|
||||||
* Create the 'one' but do not automatically forward.
|
* Create the 'one' but do not automatically forward to the all's topic.
|
||||||
|
* This is useful when aggregating other topic's messages created with
|
||||||
|
* \c stasis_cp_single_create in another caching topic without replicating
|
||||||
|
* those messages in the all's topics.
|
||||||
*
|
*
|
||||||
* Dispose of using stasis_cp_single_unsubscribe().
|
* Dispose of using stasis_cp_single_unsubscribe().
|
||||||
*
|
*
|
||||||
@@ -131,21 +134,9 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
|
|||||||
* \param name Base name for the topics.
|
* \param name Base name for the topics.
|
||||||
* \return One side instance
|
* \return One side instance
|
||||||
*/
|
*/
|
||||||
struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all,
|
struct stasis_cp_single *stasis_cp_sink_create(struct stasis_cp_all *all,
|
||||||
const char *name);
|
const char *name);
|
||||||
|
|
||||||
/*!
|
|
||||||
* \brief Set up a topic and topic cache forward.
|
|
||||||
*
|
|
||||||
* Forward 'from' to 'to'.
|
|
||||||
*
|
|
||||||
* \param from Source 'one' side instance.
|
|
||||||
* \param to Destination 'one' side instance.
|
|
||||||
* \retval 0 Success
|
|
||||||
* \retval -1 Failure
|
|
||||||
*/
|
|
||||||
int stasis_cp_single_forward(struct stasis_cp_single *from, struct stasis_cp_single *to);
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Stops caching and forwarding messages.
|
* \brief Stops caching and forwarding messages.
|
||||||
*
|
*
|
||||||
|
@@ -74,6 +74,8 @@ struct ast_endpoint {
|
|||||||
struct stasis_message_router *router;
|
struct stasis_message_router *router;
|
||||||
/*! ast_str_container of channels associated with this endpoint */
|
/*! ast_str_container of channels associated with this endpoint */
|
||||||
struct ao2_container *channel_ids;
|
struct ao2_container *channel_ids;
|
||||||
|
/*! Forwarding subscription from an endpoint to its tech endpoint */
|
||||||
|
struct stasis_forward *tech_forward;
|
||||||
};
|
};
|
||||||
|
|
||||||
static int endpoint_hash(const void *obj, int flags)
|
static int endpoint_hash(const void *obj, int flags)
|
||||||
@@ -303,7 +305,7 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
|
|||||||
|
|
||||||
if (!ast_strlen_zero(resource)) {
|
if (!ast_strlen_zero(resource)) {
|
||||||
|
|
||||||
endpoint->topics = stasis_cp_single_create_only(ast_endpoint_cache_all(),
|
endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
|
||||||
endpoint->id);
|
endpoint->id);
|
||||||
if (!endpoint->topics) {
|
if (!endpoint->topics) {
|
||||||
return NULL;
|
return NULL;
|
||||||
@@ -322,14 +324,13 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (stasis_cp_single_forward(endpoint->topics, tech_endpoint->topics)) {
|
endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
|
||||||
return NULL;
|
stasis_cp_single_topic(tech_endpoint->topics));
|
||||||
}
|
|
||||||
|
|
||||||
endpoint_publish_snapshot(endpoint);
|
endpoint_publish_snapshot(endpoint);
|
||||||
ao2_link(endpoints, endpoint);
|
ao2_link(endpoints, endpoint);
|
||||||
} else {
|
} else {
|
||||||
endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
|
endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
|
||||||
endpoint->id);
|
endpoint->id);
|
||||||
if (!endpoint->topics) {
|
if (!endpoint->topics) {
|
||||||
return NULL;
|
return NULL;
|
||||||
@@ -382,6 +383,7 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
|
|||||||
}
|
}
|
||||||
|
|
||||||
ao2_unlink(endpoints, endpoint);
|
ao2_unlink(endpoints, endpoint);
|
||||||
|
endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
|
||||||
|
|
||||||
clear_msg = create_endpoint_snapshot_message(endpoint);
|
clear_msg = create_endpoint_snapshot_message(endpoint);
|
||||||
if (clear_msg) {
|
if (clear_msg) {
|
||||||
|
@@ -138,7 +138,7 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
|
|||||||
{
|
{
|
||||||
RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup);
|
RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup);
|
||||||
|
|
||||||
one = stasis_cp_single_create_only(all, name);
|
one = stasis_cp_sink_create(all, name);
|
||||||
if (!one) {
|
if (!one) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -157,7 +157,7 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
|
|||||||
return one;
|
return one;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all,
|
struct stasis_cp_single *stasis_cp_sink_create(struct stasis_cp_all *all,
|
||||||
const char *name)
|
const char *name)
|
||||||
{
|
{
|
||||||
RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup);
|
RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup);
|
||||||
@@ -180,23 +180,6 @@ struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all,
|
|||||||
return one;
|
return one;
|
||||||
}
|
}
|
||||||
|
|
||||||
int stasis_cp_single_forward(struct stasis_cp_single *from, struct stasis_cp_single *to)
|
|
||||||
{
|
|
||||||
from->forward_topic_to_all = stasis_forward_all(from->topic, to->topic);
|
|
||||||
if (!from->forward_topic_to_all) {
|
|
||||||
return -1;;
|
|
||||||
}
|
|
||||||
|
|
||||||
from->forward_cached_to_all = stasis_forward_all(
|
|
||||||
stasis_caching_get_topic(from->topic_cached),
|
|
||||||
stasis_caching_get_topic(to->topic_cached));
|
|
||||||
if (!from->forward_cached_to_all) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
|
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
|
||||||
{
|
{
|
||||||
if (!one) {
|
if (!one) {
|
||||||
|
Reference in New Issue
Block a user