[asterisk-commits] mmichelson: branch mmichelson/threadpool r379375 - in /team/mmichelson/threadpool: include/asterisk/ main/ tests/

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Jan 17 10:04:14 CST 2013


Author: mmichelson
Date: Thu Jan 17 10:04:10 2013
New Revision: 379375

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=379375
Log:
Address David's latest feedback on reviewboard:

* Add a max_size option for threadpools. Also added a test for this option.
* Fixed comments to be more accurate and have fewer typos.
* Updated copyright dates on new files.


Modified:
    team/mmichelson/threadpool/include/asterisk/taskprocessor.h
    team/mmichelson/threadpool/include/asterisk/threadpool.h
    team/mmichelson/threadpool/main/threadpool.c
    team/mmichelson/threadpool/tests/test_taskprocessor.c
    team/mmichelson/threadpool/tests/test_threadpool.c

Modified: team/mmichelson/threadpool/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/taskprocessor.h?view=diff&rev=379375&r1=379374&r2=379375
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/taskprocessor.h (original)
+++ team/mmichelson/threadpool/include/asterisk/taskprocessor.h Thu Jan 17 10:04:10 2013
@@ -163,7 +163,9 @@
  *
  * \since 12.0.0
  *
- * The listener's alloc() and start() callbacks will be called during this function.
+ * Note that when a taskprocessor is created in this way, it does not create
+ * any threads to execute the tasks. This job is left up to the listener.
+ * The listener's start() callback will be called during this function.
  *
  * \param name The name of the taskprocessor to create
  * \param listener The listener for operations on this taskprocessor

Modified: team/mmichelson/threadpool/include/asterisk/threadpool.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/threadpool.h?view=diff&rev=379375&r1=379374&r2=379375
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/threadpool.h (original)
+++ team/mmichelson/threadpool/include/asterisk/threadpool.h Thu Jan 17 10:04:10 2013
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 2012, Digium, Inc.
+ * Copyright (C) 2012-2013, Digium, Inc.
  *
  * Mark Michelson <mmmichelson at digium.com>
  *
@@ -69,7 +69,7 @@
 
 struct ast_threadpool_options {
 #define AST_THREADPOOL_OPTIONS_VERSION 1
-	/*! Version of thradpool options in use */
+	/*! Version of threadpool options in use */
 	int version;
 	/*!
 	 * \brief Time limit in seconds for idle threads
@@ -98,6 +98,13 @@
 	 * without any threads allocated.
 	 */
 	int initial_size;
+	/*!
+	 * \brief Maximum number of threads a pool may have
+	 *
+	 * When the threadpool's size increases, it can never increase
+	 * beyond this number of threads.
+	 */
+	int max_size;
 };
 
 /*!
@@ -127,7 +134,10 @@
  * This function creates a threadpool. Tasks may be pushed onto this thread pool
  * in and will be automatically acted upon by threads within the pool.
  *
- * \param name The name for the threadpool
+ * Only a single threadpool with a given name may exist. This function will fail
+ * if a threadpool with the given name already exists.
+ *
+ * \param name The unique name for the threadpool
  * \param listener The listener the threadpool will notify of changes. Can be NULL.
  * \param options The behavioral options for this threadpool
  * \retval NULL Failed to create the threadpool

Modified: team/mmichelson/threadpool/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/threadpool.c?view=diff&rev=379375&r1=379374&r2=379375
==============================================================================
--- team/mmichelson/threadpool/main/threadpool.c (original)
+++ team/mmichelson/threadpool/main/threadpool.c Thu Jan 17 10:04:10 2013
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 2012, Digium, Inc.
+ * Copyright (C) 2012-2013, Digium, Inc.
  *
  * Mark Michelson <mmmichelson at digium.com>
  *
@@ -24,6 +24,7 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/utils.h"
 
+/* Needs to stay prime if increased */
 #define THREAD_BUCKETS 89
 
 /*!
@@ -494,6 +495,13 @@
 {
 	int i;
 
+	int current_size = ao2_container_count(pool->active_threads) +
+		ao2_container_count(pool->idle_threads);
+
+	if (pool->options.max_size && current_size + delta > pool->options.max_size) {
+		delta = pool->options.max_size - current_size;
+	}
+
 	ast_debug(3, "Increasing threadpool %s's size by %d\n",
 			ast_taskprocessor_name(pool->tps), delta);
 
@@ -788,7 +796,7 @@
 	struct ast_threadpool *pool = ssd->pool;
 	unsigned int num_threads = ssd->size;
 
-	/* We don't count zombie threads as being "live when potentially resizing */
+	/* We don't count zombie threads as being "live" when potentially resizing */
 	unsigned int current_size = ao2_container_count(pool->active_threads) +
 		ao2_container_count(pool->idle_threads);
 
@@ -895,6 +903,9 @@
 
 void ast_threadpool_shutdown(struct ast_threadpool *pool)
 {
+	if (!pool) {
+		return;
+	}
 	/* Shut down the taskprocessors and everything else just
 	 * takes care of itself via the taskprocessor callbacks
 	 */

Modified: team/mmichelson/threadpool/tests/test_taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/tests/test_taskprocessor.c?view=diff&rev=379375&r1=379374&r2=379375
==============================================================================
--- team/mmichelson/threadpool/tests/test_taskprocessor.c (original)
+++ team/mmichelson/threadpool/tests/test_taskprocessor.c Thu Jan 17 10:04:10 2013
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 2012, Digium, Inc.
+ * Copyright (C) 2012-2013, Digium, Inc.
  *
  * Mark Michelson <mmichelson at digium.com>
  *

Modified: team/mmichelson/threadpool/tests/test_threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/tests/test_threadpool.c?view=diff&rev=379375&r1=379374&r2=379375
==============================================================================
--- team/mmichelson/threadpool/tests/test_threadpool.c (original)
+++ team/mmichelson/threadpool/tests/test_threadpool.c Thu Jan 17 10:04:10 2013
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 2012, Digium, Inc.
+ * Copyright (C) 2012-2013, Digium, Inc.
  *
  * Mark Michelson <mmichelson at digium.com>
  *
@@ -284,6 +284,7 @@
 		.idle_timeout = 0,
 		.auto_increment = 0,
 		.initial_size = 0,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -324,9 +325,7 @@
 	res = listener_check(test, listener, 1, 1, 1, 0, 0, 0);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(std);
 	ast_free(tld);
@@ -338,12 +337,13 @@
 	struct ast_threadpool *pool = NULL;
 	struct ast_threadpool_listener *listener = NULL;
 	enum ast_test_result_state res = AST_TEST_FAIL;
-	struct test_listener_data *tld;
+	struct test_listener_data *tld = NULL;
 	struct ast_threadpool_options options = {
 		.version = AST_THREADPOOL_OPTIONS_VERSION,
 		.idle_timeout = 0,
 		.auto_increment = 0,
 		.initial_size = 3,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -377,9 +377,7 @@
 	res = wait_until_thread_state(test, tld, 0, 3);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(tld);
 	return res;
@@ -391,12 +389,13 @@
 	struct ast_threadpool *pool = NULL;
 	struct ast_threadpool_listener *listener = NULL;
 	enum ast_test_result_state res = AST_TEST_FAIL;
-	struct test_listener_data *tld;
+	struct test_listener_data *tld = NULL;
 	struct ast_threadpool_options options = {
 		.version = AST_THREADPOOL_OPTIONS_VERSION,
 		.idle_timeout = 0,
 		.auto_increment = 0,
 		.initial_size = 0,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -434,9 +433,7 @@
 	res = wait_until_thread_state(test, tld, 0, 1);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(tld);
 	return res;
@@ -447,12 +444,13 @@
 	struct ast_threadpool *pool = NULL;
 	struct ast_threadpool_listener *listener = NULL;
 	enum ast_test_result_state res = AST_TEST_FAIL;
-	struct test_listener_data *tld;
+	struct test_listener_data *tld = NULL;
 	struct ast_threadpool_options options = {
 		.version = AST_THREADPOOL_OPTIONS_VERSION,
 		.idle_timeout = 0,
 		.auto_increment = 0,
 		.initial_size = 0,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -499,9 +497,7 @@
 	res = wait_until_thread_state(test, tld, 0, 2);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(tld);
 	return res;
@@ -512,12 +508,13 @@
 	struct ast_threadpool *pool = NULL;
 	struct ast_threadpool_listener *listener = NULL;
 	enum ast_test_result_state res = AST_TEST_FAIL;
-	struct test_listener_data *tld;
+	struct test_listener_data *tld = NULL;
 	struct ast_threadpool_options options = {
 		.version = AST_THREADPOOL_OPTIONS_VERSION,
 		.idle_timeout = 2,
 		.auto_increment = 0,
 		.initial_size = 0,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -567,9 +564,7 @@
 	res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(tld);
 	return res;
@@ -581,12 +576,13 @@
 	struct ast_threadpool_listener *listener = NULL;
 	struct simple_task_data *std = NULL;
 	enum ast_test_result_state res = AST_TEST_FAIL;
-	struct test_listener_data *tld;
+	struct test_listener_data *tld = NULL;
 	struct ast_threadpool_options options = {
 		.version = AST_THREADPOOL_OPTIONS_VERSION,
 		.idle_timeout = 0,
 		.auto_increment = 0,
 		.initial_size = 0,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -648,9 +644,7 @@
 	res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(std);
 	ast_free(tld);
@@ -664,12 +658,13 @@
 	struct ast_threadpool_listener *listener = NULL;
 	struct simple_task_data *std = NULL;
 	enum ast_test_result_state res = AST_TEST_FAIL;
-	struct test_listener_data *tld;
+	struct test_listener_data *tld = NULL;
 	struct ast_threadpool_options options = {
 		.version = AST_THREADPOOL_OPTIONS_VERSION,
 		.idle_timeout = 0,
 		.auto_increment = 0,
 		.initial_size = 0,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -732,9 +727,7 @@
 	res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(std);
 	ast_free(tld);
@@ -749,12 +742,13 @@
 	struct simple_task_data *std2 = NULL;
 	struct simple_task_data *std3 = NULL;
 	enum ast_test_result_state res = AST_TEST_FAIL;
-	struct test_listener_data *tld;
+	struct test_listener_data *tld = NULL;
 	struct ast_threadpool_options options = {
 		.version = AST_THREADPOOL_OPTIONS_VERSION,
 		.idle_timeout = 0,
 		.auto_increment = 0,
 		.initial_size = 0,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -828,9 +822,7 @@
 	res = listener_check(test, listener, 1, 0, 3, 0, 1, 1);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(std1);
 	ast_free(std2);
@@ -848,12 +840,13 @@
 	struct simple_task_data *std3 = NULL;
 	struct simple_task_data *std4 = NULL;
 	enum ast_test_result_state res = AST_TEST_FAIL;
-	struct test_listener_data *tld;
+	struct test_listener_data *tld = NULL;
 	struct ast_threadpool_options options = {
 		.version = AST_THREADPOOL_OPTIONS_VERSION,
 		.idle_timeout = 0,
 		.auto_increment = 3,
 		.initial_size = 0,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -945,9 +938,7 @@
 	res = listener_check(test, listener, 1, 0, 4, 0, 3, 1);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(std1);
 	ast_free(std2);
@@ -957,6 +948,76 @@
 	return res;
 }
 
+AST_TEST_DEFINE(threadpool_max_size)
+{
+	struct ast_threadpool *pool = NULL;
+	struct ast_threadpool_listener *listener = NULL;
+	struct simple_task_data *std = NULL;
+	enum ast_test_result_state res = AST_TEST_FAIL;
+	struct test_listener_data *tld = NULL;
+	struct ast_threadpool_options options = {
+		.version = AST_THREADPOOL_OPTIONS_VERSION,
+		.idle_timeout = 0,
+		.auto_increment = 3,
+		.initial_size = 0,
+		.max_size = 2,
+	};
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = "max_size";
+		info->category = "/main/threadpool/";
+		info->summary = "Test that the threadpool does not exceed its maximum size restriction";
+		info->description =
+			"Create an empty threadpool and push a task to it. Once the task is\n"
+			"pushed, the threadpool should attempt to grow by three threads, but the\n"
+			"pool's restrictions should only allow two threads to be added.\n";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	tld = test_alloc();
+	if (!tld) {
+		return AST_TEST_FAIL;
+	}
+
+	listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
+	if (!listener) {
+		goto end;
+	}
+
+	pool = ast_threadpool_create(info->name, listener, &options);
+	if (!pool) {
+		goto end;
+	}
+
+	std = simple_task_data_alloc();
+	if (!std) {
+		goto end;
+	}
+
+	ast_threadpool_push(pool, simple_task, std);
+
+	res = wait_for_completion(test, std);
+	if (res == AST_TEST_FAIL) {
+		goto end;
+	}
+
+	res = wait_until_thread_state(test, tld, 0, 2);
+	if (res == AST_TEST_FAIL) {
+		goto end;
+	}
+
+	res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
+end:
+	ast_threadpool_shutdown(pool);
+	ao2_cleanup(listener);
+	ast_free(std);
+	ast_free(tld);
+	return res;
+}
+
 AST_TEST_DEFINE(threadpool_reactivation)
 {
 	struct ast_threadpool *pool = NULL;
@@ -964,12 +1025,13 @@
 	struct simple_task_data *std1 = NULL;
 	struct simple_task_data *std2 = NULL;
 	enum ast_test_result_state res = AST_TEST_FAIL;
-	struct test_listener_data *tld;
+	struct test_listener_data *tld = NULL;
 	struct ast_threadpool_options options = {
 		.version = AST_THREADPOOL_OPTIONS_VERSION,
 		.idle_timeout = 0,
 		.auto_increment = 0,
 		.initial_size = 0,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -1052,9 +1114,7 @@
 	res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(std1);
 	ast_free(std2);
@@ -1133,12 +1193,13 @@
 	struct complex_task_data *ctd1 = NULL;
 	struct complex_task_data *ctd2 = NULL;
 	enum ast_test_result_state res = AST_TEST_FAIL;
-	struct test_listener_data *tld;
+	struct test_listener_data *tld = NULL;
 	struct ast_threadpool_options options = {
 		.version = AST_THREADPOOL_OPTIONS_VERSION,
 		.idle_timeout = 0,
 		.auto_increment = 0,
 		.initial_size = 0,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -1211,9 +1272,7 @@
 	res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(ctd1);
 	ast_free(ctd2);
@@ -1228,12 +1287,13 @@
 	struct complex_task_data *ctd1 = NULL;
 	struct complex_task_data *ctd2 = NULL;
 	enum ast_test_result_state res = AST_TEST_FAIL;
-	struct test_listener_data *tld;
+	struct test_listener_data *tld = NULL;
 	struct ast_threadpool_options options = {
 		.version = AST_THREADPOOL_OPTIONS_VERSION,
 		.idle_timeout = 0,
 		.auto_increment = 0,
 		.initial_size = 0,
+		.max_size = 0,
 	};
 
 	switch (cmd) {
@@ -1244,7 +1304,7 @@
 		info->description =
 			"Push two tasks into a threadpool. Set the threadpool size to 4\n"
 			"Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
-			"threadpool down to 1 thread. Ensure that the thread leftove is active\n"
+			"threadpool down to 1 thread. Ensure that the thread leftover is active\n"
 			"and ensure that both tasks complete.\n";
 		return AST_TEST_NOT_RUN;
 	case TEST_EXECUTE:
@@ -1323,9 +1383,7 @@
 	res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
 
 end:
-	if (pool) {
-		ast_threadpool_shutdown(pool);
-	}
+	ast_threadpool_shutdown(pool);
 	ao2_cleanup(listener);
 	ast_free(ctd1);
 	ast_free(ctd2);
@@ -1344,6 +1402,7 @@
 	ast_test_unregister(threadpool_one_thread_one_task);
 	ast_test_unregister(threadpool_one_thread_multiple_tasks);
 	ast_test_unregister(threadpool_auto_increment);
+	ast_test_unregister(threadpool_max_size);
 	ast_test_unregister(threadpool_reactivation);
 	ast_test_unregister(threadpool_task_distribution);
 	ast_test_unregister(threadpool_more_destruction);
@@ -1361,6 +1420,7 @@
 	ast_test_register(threadpool_one_thread_one_task);
 	ast_test_register(threadpool_one_thread_multiple_tasks);
 	ast_test_register(threadpool_auto_increment);
+	ast_test_register(threadpool_max_size);
 	ast_test_register(threadpool_reactivation);
 	ast_test_register(threadpool_task_distribution);
 	ast_test_register(threadpool_more_destruction);




More information about the asterisk-commits mailing list