mirror of
https://github.com/asterisk/asterisk.git
synced 2025-09-06 04:30:28 +00:00
Reorganize code and change behavior of ast_taskprocessor_execute() when taskprocessor is shutting down.
Moved code around to be easier to follow. ast_taskprocessor_execute() will now return 0 if the taskprocessor is being shut down. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@376499 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
@@ -22,28 +22,33 @@
|
|||||||
*
|
*
|
||||||
* \author Dwayne M. Hubbard <dhubbard@digium.com>
|
* \author Dwayne M. Hubbard <dhubbard@digium.com>
|
||||||
*
|
*
|
||||||
* \note A taskprocessor is a named singleton containing a task queue that serializes tasks pushed
|
* \note A taskprocessor is a named singleton containing a task queue that
|
||||||
* into it by [a] module(s) that reference the taskprocessor. A taskprocessor is created the first
|
* serializes tasks pushed into it by [a] module(s) that reference the taskprocessor.
|
||||||
* time its name is requested via the ast_taskprocessor_get() function and destroyed when the
|
* A taskprocessor is created the first time its name is requested via the
|
||||||
* taskprocessor reference count reaches zero. A taskprocessor also contains an accompanying
|
* ast_taskprocessor_get() function or the ast_taskprocessor_create_with_listener()
|
||||||
* listener that is told when changes in the task queue occur.
|
* function and destroyed when the taskprocessor reference count reaches zero. A
|
||||||
|
* taskprocessor also contains an accompanying listener that is notified when changes
|
||||||
|
* in the task queue occur.
|
||||||
*
|
*
|
||||||
* A task is a wrapper around a task-handling function pointer and a data
|
* A task is a wrapper around a task-handling function pointer and a data
|
||||||
* pointer. A task is pushed into a taskprocessor queue using the
|
* pointer. A task is pushed into a taskprocessor queue using the
|
||||||
* ast_taskprocessor_push(taskprocessor, taskhandler, taskdata) function and freed by the
|
* ast_taskprocessor_push(taskprocessor, taskhandler, taskdata) function and freed by the
|
||||||
* taskprocessor after the task handling function returns. A module releases its reference to a
|
* taskprocessor after the task handling function returns. A module releases its
|
||||||
* taskprocessor using the ast_taskprocessor_unreference() function which may result in the
|
* reference to a taskprocessor using the ast_taskprocessor_unreference() function which
|
||||||
* destruction of the taskprocessor if the taskprocessor's reference count reaches zero. Tasks waiting
|
* may result in the destruction of the taskprocessor if the taskprocessor's reference
|
||||||
* to be processed in the taskprocessor queue when the taskprocessor reference count reaches zero
|
* count reaches zero. When the taskprocessor's reference count reaches zero, its
|
||||||
* will be purged and released from the taskprocessor queue without being processed.
|
* listener's shutdown() callback will be called. Any further attempts to execute tasks
|
||||||
|
* will be denied.
|
||||||
*
|
*
|
||||||
* The taskprocessor listener has the flexibility of doling out tasks to best fit the module's
|
* The taskprocessor listener has the flexibility of doling out tasks to best fit the
|
||||||
* needs. For instance, a taskprocessor listener may have a single dispatch thread that handles
|
* module's needs. For instance, a taskprocessor listener may have a single dispatch
|
||||||
* all tasks, or it may dispatch tasks to a thread pool.
|
* thread that handles all tasks, or it may dispatch tasks to a thread pool.
|
||||||
*
|
*
|
||||||
* There is a default taskprocessor listener that will be used if a taskprocessor is created without
|
* There is a default taskprocessor listener that will be used if a taskprocessor is
|
||||||
* a listener. This default listener runs tasks sequentially in a single thread. The listener will
|
* created without any explicit listener. This default listener runs tasks sequentially
|
||||||
* execute tasks as long as there are tasks to be processed.
|
* in a single thread. The listener will execute tasks as long as there are tasks to be
|
||||||
|
* processed. When the taskprocessor is shut down, the default listener will stop
|
||||||
|
* processing tasks and join its execution thread.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef __AST_TASKPROCESSOR_H__
|
#ifndef __AST_TASKPROCESSOR_H__
|
||||||
|
@@ -76,6 +76,8 @@ struct ast_taskprocessor {
|
|||||||
/*! \brief Taskprocessor singleton list entry */
|
/*! \brief Taskprocessor singleton list entry */
|
||||||
AST_LIST_ENTRY(ast_taskprocessor) list;
|
AST_LIST_ENTRY(ast_taskprocessor) list;
|
||||||
struct ast_taskprocessor_listener *listener;
|
struct ast_taskprocessor_listener *listener;
|
||||||
|
/*! Indicates if the taskprocessor is in the process of shuting down */
|
||||||
|
unsigned int shutting_down:1;
|
||||||
};
|
};
|
||||||
#define TPS_MAX_BUCKETS 7
|
#define TPS_MAX_BUCKETS 7
|
||||||
/*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
|
/*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
|
||||||
@@ -123,6 +125,7 @@ struct default_taskprocessor_listener_pvt {
|
|||||||
int dead;
|
int dead;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
|
static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
|
||||||
{
|
{
|
||||||
SCOPED_MUTEX(lock, &pvt->lock);
|
SCOPED_MUTEX(lock, &pvt->lock);
|
||||||
@@ -131,20 +134,6 @@ static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt,
|
|||||||
ast_cond_signal(&pvt->cond);
|
ast_cond_signal(&pvt->cond);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void listener_destroy(void *obj)
|
|
||||||
{
|
|
||||||
struct ast_taskprocessor_listener *listener = obj;
|
|
||||||
|
|
||||||
listener->callbacks->destroy(listener->private_data);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
|
|
||||||
{
|
|
||||||
listener->callbacks->shutdown(listener);
|
|
||||||
ao2_ref(listener->tps, -1);
|
|
||||||
listener->tps = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
|
static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
|
||||||
{
|
{
|
||||||
SCOPED_MUTEX(lock, &pvt->lock);
|
SCOPED_MUTEX(lock, &pvt->lock);
|
||||||
@@ -188,6 +177,20 @@ static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
|
|||||||
return pvt;
|
return pvt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
|
||||||
|
{
|
||||||
|
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
|
||||||
|
|
||||||
|
if (was_empty) {
|
||||||
|
default_tps_wake_up(pvt, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void default_emptied(struct ast_taskprocessor_listener *listener)
|
||||||
|
{
|
||||||
|
/* No-op */
|
||||||
|
}
|
||||||
|
|
||||||
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
|
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
|
||||||
{
|
{
|
||||||
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
|
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
|
||||||
@@ -204,20 +207,6 @@ static void default_listener_destroy(void *obj)
|
|||||||
ast_free(pvt);
|
ast_free(pvt);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
|
|
||||||
{
|
|
||||||
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
|
|
||||||
|
|
||||||
if (was_empty) {
|
|
||||||
default_tps_wake_up(pvt, 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void default_emptied(struct ast_taskprocessor_listener *listener)
|
|
||||||
{
|
|
||||||
/* No-op */
|
|
||||||
}
|
|
||||||
|
|
||||||
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
|
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
|
||||||
.alloc = default_listener_alloc,
|
.alloc = default_listener_alloc,
|
||||||
.task_pushed = default_task_pushed,
|
.task_pushed = default_task_pushed,
|
||||||
@@ -438,16 +427,15 @@ static void tps_taskprocessor_destroy(void *tps)
|
|||||||
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
|
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
|
||||||
{
|
{
|
||||||
struct tps_task *task;
|
struct tps_task *task;
|
||||||
|
SCOPED_AO2LOCK(lock, tps);
|
||||||
|
|
||||||
if (!tps) {
|
if (tps->shutting_down) {
|
||||||
ast_log(LOG_ERROR, "missing taskprocessor\n");
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
ao2_lock(tps);
|
|
||||||
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
|
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
|
||||||
tps->tps_queue_size--;
|
tps->tps_queue_size--;
|
||||||
}
|
}
|
||||||
ao2_unlock(tps);
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -466,6 +454,20 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
|
|||||||
return tps->name;
|
return tps->name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void listener_destroy(void *obj)
|
||||||
|
{
|
||||||
|
struct ast_taskprocessor_listener *listener = obj;
|
||||||
|
|
||||||
|
listener->callbacks->destroy(listener->private_data);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
|
||||||
|
{
|
||||||
|
listener->callbacks->shutdown(listener);
|
||||||
|
ao2_ref(listener->tps, -1);
|
||||||
|
listener->tps = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks)
|
struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks)
|
||||||
{
|
{
|
||||||
RAII_VAR(struct ast_taskprocessor_listener *, listener,
|
RAII_VAR(struct ast_taskprocessor_listener *, listener,
|
||||||
|
Reference in New Issue
Block a user