mirror of
				https://github.com/asterisk/asterisk.git
				synced 2025-10-31 18:55:19 +00:00 
			
		
		
		
	Merge "taskprocessor: Warn on unused result from pushing task."
This commit is contained in:
		| @@ -1111,13 +1111,15 @@ static void destroy_conference_bridge(void *obj) | ||||
| 		if (conference->playback_queue) { | ||||
| 			struct hangup_data hangup; | ||||
| 			hangup_data_init(&hangup, conference); | ||||
| 			ast_taskprocessor_push(conference->playback_queue, hangup_playback, &hangup); | ||||
|  | ||||
| 			ast_mutex_lock(&hangup.lock); | ||||
| 			while (!hangup.hungup) { | ||||
| 				ast_cond_wait(&hangup.cond, &hangup.lock); | ||||
| 			if (!ast_taskprocessor_push(conference->playback_queue, hangup_playback, &hangup)) { | ||||
| 				ast_mutex_lock(&hangup.lock); | ||||
| 				while (!hangup.hungup) { | ||||
| 					ast_cond_wait(&hangup.cond, &hangup.lock); | ||||
| 				} | ||||
| 				ast_mutex_unlock(&hangup.lock); | ||||
| 			} | ||||
| 			ast_mutex_unlock(&hangup.lock); | ||||
|  | ||||
| 			hangup_data_destroy(&hangup); | ||||
| 		} else { | ||||
| 			/* Playback queue is not yet allocated. Just hang up the channel straight */ | ||||
|   | ||||
| @@ -213,7 +213,8 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps); | ||||
|  * \retval -1 failure | ||||
|  * \since 1.6.1 | ||||
|  */ | ||||
| int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap); | ||||
| int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap) | ||||
| 	attribute_warn_unused_result; | ||||
|  | ||||
| /*! \brief Local data parameter */ | ||||
| struct ast_taskprocessor_local { | ||||
| @@ -239,7 +240,8 @@ struct ast_taskprocessor_local { | ||||
|  * \since 12.0.0 | ||||
|  */ | ||||
| int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, | ||||
| 	int (*task_exe)(struct ast_taskprocessor_local *local), void *datap); | ||||
| 	int (*task_exe)(struct ast_taskprocessor_local *local), void *datap) | ||||
| 	attribute_warn_unused_result; | ||||
|  | ||||
| /*! | ||||
|  * \brief Indicate the taskprocessor is suspended. | ||||
|   | ||||
| @@ -186,7 +186,8 @@ void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int siz | ||||
|  * \retval 0 success | ||||
|  * \retval -1 failure | ||||
|  */ | ||||
| int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data); | ||||
| int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data) | ||||
| 	attribute_warn_unused_result; | ||||
|  | ||||
| /*! | ||||
|  * \brief Shut down a threadpool and destroy it | ||||
|   | ||||
| @@ -561,7 +561,10 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) | ||||
|  | ||||
| 	/* When all that's done, remove the ref the mailbox has on the sub */ | ||||
| 	if (sub->mailbox) { | ||||
| 		ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub); | ||||
| 		if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) { | ||||
| 			/* Nothing we can do here, the conditional is just to keep | ||||
| 			 * the compiler happy that we're not ignoring the result. */ | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	/* Unsubscribing unrefs the subscription */ | ||||
|   | ||||
| @@ -235,7 +235,11 @@ static void default_listener_shutdown(struct ast_taskprocessor_listener *listene | ||||
| 	/* Hold a reference during shutdown */ | ||||
| 	ao2_t_ref(listener->tps, +1, "tps-shutdown"); | ||||
|  | ||||
| 	ast_taskprocessor_push(listener->tps, default_listener_die, pvt); | ||||
| 	if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) { | ||||
| 		/* This will cause the thread to exit early without completing tasks already | ||||
| 		 * in the queue.  This is probably the least bad option in this situation. */ | ||||
| 		default_listener_die(pvt); | ||||
| 	} | ||||
|  | ||||
| 	ast_assert(pvt->poll_thread != AST_PTHREADT_NULL); | ||||
|  | ||||
|   | ||||
| @@ -658,7 +658,9 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) | ||||
| 	} | ||||
|  | ||||
| 	if (pool->listener && pool->listener->callbacks->emptied) { | ||||
| 		ast_taskprocessor_push(pool->control_tps, queued_emptied, pool); | ||||
| 		if (ast_taskprocessor_push(pool->control_tps, queued_emptied, pool)) { | ||||
| 			/* Nothing to do here but we need the check to keep the compiler happy. */ | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -151,7 +151,10 @@ AST_TEST_DEFINE(default_taskprocessor) | ||||
| 		return AST_TEST_FAIL; | ||||
| 	} | ||||
|  | ||||
| 	ast_taskprocessor_push(tps, task, task_data); | ||||
| 	if (ast_taskprocessor_push(tps, task, task_data)) { | ||||
| 		ast_test_status_update(test, "Failed to queue task\n"); | ||||
| 		return AST_TEST_FAIL; | ||||
| 	} | ||||
|  | ||||
| 	res = task_wait(task_data); | ||||
| 	if (res != 0) { | ||||
| @@ -240,7 +243,11 @@ AST_TEST_DEFINE(default_taskprocessor_load) | ||||
|  | ||||
| 	for (i = 0; i < NUM_TASKS; ++i) { | ||||
| 		rand_data[i] = ast_random(); | ||||
| 		ast_taskprocessor_push(tps, load_task, &rand_data[i]); | ||||
| 		if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) { | ||||
| 			ast_test_status_update(test, "Failed to queue task\n"); | ||||
| 			res = AST_TEST_FAIL; | ||||
| 			goto test_end; | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	ast_mutex_lock(&load_task_results.lock); | ||||
| @@ -438,14 +445,22 @@ AST_TEST_DEFINE(taskprocessor_listener) | ||||
| 		goto test_exit; | ||||
| 	} | ||||
|  | ||||
| 	ast_taskprocessor_push(tps, listener_test_task, NULL); | ||||
| 	if (ast_taskprocessor_push(tps, listener_test_task, NULL)) { | ||||
| 		ast_test_status_update(test, "Failed to queue task\n"); | ||||
| 		res = AST_TEST_FAIL; | ||||
| 		goto test_exit; | ||||
| 	} | ||||
|  | ||||
| 	if (check_stats(test, pvt, 1, 0, 1) < 0) { | ||||
| 		res = AST_TEST_FAIL; | ||||
| 		goto test_exit; | ||||
| 	} | ||||
|  | ||||
| 	ast_taskprocessor_push(tps, listener_test_task, NULL); | ||||
| 	if (ast_taskprocessor_push(tps, listener_test_task, NULL)) { | ||||
| 		ast_test_status_update(test, "Failed to queue task\n"); | ||||
| 		res = AST_TEST_FAIL; | ||||
| 		goto test_exit; | ||||
| 	} | ||||
|  | ||||
| 	if (check_stats(test, pvt, 2, 0, 1) < 0) { | ||||
| 		res = AST_TEST_FAIL; | ||||
| @@ -710,7 +725,10 @@ AST_TEST_DEFINE(taskprocessor_push_local) | ||||
| 	local_data = 0; | ||||
| 	ast_taskprocessor_set_local(tps, &local_data); | ||||
|  | ||||
| 	ast_taskprocessor_push_local(tps, local_task_exe, task_data); | ||||
| 	if (ast_taskprocessor_push_local(tps, local_task_exe, task_data)) { | ||||
| 		ast_test_status_update(test, "Failed to queue task\n"); | ||||
| 		return AST_TEST_FAIL; | ||||
| 	} | ||||
|  | ||||
| 	res = task_wait(task_data); | ||||
| 	if (res != 0) { | ||||
|   | ||||
| @@ -127,6 +127,18 @@ static struct simple_task_data *simple_task_data_alloc(void) | ||||
| 	return std; | ||||
| } | ||||
|  | ||||
| static void simple_task_data_free(struct simple_task_data *std) | ||||
| { | ||||
| 	if (!std) { | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	ast_mutex_destroy(&std->lock); | ||||
| 	ast_cond_destroy(&std->cond); | ||||
|  | ||||
| 	ast_free(std); | ||||
| } | ||||
|  | ||||
| static int simple_task(void *data) | ||||
| { | ||||
| 	struct simple_task_data *std = data; | ||||
| @@ -319,7 +331,9 @@ AST_TEST_DEFINE(threadpool_push) | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_push(pool, simple_task, std); | ||||
| 	if (ast_threadpool_push(pool, simple_task, std)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	wait_for_task_pushed(listener); | ||||
|  | ||||
| @@ -328,7 +342,7 @@ AST_TEST_DEFINE(threadpool_push) | ||||
| end: | ||||
| 	ast_threadpool_shutdown(pool); | ||||
| 	ao2_cleanup(listener); | ||||
| 	ast_free(std); | ||||
| 	simple_task_data_free(std); | ||||
| 	ast_free(tld); | ||||
| 	return res; | ||||
| } | ||||
| @@ -635,11 +649,13 @@ AST_TEST_DEFINE(threadpool_thread_timeout_thrash) | ||||
| 		} | ||||
| 		ast_mutex_unlock(&tld->lock); | ||||
|  | ||||
| 		ast_threadpool_push(pool, simple_task, std); | ||||
| 		if (ast_threadpool_push(pool, simple_task, std)) { | ||||
| 			res = AST_TEST_FAIL; | ||||
| 		} else { | ||||
| 			res = wait_for_completion(test, std); | ||||
| 		} | ||||
|  | ||||
| 		res = wait_for_completion(test, std); | ||||
|  | ||||
| 		ast_free(std); | ||||
| 		simple_task_data_free(std); | ||||
|  | ||||
| 		if (res == AST_TEST_FAIL) { | ||||
| 			goto end; | ||||
| @@ -707,7 +723,9 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread) | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_push(pool, simple_task, std); | ||||
| 	if (ast_threadpool_push(pool, simple_task, std)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_set_size(pool, 1); | ||||
|  | ||||
| @@ -736,7 +754,7 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread) | ||||
| end: | ||||
| 	ast_threadpool_shutdown(pool); | ||||
| 	ao2_cleanup(listener); | ||||
| 	ast_free(std); | ||||
| 	simple_task_data_free(std); | ||||
| 	ast_free(tld); | ||||
| 	return res; | ||||
|  | ||||
| @@ -796,7 +814,10 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task) | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_push(pool, simple_task, std); | ||||
| 	if (ast_threadpool_push(pool, simple_task, std)) { | ||||
| 		res = AST_TEST_FAIL; | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	res = wait_for_completion(test, std); | ||||
| 	if (res == AST_TEST_FAIL) { | ||||
| @@ -819,7 +840,7 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task) | ||||
| end: | ||||
| 	ast_threadpool_shutdown(pool); | ||||
| 	ao2_cleanup(listener); | ||||
| 	ast_free(std); | ||||
| 	simple_task_data_free(std); | ||||
| 	ast_free(tld); | ||||
| 	return res; | ||||
| } | ||||
| @@ -882,9 +903,18 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_push(pool, simple_task, std1); | ||||
| 	ast_threadpool_push(pool, simple_task, std2); | ||||
| 	ast_threadpool_push(pool, simple_task, std3); | ||||
| 	res = AST_TEST_FAIL; | ||||
| 	if (ast_threadpool_push(pool, simple_task, std1)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	if (ast_threadpool_push(pool, simple_task, std2)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	if (ast_threadpool_push(pool, simple_task, std3)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	res = wait_for_completion(test, std1); | ||||
| 	if (res == AST_TEST_FAIL) { | ||||
| @@ -914,9 +944,9 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) | ||||
| end: | ||||
| 	ast_threadpool_shutdown(pool); | ||||
| 	ao2_cleanup(listener); | ||||
| 	ast_free(std1); | ||||
| 	ast_free(std2); | ||||
| 	ast_free(std3); | ||||
| 	simple_task_data_free(std1); | ||||
| 	simple_task_data_free(std2); | ||||
| 	simple_task_data_free(std3); | ||||
| 	ast_free(tld); | ||||
| 	return res; | ||||
| } | ||||
| @@ -1011,7 +1041,9 @@ AST_TEST_DEFINE(threadpool_auto_increment) | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_push(pool, simple_task, std1); | ||||
| 	if (ast_threadpool_push(pool, simple_task, std1)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	/* Pushing the task should result in the threadpool growing | ||||
| 	 * by three threads. This will allow the task to actually execute | ||||
| @@ -1034,9 +1066,19 @@ AST_TEST_DEFINE(threadpool_auto_increment) | ||||
| 	/* Now push three tasks into the pool and ensure the pool does not | ||||
| 	 * grow. | ||||
| 	 */ | ||||
| 	ast_threadpool_push(pool, simple_task, std2); | ||||
| 	ast_threadpool_push(pool, simple_task, std3); | ||||
| 	ast_threadpool_push(pool, simple_task, std4); | ||||
| 	res = AST_TEST_FAIL; | ||||
|  | ||||
| 	if (ast_threadpool_push(pool, simple_task, std2)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	if (ast_threadpool_push(pool, simple_task, std3)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	if (ast_threadpool_push(pool, simple_task, std4)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	res = wait_for_completion(test, std2); | ||||
| 	if (res == AST_TEST_FAIL) { | ||||
| @@ -1064,10 +1106,10 @@ AST_TEST_DEFINE(threadpool_auto_increment) | ||||
| end: | ||||
| 	ast_threadpool_shutdown(pool); | ||||
| 	ao2_cleanup(listener); | ||||
| 	ast_free(std1); | ||||
| 	ast_free(std2); | ||||
| 	ast_free(std3); | ||||
| 	ast_free(std4); | ||||
| 	simple_task_data_free(std1); | ||||
| 	simple_task_data_free(std2); | ||||
| 	simple_task_data_free(std3); | ||||
| 	simple_task_data_free(std4); | ||||
| 	ast_free(tld); | ||||
| 	return res; | ||||
| } | ||||
| @@ -1121,7 +1163,9 @@ AST_TEST_DEFINE(threadpool_max_size) | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_push(pool, simple_task, std); | ||||
| 	if (ast_threadpool_push(pool, simple_task, std)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	res = wait_for_completion(test, std); | ||||
| 	if (res == AST_TEST_FAIL) { | ||||
| @@ -1137,7 +1181,7 @@ AST_TEST_DEFINE(threadpool_max_size) | ||||
| end: | ||||
| 	ast_threadpool_shutdown(pool); | ||||
| 	ao2_cleanup(listener); | ||||
| 	ast_free(std); | ||||
| 	simple_task_data_free(std); | ||||
| 	ast_free(tld); | ||||
| 	return res; | ||||
| } | ||||
| @@ -1193,7 +1237,9 @@ AST_TEST_DEFINE(threadpool_reactivation) | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_push(pool, simple_task, std1); | ||||
| 	if (ast_threadpool_push(pool, simple_task, std1)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_set_size(pool, 1); | ||||
|  | ||||
| @@ -1218,7 +1264,10 @@ AST_TEST_DEFINE(threadpool_reactivation) | ||||
| 	} | ||||
|  | ||||
| 	/* Now make sure the threadpool reactivates when we add a second task */ | ||||
| 	ast_threadpool_push(pool, simple_task, std2); | ||||
| 	if (ast_threadpool_push(pool, simple_task, std2)) { | ||||
| 		res = AST_TEST_FAIL; | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	res = wait_for_completion(test, std2); | ||||
| 	if (res == AST_TEST_FAIL) { | ||||
| @@ -1240,8 +1289,8 @@ AST_TEST_DEFINE(threadpool_reactivation) | ||||
| end: | ||||
| 	ast_threadpool_shutdown(pool); | ||||
| 	ao2_cleanup(listener); | ||||
| 	ast_free(std1); | ||||
| 	ast_free(std2); | ||||
| 	simple_task_data_free(std1); | ||||
| 	simple_task_data_free(std2); | ||||
| 	ast_free(tld); | ||||
| 	return res; | ||||
|  | ||||
| @@ -1269,6 +1318,19 @@ static struct complex_task_data *complex_task_data_alloc(void) | ||||
| 	return ctd; | ||||
| } | ||||
|  | ||||
| static void complex_task_data_free(struct complex_task_data *ctd) | ||||
| { | ||||
| 	if (!ctd) { | ||||
| 		return; | ||||
| 	} | ||||
|  | ||||
| 	ast_mutex_destroy(&ctd->lock); | ||||
| 	ast_cond_destroy(&ctd->stall_cond); | ||||
| 	ast_cond_destroy(&ctd->notify_cond); | ||||
|  | ||||
| 	ast_free(ctd); | ||||
| } | ||||
|  | ||||
| static int complex_task(void *data) | ||||
| { | ||||
| 	struct complex_task_data *ctd = data; | ||||
| @@ -1400,8 +1462,13 @@ AST_TEST_DEFINE(threadpool_task_distribution) | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_push(pool, complex_task, ctd1); | ||||
| 	ast_threadpool_push(pool, complex_task, ctd2); | ||||
| 	if (ast_threadpool_push(pool, complex_task, ctd1)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	if (ast_threadpool_push(pool, complex_task, ctd2)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_set_size(pool, 2); | ||||
|  | ||||
| @@ -1438,8 +1505,8 @@ AST_TEST_DEFINE(threadpool_task_distribution) | ||||
| end: | ||||
| 	ast_threadpool_shutdown(pool); | ||||
| 	ao2_cleanup(listener); | ||||
| 	ast_free(ctd1); | ||||
| 	ast_free(ctd2); | ||||
| 	complex_task_data_free(ctd1); | ||||
| 	complex_task_data_free(ctd2); | ||||
| 	ast_free(tld); | ||||
| 	return res; | ||||
| } | ||||
| @@ -1496,8 +1563,13 @@ AST_TEST_DEFINE(threadpool_more_destruction) | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_push(pool, complex_task, ctd1); | ||||
| 	ast_threadpool_push(pool, complex_task, ctd2); | ||||
| 	if (ast_threadpool_push(pool, complex_task, ctd1)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	if (ast_threadpool_push(pool, complex_task, ctd2)) { | ||||
| 		goto end; | ||||
| 	} | ||||
|  | ||||
| 	ast_threadpool_set_size(pool, 4); | ||||
|  | ||||
| @@ -1549,8 +1621,8 @@ AST_TEST_DEFINE(threadpool_more_destruction) | ||||
| end: | ||||
| 	ast_threadpool_shutdown(pool); | ||||
| 	ao2_cleanup(listener); | ||||
| 	ast_free(ctd1); | ||||
| 	ast_free(ctd2); | ||||
| 	complex_task_data_free(ctd1); | ||||
| 	complex_task_data_free(ctd2); | ||||
| 	ast_free(tld); | ||||
| 	return res; | ||||
| } | ||||
| @@ -1666,9 +1738,9 @@ end: | ||||
| 	poke_worker(data3); | ||||
| 	ast_taskprocessor_unreference(uut); | ||||
| 	ast_threadpool_shutdown(pool); | ||||
| 	ast_free(data1); | ||||
| 	ast_free(data2); | ||||
| 	ast_free(data3); | ||||
| 	complex_task_data_free(data1); | ||||
| 	complex_task_data_free(data2); | ||||
| 	complex_task_data_free(data3); | ||||
| 	return res; | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user