mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-05 12:16:00 +00:00
Merge "threadpool, res_pjsip: Add serializer group shutdown API calls."
This commit is contained in:
@@ -1102,6 +1102,23 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void);
|
||||
*/
|
||||
struct ast_taskprocessor *ast_sip_create_serializer(void);
|
||||
|
||||
struct ast_serializer_shutdown_group;
|
||||
|
||||
/*!
|
||||
* \brief Create a new serializer for SIP tasks
|
||||
* \since 13.5.0
|
||||
*
|
||||
* See \ref ast_threadpool_serializer for more information on serializers.
|
||||
* SIP creates serializers so that tasks operating on similar data will run
|
||||
* in sequence.
|
||||
*
|
||||
* \param shutdown_group Group shutdown controller. (NULL if no group association)
|
||||
*
|
||||
* \retval NULL Failure
|
||||
* \retval non-NULL Newly-created serializer
|
||||
*/
|
||||
struct ast_taskprocessor *ast_sip_create_serializer_group(struct ast_serializer_shutdown_group *shutdown_group);
|
||||
|
||||
/*!
|
||||
* \brief Set a serializer on a SIP dialog so requests and responses are automatically serialized
|
||||
*
|
||||
|
@@ -195,6 +195,28 @@ int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), vo
|
||||
*/
|
||||
void ast_threadpool_shutdown(struct ast_threadpool *pool);
|
||||
|
||||
struct ast_serializer_shutdown_group;
|
||||
|
||||
/*!
|
||||
* \brief Create a serializer group shutdown control object.
|
||||
* \since 13.5.0
|
||||
*
|
||||
* \return ao2 object to control shutdown of a serializer group.
|
||||
*/
|
||||
struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void);
|
||||
|
||||
/*!
|
||||
* \brief Wait for the serializers in the group to shutdown with timeout.
|
||||
* \since 13.5.0
|
||||
*
|
||||
* \param shutdown_group Group shutdown controller. (Returns 0 immediately if NULL)
|
||||
* \param timeout Number of seconds to wait for the serializers in the group to shutdown.
|
||||
* Zero if the timeout is disabled.
|
||||
*
|
||||
* \return Number of seriaizers that did not get shutdown within the timeout.
|
||||
*/
|
||||
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout);
|
||||
|
||||
/*!
|
||||
* \brief Get the threadpool serializer currently associated with this thread.
|
||||
* \since 14.0.0
|
||||
@@ -234,9 +256,40 @@ struct ast_taskprocessor *ast_threadpool_serializer_get_current(void);
|
||||
*
|
||||
* \param name Name of the serializer. (must be unique)
|
||||
* \param pool \ref ast_threadpool for execution.
|
||||
*
|
||||
* \return \ref ast_taskprocessor for enqueuing work.
|
||||
* \return \c NULL on error.
|
||||
*/
|
||||
struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool);
|
||||
|
||||
/*!
|
||||
* \brief Serialized execution of tasks within a \ref ast_threadpool.
|
||||
* \since 13.5.0
|
||||
*
|
||||
* A \ref ast_taskprocessor with the same contract as a default taskprocessor
|
||||
* (tasks execute serially) except instead of executing out of a dedicated
|
||||
* thread, execution occurs in a thread from a \ref ast_threadpool. Think of it
|
||||
* as a lightweight thread.
|
||||
*
|
||||
* While it guarantees that each task will complete before executing the next,
|
||||
* there is no guarantee as to which thread from the \c pool individual tasks
|
||||
* will execute. This normally only matters if your code relys on thread
|
||||
* specific information, such as thread locals.
|
||||
*
|
||||
* Use ast_taskprocessor_unreference() to dispose of the returned \ref
|
||||
* ast_taskprocessor.
|
||||
*
|
||||
* Only a single taskprocessor with a given name may exist. This function will fail
|
||||
* if a taskprocessor with the given name already exists.
|
||||
*
|
||||
* \param name Name of the serializer. (must be unique)
|
||||
* \param pool \ref ast_threadpool for execution.
|
||||
* \param shutdown_group Group shutdown controller. (NULL if no group association)
|
||||
*
|
||||
* \return \ref ast_taskprocessor for enqueuing work.
|
||||
* \return \c NULL on error.
|
||||
*/
|
||||
struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
|
||||
struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group);
|
||||
|
||||
#endif /* ASTERISK_THREADPOOL_H */
|
||||
|
@@ -1126,18 +1126,126 @@ static void worker_set_state(struct worker_thread *worker, enum worker_state sta
|
||||
ast_cond_signal(&worker->cond);
|
||||
}
|
||||
|
||||
/*! Serializer group shutdown control object. */
|
||||
struct ast_serializer_shutdown_group {
|
||||
/*! Shutdown thread waits on this conditional. */
|
||||
ast_cond_t cond;
|
||||
/*! Count of serializers needing to shutdown. */
|
||||
int count;
|
||||
};
|
||||
|
||||
static void serializer_shutdown_group_dtor(void *vdoomed)
|
||||
{
|
||||
struct ast_serializer_shutdown_group *doomed = vdoomed;
|
||||
|
||||
ast_cond_destroy(&doomed->cond);
|
||||
}
|
||||
|
||||
struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
|
||||
{
|
||||
struct ast_serializer_shutdown_group *shutdown_group;
|
||||
|
||||
shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
|
||||
if (!shutdown_group) {
|
||||
return NULL;
|
||||
}
|
||||
ast_cond_init(&shutdown_group->cond, NULL);
|
||||
return shutdown_group;
|
||||
}
|
||||
|
||||
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
|
||||
{
|
||||
int remaining;
|
||||
ast_mutex_t *lock;
|
||||
|
||||
if (!shutdown_group) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
lock = ao2_object_get_lockaddr(shutdown_group);
|
||||
ast_assert(lock != NULL);
|
||||
|
||||
ao2_lock(shutdown_group);
|
||||
if (timeout) {
|
||||
struct timeval start;
|
||||
struct timespec end;
|
||||
|
||||
start = ast_tvnow();
|
||||
end.tv_sec = start.tv_sec + timeout;
|
||||
end.tv_nsec = start.tv_usec * 1000;
|
||||
while (shutdown_group->count) {
|
||||
if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
|
||||
/* Error or timed out waiting for the count to reach zero. */
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
while (shutdown_group->count) {
|
||||
if (ast_cond_wait(&shutdown_group->cond, lock)) {
|
||||
/* Error */
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
remaining = shutdown_group->count;
|
||||
ao2_unlock(shutdown_group);
|
||||
return remaining;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Increment the number of serializer members in the group.
|
||||
* \since 13.5.0
|
||||
*
|
||||
* \param shutdown_group Group shutdown controller.
|
||||
*
|
||||
* \return Nothing
|
||||
*/
|
||||
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
|
||||
{
|
||||
ao2_lock(shutdown_group);
|
||||
++shutdown_group->count;
|
||||
ao2_unlock(shutdown_group);
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Decrement the number of serializer members in the group.
|
||||
* \since 13.5.0
|
||||
*
|
||||
* \param shutdown_group Group shutdown controller.
|
||||
*
|
||||
* \return Nothing
|
||||
*/
|
||||
static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
|
||||
{
|
||||
ao2_lock(shutdown_group);
|
||||
--shutdown_group->count;
|
||||
if (!shutdown_group->count) {
|
||||
ast_cond_signal(&shutdown_group->cond);
|
||||
}
|
||||
ao2_unlock(shutdown_group);
|
||||
}
|
||||
|
||||
struct serializer {
|
||||
/*! Threadpool the serializer will use to process the jobs. */
|
||||
struct ast_threadpool *pool;
|
||||
/*! Which group will wait for this serializer to shutdown. */
|
||||
struct ast_serializer_shutdown_group *shutdown_group;
|
||||
};
|
||||
|
||||
static void serializer_dtor(void *obj)
|
||||
{
|
||||
struct serializer *ser = obj;
|
||||
|
||||
ao2_cleanup(ser->pool);
|
||||
ser->pool = NULL;
|
||||
ao2_cleanup(ser->shutdown_group);
|
||||
ser->shutdown_group = NULL;
|
||||
}
|
||||
|
||||
static struct serializer *serializer_create(struct ast_threadpool *pool)
|
||||
static struct serializer *serializer_create(struct ast_threadpool *pool,
|
||||
struct ast_serializer_shutdown_group *shutdown_group)
|
||||
{
|
||||
struct serializer *ser;
|
||||
|
||||
@@ -1147,6 +1255,7 @@ static struct serializer *serializer_create(struct ast_threadpool *pool)
|
||||
}
|
||||
ao2_ref(pool, +1);
|
||||
ser->pool = pool;
|
||||
ser->shutdown_group = ao2_bump(shutdown_group);
|
||||
return ser;
|
||||
}
|
||||
|
||||
@@ -1187,6 +1296,10 @@ static int serializer_start(struct ast_taskprocessor_listener *listener)
|
||||
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
|
||||
{
|
||||
struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
|
||||
|
||||
if (ser->shutdown_group) {
|
||||
serializer_shutdown_group_dec(ser->shutdown_group);
|
||||
}
|
||||
ao2_cleanup(ser);
|
||||
}
|
||||
|
||||
@@ -1201,27 +1314,35 @@ struct ast_taskprocessor *ast_threadpool_serializer_get_current(void)
|
||||
return ast_threadstorage_get_ptr(¤t_serializer);
|
||||
}
|
||||
|
||||
struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
|
||||
struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
|
||||
struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
|
||||
{
|
||||
RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
|
||||
RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
|
||||
struct ast_taskprocessor *tps = NULL;
|
||||
struct serializer *ser;
|
||||
struct ast_taskprocessor_listener *listener;
|
||||
struct ast_taskprocessor *tps;
|
||||
|
||||
ser = serializer_create(pool);
|
||||
ser = serializer_create(pool, shutdown_group);
|
||||
if (!ser) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
|
||||
if (!listener) {
|
||||
ao2_ref(ser, -1);
|
||||
return NULL;
|
||||
}
|
||||
ser = NULL; /* ownership transferred to listener */
|
||||
/* ser ref transferred to listener */
|
||||
|
||||
tps = ast_taskprocessor_create_with_listener(name, listener);
|
||||
if (!tps) {
|
||||
return NULL;
|
||||
if (tps && shutdown_group) {
|
||||
serializer_shutdown_group_inc(shutdown_group);
|
||||
}
|
||||
|
||||
ao2_ref(listener, -1);
|
||||
return tps;
|
||||
}
|
||||
|
||||
struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
|
||||
{
|
||||
return ast_threadpool_serializer_group(name, pool, NULL);
|
||||
}
|
||||
|
@@ -3327,20 +3327,25 @@ int ast_sip_append_body(pjsip_tx_data *tdata, const char *body_text)
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct ast_taskprocessor *ast_sip_create_serializer(void)
|
||||
struct ast_taskprocessor *ast_sip_create_serializer_group(struct ast_serializer_shutdown_group *shutdown_group)
|
||||
{
|
||||
struct ast_taskprocessor *serializer;
|
||||
char name[AST_UUID_STR_LEN];
|
||||
|
||||
ast_uuid_generate_str(name, sizeof(name));
|
||||
|
||||
serializer = ast_threadpool_serializer(name, sip_threadpool);
|
||||
serializer = ast_threadpool_serializer_group(name, sip_threadpool, shutdown_group);
|
||||
if (!serializer) {
|
||||
return NULL;
|
||||
}
|
||||
return serializer;
|
||||
}
|
||||
|
||||
struct ast_taskprocessor *ast_sip_create_serializer(void)
|
||||
{
|
||||
return ast_sip_create_serializer_group(NULL);
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Shutdown the serializers in the default pool.
|
||||
|
Reference in New Issue
Block a user