threadpool, res_pjsip: Add serializer group shutdown API calls.

A module trying to unload needs to wait for all serializers it creates and
uses to complete processing before unloading.

ASTERISK-24907
Reported by: Kevin Harwell

Change-Id: I8c80b90f2f82754e8dbb02ddf3c9121e5e966059
This commit is contained in:
Richard Mudgett
2015-06-19 16:16:17 -05:00
parent 4c133d81cd
commit af4ae3095e
4 changed files with 207 additions and 11 deletions

View File

@@ -1126,18 +1126,126 @@ static void worker_set_state(struct worker_thread *worker, enum worker_state sta
ast_cond_signal(&worker->cond);
}
/*! Serializer group shutdown control object. */
struct ast_serializer_shutdown_group {
/*! Shutdown thread waits on this conditional. */
ast_cond_t cond;
/*! Count of serializers needing to shutdown. */
int count;
};
static void serializer_shutdown_group_dtor(void *vdoomed)
{
struct ast_serializer_shutdown_group *doomed = vdoomed;
ast_cond_destroy(&doomed->cond);
}
struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
{
struct ast_serializer_shutdown_group *shutdown_group;
shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
if (!shutdown_group) {
return NULL;
}
ast_cond_init(&shutdown_group->cond, NULL);
return shutdown_group;
}
int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
{
int remaining;
ast_mutex_t *lock;
if (!shutdown_group) {
return 0;
}
lock = ao2_object_get_lockaddr(shutdown_group);
ast_assert(lock != NULL);
ao2_lock(shutdown_group);
if (timeout) {
struct timeval start;
struct timespec end;
start = ast_tvnow();
end.tv_sec = start.tv_sec + timeout;
end.tv_nsec = start.tv_usec * 1000;
while (shutdown_group->count) {
if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
/* Error or timed out waiting for the count to reach zero. */
break;
}
}
} else {
while (shutdown_group->count) {
if (ast_cond_wait(&shutdown_group->cond, lock)) {
/* Error */
break;
}
}
}
remaining = shutdown_group->count;
ao2_unlock(shutdown_group);
return remaining;
}
/*!
* \internal
* \brief Increment the number of serializer members in the group.
* \since 13.5.0
*
* \param shutdown_group Group shutdown controller.
*
* \return Nothing
*/
static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
{
ao2_lock(shutdown_group);
++shutdown_group->count;
ao2_unlock(shutdown_group);
}
/*!
* \internal
* \brief Decrement the number of serializer members in the group.
* \since 13.5.0
*
* \param shutdown_group Group shutdown controller.
*
* \return Nothing
*/
static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
{
ao2_lock(shutdown_group);
--shutdown_group->count;
if (!shutdown_group->count) {
ast_cond_signal(&shutdown_group->cond);
}
ao2_unlock(shutdown_group);
}
struct serializer {
/*! Threadpool the serializer will use to process the jobs. */
struct ast_threadpool *pool;
/*! Which group will wait for this serializer to shutdown. */
struct ast_serializer_shutdown_group *shutdown_group;
};
static void serializer_dtor(void *obj)
{
struct serializer *ser = obj;
ao2_cleanup(ser->pool);
ser->pool = NULL;
ao2_cleanup(ser->shutdown_group);
ser->shutdown_group = NULL;
}
static struct serializer *serializer_create(struct ast_threadpool *pool)
static struct serializer *serializer_create(struct ast_threadpool *pool,
struct ast_serializer_shutdown_group *shutdown_group)
{
struct serializer *ser;
@@ -1147,6 +1255,7 @@ static struct serializer *serializer_create(struct ast_threadpool *pool)
}
ao2_ref(pool, +1);
ser->pool = pool;
ser->shutdown_group = ao2_bump(shutdown_group);
return ser;
}
@@ -1187,6 +1296,10 @@ static int serializer_start(struct ast_taskprocessor_listener *listener)
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
{
struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
if (ser->shutdown_group) {
serializer_shutdown_group_dec(ser->shutdown_group);
}
ao2_cleanup(ser);
}
@@ -1201,27 +1314,35 @@ struct ast_taskprocessor *ast_threadpool_serializer_get_current(void)
return ast_threadstorage_get_ptr(&current_serializer);
}
struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
{
RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
struct ast_taskprocessor *tps = NULL;
struct serializer *ser;
struct ast_taskprocessor_listener *listener;
struct ast_taskprocessor *tps;
ser = serializer_create(pool);
ser = serializer_create(pool, shutdown_group);
if (!ser) {
return NULL;
}
listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
if (!listener) {
ao2_ref(ser, -1);
return NULL;
}
ser = NULL; /* ownership transferred to listener */
/* ser ref transferred to listener */
tps = ast_taskprocessor_create_with_listener(name, listener);
if (!tps) {
return NULL;
if (tps && shutdown_group) {
serializer_shutdown_group_inc(shutdown_group);
}
ao2_ref(listener, -1);
return tps;
}
struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
{
return ast_threadpool_serializer_group(name, pool, NULL);
}