[svn-commits] mmichelson: branch mmichelson/threadpool r377368 - in /team/mmichelson/thread...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Thu Dec 6 18:30:40 CST 2012


Author: mmichelson
Date: Thu Dec  6 18:30:35 2012
New Revision: 377368

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=377368
Log:
Add new threadpool test and fix some taskprocessor bugs.

The new thread creation test fails because Asterisk locks up
while trying to lock a taskprocessor.

While trying to debug that, I found a race condition during taskprocessor
creation where a default taskprocessor listener could try to operate on
a partially started taskprocessor. This was fixed by adding a new callback
to taskprocessor listeners.

Then while testing that change, I found some bugs in the taskprocessor
tests where I was not properly unlocking when done with a lock. Scoped
locks have spoiled me a bit.

I still have not figured out why the threadpool thread creation test
is locking up.


Modified:
    team/mmichelson/threadpool/include/asterisk/taskprocessor.h
    team/mmichelson/threadpool/main/taskprocessor.c
    team/mmichelson/threadpool/main/threadpool.c
    team/mmichelson/threadpool/tests/test_taskprocessor.c
    team/mmichelson/threadpool/tests/test_threadpool.c

Modified: team/mmichelson/threadpool/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/taskprocessor.h?view=diff&rev=377368&r1=377367&r2=377368
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/taskprocessor.h (original)
+++ team/mmichelson/threadpool/include/asterisk/taskprocessor.h Thu Dec  6 18:30:35 2012
@@ -85,6 +85,15 @@
 	 */
 	void *(*alloc)(struct ast_taskprocessor_listener *listener);
 	/*!
+	 * \brief The taskprocessor has started completely
+	 *
+	 * This indicates that the taskprocessor is fully set up and the listener
+	 * can now start interacting with it.
+	 *
+	 * \param listener The listener to start
+	 */
+	int (*start)(struct ast_taskprocessor_listener *listener);
+	/*!
 	 * \brief Indicates a task was pushed to the processor
 	 *
 	 * \param listener The listener

Modified: team/mmichelson/threadpool/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/taskprocessor.c?view=diff&rev=377368&r1=377367&r2=377368
==============================================================================
--- team/mmichelson/threadpool/main/taskprocessor.c (original)
+++ team/mmichelson/threadpool/main/taskprocessor.c Thu Dec  6 18:30:35 2012
@@ -171,10 +171,18 @@
 	ast_cond_init(&pvt->cond, NULL);
 	ast_mutex_init(&pvt->lock);
 	pvt->poll_thread = AST_PTHREADT_NULL;
-	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
-		return NULL;
-	}
 	return pvt;
+}
+
+static int default_listener_start(struct ast_taskprocessor_listener *listener)
+{
+	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+
+	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
+		return -1;
+	}
+
+	return 0;
 }
 
 static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
@@ -209,6 +217,7 @@
 
 static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
 	.alloc = default_listener_alloc,
+	.start = default_listener_start,
 	.task_pushed = default_task_pushed,
 	.emptied = default_emptied,
 	.shutdown = default_listener_shutdown,
@@ -553,6 +562,12 @@
 
 	if (!(ao2_link(tps_singletons, p))) {
 		ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
+		return NULL;
+	}
+
+	if (p->listener->callbacks->start(p->listener)) {
+		ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
+		ast_taskprocessor_unreference(p);
 		return NULL;
 	}
 

Modified: team/mmichelson/threadpool/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/threadpool.c?view=diff&rev=377368&r1=377367&r2=377368
==============================================================================
--- team/mmichelson/threadpool/main/threadpool.c (original)
+++ team/mmichelson/threadpool/main/threadpool.c Thu Dec  6 18:30:35 2012
@@ -268,6 +268,11 @@
 	return pool;
 }
 
+static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
+{
+	return 0;
+}
+
 /*!
  * \brief helper used for queued task when tasks are pushed
  */
@@ -431,6 +436,7 @@
  */
 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
 	.alloc = threadpool_alloc,
+	.start = threadpool_tps_start,
 	.task_pushed = threadpool_tps_task_pushed,
 	.emptied = threadpool_tps_emptied,
 	.shutdown = threadpool_tps_shutdown,
@@ -623,6 +629,7 @@
 
 	pool = tps_listener->private_data;
 	pool->tps = tps;
+	ast_log(LOG_NOTICE, "The taskprocessor I've created is located at %p\n", pool->tps);
 	ao2_ref(listener, +1);
 	pool->listener = listener;
 	ast_threadpool_set_size(pool, initial_size);

Modified: team/mmichelson/threadpool/tests/test_taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/tests/test_taskprocessor.c?view=diff&rev=377368&r1=377367&r2=377368
==============================================================================
--- team/mmichelson/threadpool/tests/test_taskprocessor.c (original)
+++ team/mmichelson/threadpool/tests/test_taskprocessor.c Thu Dec  6 18:30:35 2012
@@ -116,6 +116,7 @@
 			break;
 		}
 	}
+	ast_mutex_unlock(&task_data.lock);
 
 	if (!task_data.task_complete) {
 		ast_test_status_update(test, "Queued task did not execute!\n");
@@ -218,6 +219,7 @@
 			break;
 		}
 	}
+	ast_mutex_unlock(&load_task_results.lock);
 
 	if (load_task_results.tasks_completed != NUM_TASKS) {
 		ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
@@ -267,6 +269,14 @@
 }
 
 /*!
+ * \brief test taskprocessor listener's start callback
+ */
+static int test_start(struct ast_taskprocessor_listener *listener)
+{
+	return 0;
+}
+
+/*!
  * \brief test taskprocessor listener's task_pushed callback
  *
  * Adjusts private data's stats as indicated by the parameters.
@@ -309,6 +319,7 @@
 
 static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
 	.alloc = test_alloc,
+	.start = test_start,
 	.task_pushed = test_task_pushed,
 	.emptied = test_emptied,
 	.shutdown = test_shutdown,

Modified: team/mmichelson/threadpool/tests/test_threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/tests/test_threadpool.c?view=diff&rev=377368&r1=377367&r2=377368
==============================================================================
--- team/mmichelson/threadpool/tests/test_threadpool.c (original)
+++ team/mmichelson/threadpool/tests/test_threadpool.c Thu Dec  6 18:30:35 2012
@@ -36,6 +36,7 @@
 #include "asterisk/module.h"
 #include "asterisk/lock.h"
 #include "asterisk/astobj2.h"
+#include "asterisk/logger.h"
 
 struct test_listener_data {
 	int num_active;
@@ -66,6 +67,7 @@
 {
 	struct test_listener_data *tld = listener->private_data;
 	SCOPED_MUTEX(lock, &tld->lock);
+	ast_log(LOG_NOTICE, "State changed: num_active: %d, num_idle: %d\n", active_threads, idle_threads);
 	tld->num_active = active_threads;
 	tld->num_idle = idle_threads;
 	ast_cond_signal(&tld->cond);
@@ -95,6 +97,7 @@
 static void test_destroy(void *private_data)
 {
 	struct test_listener_data *tld = private_data;
+	ast_debug(1, "Poop\n");
 	ast_cond_destroy(&tld->cond);
 	ast_mutex_destroy(&tld->lock);
 	ast_free(tld);
@@ -134,6 +137,15 @@
 	ast_cond_signal(&std->cond);
 	return 0;
 }
+
+#define WAIT_WHILE(tld, condition) \
+{\
+	ast_mutex_lock(&tld->lock);\
+	while ((condition)) {\
+		ast_cond_wait(&tld->cond, &tld->lock);\
+	}\
+	ast_mutex_unlock(&tld->lock);\
+}\
 
 static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
 {
@@ -246,15 +258,64 @@
 	return res;
 }
 
+AST_TEST_DEFINE(threadpool_thread_creation)
+{
+	struct ast_threadpool *pool = NULL;
+	struct ast_threadpool_listener *listener = NULL;
+	enum ast_test_result_state res = AST_TEST_FAIL;
+	struct test_listener_data *tld;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = "threadpool_thread_creation";
+		info->category = "/main/threadpool_thread_creation/";
+		info->summary = "Test threadpool thread creation";
+		info->description =
+			"Ensure that threads can be added to a threadpool";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	listener = ast_threadpool_listener_alloc(&test_callbacks);
+	if (!listener) {
+		return AST_TEST_FAIL;
+	}
+	tld = listener->private_data;
+
+	pool = ast_threadpool_create(listener, 0);
+	if (!pool) {
+		goto end;
+	}
+
+	/* Now let's create a thread. It should start active, then go
+	 * idle immediately
+	 */
+	ast_threadpool_set_size(pool, 1);
+
+	WAIT_WHILE(tld, tld->num_idle == 0);
+
+	res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
+
+end:
+	if (pool) {
+		ast_threadpool_shutdown(pool);
+	}
+	ao2_cleanup(listener);
+	return res;
+}
+
 static int unload_module(void)
 {
 	ast_test_unregister(threadpool_push);
+	ast_test_unregister(threadpool_thread_creation);
 	return 0;
 }
 
 static int load_module(void)
 {
 	ast_test_register(threadpool_push);
+	ast_test_register(threadpool_thread_creation);
 	return AST_MODULE_LOAD_SUCCESS;
 }
 




More information about the svn-commits mailing list