mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-10-31 02:37:10 +00:00 
			
		
		
		
	Add auto-increment option and accompanying test.
This allows for the threadpool to automatically grow if tasks are pushed to it and no idle threads are currently available. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377803 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
		| @@ -86,12 +86,23 @@ struct ast_threadpool_options { | ||||
| #define AST_THREADPOOL_OPTIONS_VERSION 1 | ||||
| 	/*! Version of thradpool options in use */ | ||||
| 	int version; | ||||
| 	/* ! | ||||
| 	/*! | ||||
| 	 * \brief Time limit in seconds for idle threads | ||||
| 	 * | ||||
| 	 * A time of 0 or less will mean an infinite timeout. | ||||
| 	 */ | ||||
| 	int idle_timeout; | ||||
| 	/*! | ||||
| 	 * \brief Number of threads to increment pool by | ||||
| 	 * | ||||
| 	 * If a task is added into a pool and no idle thread is | ||||
| 	 * available to activate, then the pool can automatically | ||||
| 	 * grow by the given amount. | ||||
| 	 * | ||||
| 	 * Zero is a perfectly valid value to give here if you want | ||||
| 	 * to control threadpool growth yourself via your listener. | ||||
| 	 */ | ||||
| 	int auto_increment; | ||||
| }; | ||||
|  | ||||
| /*! | ||||
|   | ||||
| @@ -416,7 +416,7 @@ static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *po | ||||
| /*! | ||||
|  * \brief Activate idle threads | ||||
|  * | ||||
|  * This function always returns CMP_MATCH because all threads that this | ||||
|  * This function always returns CMP_MATCH because all workers that this | ||||
|  * function acts on need to be seen as matches so they are unlinked from the | ||||
|  * list of idle threads. | ||||
|  * | ||||
| @@ -425,7 +425,7 @@ static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *po | ||||
|  * \param arg The pool where the worker belongs | ||||
|  * \retval CMP_MATCH | ||||
|  */ | ||||
| static int activate_threads(void *obj, void *arg, int flags) | ||||
| static int activate_thread(void *obj, void *arg, int flags) | ||||
| { | ||||
| 	struct worker_thread *worker = obj; | ||||
| 	struct ast_threadpool *pool = arg; | ||||
| @@ -435,6 +435,8 @@ static int activate_threads(void *obj, void *arg, int flags) | ||||
| 	return CMP_MATCH; | ||||
| } | ||||
|  | ||||
| static void grow(struct ast_threadpool *pool, int delta); | ||||
|  | ||||
| /*! | ||||
|  * \brief Queued task called when tasks are pushed into the threadpool | ||||
|  * | ||||
| @@ -451,8 +453,13 @@ static int queued_task_pushed(void *data) | ||||
| 	int was_empty = tpd->was_empty; | ||||
|  | ||||
| 	pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty); | ||||
| 	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, | ||||
| 			activate_threads, pool); | ||||
| 	if (ao2_container_count(pool->idle_threads) == 0 && pool->options.auto_increment > 0) { | ||||
| 		grow(pool, pool->options.auto_increment); | ||||
| 	} else { | ||||
| 		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, | ||||
| 				activate_thread, pool); | ||||
| 	}	 | ||||
| 	threadpool_send_state_changed(pool); | ||||
| 	ao2_ref(tpd, -1); | ||||
| 	return 0; | ||||
| } | ||||
|   | ||||
| @@ -220,7 +220,7 @@ static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, s | ||||
| 	} | ||||
|  | ||||
| 	if (!tld->empty_notice) { | ||||
| 		ast_test_status_update(test, "Test listener never told that threadpool is empty\n"); | ||||
| 		ast_test_status_update(test, "Test listener not notified that threadpool is empty\n"); | ||||
| 		res = AST_TEST_FAIL; | ||||
| 	} | ||||
|  | ||||
| @@ -283,6 +283,7 @@ AST_TEST_DEFINE(threadpool_push) | ||||
| 	struct ast_threadpool_options options = { | ||||
| 		.version = AST_THREADPOOL_OPTIONS_VERSION, | ||||
| 		.idle_timeout = 0, | ||||
| 		.auto_increment = 0, | ||||
| 	}; | ||||
|  | ||||
| 	switch (cmd) { | ||||
| @@ -336,6 +337,7 @@ AST_TEST_DEFINE(threadpool_thread_creation) | ||||
| 	struct ast_threadpool_options options = { | ||||
| 		.version = AST_THREADPOOL_OPTIONS_VERSION, | ||||
| 		.idle_timeout = 0, | ||||
| 		.auto_increment = 0, | ||||
| 	}; | ||||
|  | ||||
| 	switch (cmd) { | ||||
| @@ -385,6 +387,7 @@ AST_TEST_DEFINE(threadpool_thread_destruction) | ||||
| 	struct ast_threadpool_options options = { | ||||
| 		.version = AST_THREADPOOL_OPTIONS_VERSION, | ||||
| 		.idle_timeout = 0, | ||||
| 		.auto_increment = 0, | ||||
| 	}; | ||||
|  | ||||
| 	switch (cmd) { | ||||
| @@ -443,6 +446,7 @@ AST_TEST_DEFINE(threadpool_thread_timeout) | ||||
| 	struct ast_threadpool_options options = { | ||||
| 		.version = AST_THREADPOOL_OPTIONS_VERSION, | ||||
| 		.idle_timeout = 2, | ||||
| 		.auto_increment = 0, | ||||
| 	}; | ||||
|  | ||||
| 	switch (cmd) { | ||||
| @@ -505,6 +509,7 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread) | ||||
| 	struct ast_threadpool_options options = { | ||||
| 		.version = AST_THREADPOOL_OPTIONS_VERSION, | ||||
| 		.idle_timeout = 0, | ||||
| 		.auto_increment = 0, | ||||
| 	}; | ||||
|  | ||||
| 	switch (cmd) { | ||||
| @@ -581,6 +586,7 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task) | ||||
| 	struct ast_threadpool_options options = { | ||||
| 		.version = AST_THREADPOOL_OPTIONS_VERSION, | ||||
| 		.idle_timeout = 0, | ||||
| 		.auto_increment = 0, | ||||
| 	}; | ||||
|  | ||||
| 	switch (cmd) { | ||||
| @@ -645,7 +651,6 @@ end: | ||||
| 	ao2_cleanup(listener); | ||||
| 	ast_free(std); | ||||
| 	return res; | ||||
|  | ||||
| } | ||||
|  | ||||
| AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) | ||||
| @@ -660,6 +665,7 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) | ||||
| 	struct ast_threadpool_options options = { | ||||
| 		.version = AST_THREADPOOL_OPTIONS_VERSION, | ||||
| 		.idle_timeout = 0, | ||||
| 		.auto_increment = 0, | ||||
| 	}; | ||||
|  | ||||
| 	switch (cmd) { | ||||
| @@ -737,7 +743,80 @@ end: | ||||
| 	ast_free(std2); | ||||
| 	ast_free(std3); | ||||
| 	return res; | ||||
| } | ||||
|  | ||||
| AST_TEST_DEFINE(threadpool_auto_increment) | ||||
| { | ||||
| 	struct ast_threadpool *pool = NULL; | ||||
| 	struct ast_threadpool_listener *listener = NULL; | ||||
| 	struct simple_task_data *std = NULL; | ||||
| 	enum ast_test_result_state res = AST_TEST_FAIL; | ||||
| 	struct test_listener_data *tld; | ||||
| 	struct ast_threadpool_options options = { | ||||
| 		.version = AST_THREADPOOL_OPTIONS_VERSION, | ||||
| 		.idle_timeout = 0, | ||||
| 		.auto_increment = 3, | ||||
| 	}; | ||||
|  | ||||
| 	switch (cmd) { | ||||
| 	case TEST_INIT: | ||||
| 		info->name = "auto_increment"; | ||||
| 		info->category = "/main/threadpool/"; | ||||
| 		info->summary = "Test that the threadpool grows as tasks are added"; | ||||
| 		info->description = | ||||
| 			"Create an empty threadpool and push a task to it. Once the task is\n" | ||||
| 			"pushed, the threadpool should add three threads and be able to\n" | ||||
| 			"handle the task. The threads should then go idle\n"; | ||||
| 		return AST_TEST_NOT_RUN; | ||||
| 	case TEST_EXECUTE: | ||||
| 		break; | ||||
| 	} | ||||
|  | ||||
| 	listener = ast_threadpool_listener_alloc(&test_callbacks); | ||||
| 	if (!listener) { | ||||
| 		return AST_TEST_FAIL; | ||||
| 	} | ||||
| 	tld = listener->private_data; | ||||
|  | ||||
| 	pool = ast_threadpool_create(info->name, listener, 0, &options); | ||||
| 	if (!pool) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	std = simple_task_data_alloc(); | ||||
| 	if (!std) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_push(pool, simple_task, std); | ||||
|  | ||||
| 	/* Pushing the task should result in the threadpool growing | ||||
| 	 * by three threads. This will allow the task to actually execute | ||||
| 	 */ | ||||
| 	res = wait_for_completion(test, std); | ||||
| 	if (res == AST_TEST_FAIL) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	res = wait_for_empty_notice(test, tld); | ||||
| 	if (res == AST_TEST_FAIL) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	res = wait_until_thread_state(test, tld, 0, 3); | ||||
| 	if (res == AST_TEST_FAIL) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	res = listener_check(test, listener, 1, 1, 1, 0, 3, 1); | ||||
|  | ||||
| end: | ||||
| 	if (pool) { | ||||
| 		ast_threadpool_shutdown(pool); | ||||
| 	} | ||||
| 	ao2_cleanup(listener); | ||||
| 	ast_free(std); | ||||
| 	return res; | ||||
| } | ||||
|  | ||||
| AST_TEST_DEFINE(threadpool_reactivation) | ||||
| @@ -751,6 +830,7 @@ AST_TEST_DEFINE(threadpool_reactivation) | ||||
| 	struct ast_threadpool_options options = { | ||||
| 		.version = AST_THREADPOOL_OPTIONS_VERSION, | ||||
| 		.idle_timeout = 0, | ||||
| 		.auto_increment = 0, | ||||
| 	}; | ||||
|  | ||||
| 	switch (cmd) { | ||||
| @@ -913,6 +993,7 @@ AST_TEST_DEFINE(threadpool_task_distribution) | ||||
| 	struct ast_threadpool_options options = { | ||||
| 		.version = AST_THREADPOOL_OPTIONS_VERSION, | ||||
| 		.idle_timeout = 0, | ||||
| 		.auto_increment = 0, | ||||
| 	}; | ||||
|  | ||||
| 	switch (cmd) { | ||||
| @@ -1001,6 +1082,7 @@ AST_TEST_DEFINE(threadpool_more_destruction) | ||||
| 	struct ast_threadpool_options options = { | ||||
| 		.version = AST_THREADPOOL_OPTIONS_VERSION, | ||||
| 		.idle_timeout = 0, | ||||
| 		.auto_increment = 0, | ||||
| 	}; | ||||
|  | ||||
| 	switch (cmd) { | ||||
| @@ -1104,6 +1186,7 @@ static int unload_module(void) | ||||
| 	ast_test_unregister(threadpool_one_task_one_thread); | ||||
| 	ast_test_unregister(threadpool_one_thread_one_task); | ||||
| 	ast_test_unregister(threadpool_one_thread_multiple_tasks); | ||||
| 	ast_test_unregister(threadpool_auto_increment); | ||||
| 	ast_test_unregister(threadpool_reactivation); | ||||
| 	ast_test_unregister(threadpool_task_distribution); | ||||
| 	ast_test_unregister(threadpool_more_destruction); | ||||
| @@ -1119,6 +1202,7 @@ static int load_module(void) | ||||
| 	ast_test_register(threadpool_one_task_one_thread); | ||||
| 	ast_test_register(threadpool_one_thread_one_task); | ||||
| 	ast_test_register(threadpool_one_thread_multiple_tasks); | ||||
| 	ast_test_register(threadpool_auto_increment); | ||||
| 	ast_test_register(threadpool_reactivation); | ||||
| 	ast_test_register(threadpool_task_distribution); | ||||
| 	ast_test_register(threadpool_more_destruction); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user