mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-06 12:36:58 +00:00
Merge "stasis: Add setting subscription congestion levels."
This commit is contained in:
@@ -590,6 +590,20 @@ struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
|
|||||||
struct stasis_subscription *stasis_unsubscribe(
|
struct stasis_subscription *stasis_unsubscribe(
|
||||||
struct stasis_subscription *subscription);
|
struct stasis_subscription *subscription);
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Set the high and low alert water marks of the stasis subscription.
|
||||||
|
* \since 13.10.0
|
||||||
|
*
|
||||||
|
* \param subscription Pointer to a stasis subscription
|
||||||
|
* \param low_water New queue low water mark. (-1 to set as 90% of high_water)
|
||||||
|
* \param high_water New queue high water mark.
|
||||||
|
*
|
||||||
|
* \retval 0 on success.
|
||||||
|
* \retval -1 on error (water marks not changed).
|
||||||
|
*/
|
||||||
|
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
|
||||||
|
long low_water, long high_water);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Block until the last message is processed on a subscription.
|
* \brief Block until the last message is processed on a subscription.
|
||||||
*
|
*
|
||||||
|
@@ -126,6 +126,20 @@ int stasis_message_router_is_done(struct stasis_message_router *router);
|
|||||||
void stasis_message_router_publish_sync(struct stasis_message_router *router,
|
void stasis_message_router_publish_sync(struct stasis_message_router *router,
|
||||||
struct stasis_message *message);
|
struct stasis_message *message);
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* \brief Set the high and low alert water marks of the stasis message router.
|
||||||
|
* \since 13.10.0
|
||||||
|
*
|
||||||
|
* \param router Pointer to a stasis message router
|
||||||
|
* \param low_water New queue low water mark. (-1 to set as 90% of high_water)
|
||||||
|
* \param high_water New queue high water mark.
|
||||||
|
*
|
||||||
|
* \retval 0 on success.
|
||||||
|
* \retval -1 on error (water marks not changed).
|
||||||
|
*/
|
||||||
|
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
|
||||||
|
long low_water, long high_water);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Add a route to a message router.
|
* \brief Add a route to a message router.
|
||||||
*
|
*
|
||||||
|
@@ -71,6 +71,7 @@ ASTERISK_REGISTER_FILE()
|
|||||||
#include "asterisk/stasis_bridges.h"
|
#include "asterisk/stasis_bridges.h"
|
||||||
#include "asterisk/stasis_message_router.h"
|
#include "asterisk/stasis_message_router.h"
|
||||||
#include "asterisk/astobj2.h"
|
#include "asterisk/astobj2.h"
|
||||||
|
#include "asterisk/taskprocessor.h"
|
||||||
|
|
||||||
/*** DOCUMENTATION
|
/*** DOCUMENTATION
|
||||||
<configInfo name="cdr" language="en_US">
|
<configInfo name="cdr" language="en_US">
|
||||||
@@ -4219,6 +4220,8 @@ int ast_cdr_engine_init(void)
|
|||||||
if (!stasis_router) {
|
if (!stasis_router) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
stasis_message_router_set_congestion_limits(stasis_router, -1,
|
||||||
|
10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
|
||||||
|
|
||||||
if (STASIS_MESSAGE_TYPE_INIT(cdr_sync_message_type)) {
|
if (STASIS_MESSAGE_TYPE_INIT(cdr_sync_message_type)) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@@ -59,6 +59,7 @@ ASTERISK_REGISTER_FILE()
|
|||||||
#include "asterisk/parking.h"
|
#include "asterisk/parking.h"
|
||||||
#include "asterisk/pickup.h"
|
#include "asterisk/pickup.h"
|
||||||
#include "asterisk/core_local.h"
|
#include "asterisk/core_local.h"
|
||||||
|
#include "asterisk/taskprocessor.h"
|
||||||
|
|
||||||
/*** DOCUMENTATION
|
/*** DOCUMENTATION
|
||||||
<configInfo name="cel" language="en_US">
|
<configInfo name="cel" language="en_US">
|
||||||
@@ -1575,6 +1576,8 @@ static int create_routes(void)
|
|||||||
if (!cel_state_router) {
|
if (!cel_state_router) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
stasis_message_router_set_congestion_limits(cel_state_router, -1,
|
||||||
|
6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
|
||||||
|
|
||||||
ret |= stasis_message_router_add(cel_state_router,
|
ret |= stasis_message_router_add(cel_state_router,
|
||||||
stasis_cache_update_type(),
|
stasis_cache_update_type(),
|
||||||
|
@@ -100,6 +100,7 @@ ASTERISK_REGISTER_FILE()
|
|||||||
#include "asterisk/rtp_engine.h"
|
#include "asterisk/rtp_engine.h"
|
||||||
#include "asterisk/format_cache.h"
|
#include "asterisk/format_cache.h"
|
||||||
#include "asterisk/translate.h"
|
#include "asterisk/translate.h"
|
||||||
|
#include "asterisk/taskprocessor.h"
|
||||||
|
|
||||||
/*** DOCUMENTATION
|
/*** DOCUMENTATION
|
||||||
<manager name="Ping" language="en_US">
|
<manager name="Ping" language="en_US">
|
||||||
@@ -8692,6 +8693,8 @@ static int manager_subscriptions_init(void)
|
|||||||
if (!stasis_router) {
|
if (!stasis_router) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
stasis_message_router_set_congestion_limits(stasis_router, -1,
|
||||||
|
6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
|
||||||
|
|
||||||
res |= stasis_message_router_set_default(stasis_router,
|
res |= stasis_message_router_set_default(stasis_router,
|
||||||
manager_default_msg_cb, NULL);
|
manager_default_msg_cb, NULL);
|
||||||
|
@@ -564,6 +564,18 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
|
||||||
|
long low_water, long high_water)
|
||||||
|
{
|
||||||
|
int res = -1;
|
||||||
|
|
||||||
|
if (subscription) {
|
||||||
|
res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
|
||||||
|
low_water, high_water);
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
void stasis_subscription_join(struct stasis_subscription *subscription)
|
void stasis_subscription_join(struct stasis_subscription *subscription)
|
||||||
{
|
{
|
||||||
if (subscription) {
|
if (subscription) {
|
||||||
|
@@ -289,6 +289,18 @@ void stasis_message_router_publish_sync(struct stasis_message_router *router,
|
|||||||
ao2_cleanup(router);
|
ao2_cleanup(router);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
|
||||||
|
long low_water, long high_water)
|
||||||
|
{
|
||||||
|
int res = -1;
|
||||||
|
|
||||||
|
if (router) {
|
||||||
|
res = stasis_subscription_set_congestion_limits(router->subscription,
|
||||||
|
low_water, high_water);
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
int stasis_message_router_add(struct stasis_message_router *router,
|
int stasis_message_router_add(struct stasis_message_router *router,
|
||||||
struct stasis_message_type *message_type,
|
struct stasis_message_type *message_type,
|
||||||
stasis_subscription_cb callback, void *data)
|
stasis_subscription_cb callback, void *data)
|
||||||
|
Reference in New Issue
Block a user