[svn-commits] mmichelson: branch mmichelson/threadpool r377474 - in /team/mmichelson/thread...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Sun Dec 9 12:56:30 CST 2012


Author: mmichelson
Date: Sun Dec  9 12:56:25 2012
New Revision: 377474

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=377474
Log:
Create longer thread destruction test.

This one involves shrinking the threadpool in such
a way that both idle and active threads are affected.

This test made me re-realize why the zombie state exists,
so I re-added it. We don't want to clog up the control
taskprocessor by waiting on active threads to complete
what they are doing. Instead, we mark them as zombies so
that when they are done, they can clean themselves up
properly.

Without the zombie state available, the new test actually
will deadlock.


Modified:
    team/mmichelson/threadpool/main/threadpool.c
    team/mmichelson/threadpool/tests/test_threadpool.c

Modified: team/mmichelson/threadpool/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/threadpool.c?view=diff&rev=377474&r1=377473&r2=377474
==============================================================================
--- team/mmichelson/threadpool/main/threadpool.c (original)
+++ team/mmichelson/threadpool/main/threadpool.c Sun Dec  9 12:56:25 2012
@@ -45,6 +45,11 @@
 	 * Idle threads are those that are currenly waiting to run tasks
 	 */
 	struct ao2_container *idle_threads;
+	/*! 
+	 * \brief The container of zombie threads.
+	 * Zombie threads may be running tasks, but they are scheduled to die soon
+	 */
+	struct ao2_container *zombie_threads;
 	/*! 
 	 * \brief The main taskprocessor
 	 * 
@@ -79,7 +84,7 @@
 	 * This is done for three main reasons
 	 * 1) It ensures that listeners are given an accurate portrayal
 	 * of the threadpool's current state. In other words, when a listener
-	 * gets told a count of active and idle threads, it does not
+	 * gets told a count of active, idle and zombie threads, it does not
 	 * need to worry that internal state of the threadpool might be different
 	 * from what it has been told.
 	 * 2) It minimizes the locking required in both the threadpool and in
@@ -96,7 +101,20 @@
 enum worker_state {
 	/*! The worker is either active or idle */
 	ALIVE,
-	/*! The worker has been asked to shut down. */
+	/*!
+	 * The worker has been asked to shut down but
+	 * may still be in the process of executing tasks.
+	 * This transition happens when the threadpool needs
+	 * to shrink and needs to kill active threads in order
+	 * to do so.
+	 */
+	ZOMBIE,
+	/*!
+	 * The worker has been asked to shut down. Typically
+	 * only idle threads go to this state directly, but
+	 * active threads may go straight to this state when
+	 * the threadpool is shut down.
+	 */
 	DEAD,
 };
 
@@ -202,6 +220,41 @@
 }
 
 /*!
+ * \brief Kill a zombie thread
+ *
+ * This runs from the threadpool's control taskprocessor thread.
+ *
+ * \param data A thread_worker_pair containing the threadpool and the zombie thread
+ * \return 0
+ */
+static int queued_zombie_thread_dead(void *data)
+{
+	struct thread_worker_pair *pair = data;
+
+	ao2_unlink(pair->pool->zombie_threads, pair->worker);
+	threadpool_send_state_changed(pair->pool);
+
+	ao2_ref(pair, -1);
+	return 0;
+}
+
+/*!
+ * \brief Queue a task to kill a zombie thread
+ *
+ * This is called by a worker thread when it acknowledges that it is time for
+ * it to die.
+ */
+static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
+		struct worker_thread *worker)
+{
+	struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
+	if (!pair) {
+		return;
+	}
+	ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
+}
+
+/*!
  * \brief Execute a task in the threadpool
  * 
  * This is the function that worker threads call in order to execute tasks
@@ -261,6 +314,10 @@
 	}
 	pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
 	if (!pool->idle_threads) {
+		return NULL;
+	}
+	pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+	if (!pool->zombie_threads) {
 		return NULL;
 	}
 
@@ -413,6 +470,7 @@
 
 	ao2_cleanup(pool->active_threads);
 	ao2_cleanup(pool->idle_threads);
+	ao2_cleanup(pool->zombie_threads);
 }
 
 /*!
@@ -459,6 +517,7 @@
 			return;
 		}
 		ao2_link(pool->active_threads, worker);
+		ao2_ref(worker, -1);
 	}
 }
 
@@ -478,7 +537,11 @@
 {
 	int *num_to_kill = arg;
 
-	if ((*num_to_kill)-- > 0) {
+	ast_log(LOG_NOTICE, "num to kill is %d\n", *num_to_kill);
+
+	if (*num_to_kill > 0) {
+		--(*num_to_kill);
+		ast_log(LOG_NOTICE, "Should be killing a thread\n");
 		return CMP_MATCH;
 	} else {
 		return CMP_STOP;
@@ -486,11 +549,46 @@
 }
 
 /*!
+ * \brief ao2 callback to zombify a set number of threads.
+ *
+ * Threads will be zombified as long as the counter has not reached
+ * zero. The counter is decremented with each thread that is zombified.
+ *
+ * Zombifying a thread involves removing it from its current container,
+ * adding it to the zombie container, and changing the state of the
+ * worker to a zombie
+ *
+ * This callback is called from the threadpool control taskprocessor thread.
+ *
+ * \param obj The worker thread that may be zombified
+ * \param arg The pool to which the worker belongs
+ * \param data The counter
+ * \param flags Unused
+ * \retval CMP_MATCH The zombified thread should be removed from its current container
+ * \retval CMP_STOP Stop attempting to zombify threads
+ */
+static int zombify_threads(void *obj, void *arg, void *data, int flags)
+{
+	struct worker_thread *worker = obj;
+	struct ast_threadpool *pool = arg;
+	int *num_to_zombify = data;
+
+	if ((*num_to_zombify)-- > 0) {
+		ast_log(LOG_NOTICE, "Should be zombifying a thread\n");
+		ao2_link(pool->zombie_threads, worker);
+		worker_set_state(worker, ZOMBIE);
+		return CMP_MATCH;
+	} else {
+		return CMP_STOP;
+	}
+}
+
+/*!
  * \brief Remove threads from the threadpool
  *
  * The preference is to kill idle threads. However, if there are
  * more threads to remove than there are idle threads, then active
- * threads will be removed too.
+ * threads will be zombified instead.
  *
  * This function is called from the threadpool control taskprocessor thread.
  *
@@ -499,15 +597,21 @@
  */
 static void shrink(struct ast_threadpool *pool, int delta)
 {
+	/* 
+	 * Preference is to kill idle threads, but
+	 * we'll move on to deactivating active threads
+	 * if we have to
+	 */
 	int idle_threads = ao2_container_count(pool->idle_threads);
 	int idle_threads_to_kill = MIN(delta, idle_threads);
-	int active_threads_to_kill = delta - idle_threads_to_kill;
-
-	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
+	int active_threads_to_zombify = delta - idle_threads_to_kill;
+
+	ast_log(LOG_NOTICE, "Going to kill off %d idle threads\n", idle_threads_to_kill);
+	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
 			kill_threads, &idle_threads_to_kill);
 
-	ao2_callback(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
-			kill_threads, &active_threads_to_kill);
+	ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+			zombify_threads, pool, &active_threads_to_zombify);
 }
 
 /*!
@@ -553,20 +657,22 @@
 {
 	struct set_size_data *ssd = data;
 	struct ast_threadpool *pool = ssd->pool;
-	unsigned int new_size = ssd->size;
+	unsigned int num_threads = ssd->size;
+
+	/* We don't count zombie threads as being "live" when potentially resizing */
 	unsigned int current_size = ao2_container_count(pool->active_threads) +
 		ao2_container_count(pool->idle_threads);
 
-	if (current_size == new_size) {
+	if (current_size == num_threads) {
 		ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
-				new_size, current_size);
+				num_threads, current_size);
 		return 0;
 	}
 
-	if (current_size < new_size) {
-		grow(pool, new_size - current_size);
+	if (current_size < num_threads) {
+		grow(pool, num_threads - current_size);
 	} else {
-		shrink(pool, current_size - new_size);
+		shrink(pool, current_size - num_threads);
 	}
 
 	threadpool_send_state_changed(pool);
@@ -788,6 +894,18 @@
 			alive = worker_idle(worker);
 		}
 	}
+
+	/* Reaching this portion means the thread is
+	 * on death's door. It may have been killed while
+	 * it was idle, in which case it can just die
+	 * peacefully. If it's a zombie, though, then
+	 * it needs to let the pool know so
+	 * that the thread can be removed from the
+	 * list of zombie threads.
+	 */
+	if (worker->state == ZOMBIE) {
+		threadpool_zombie_thread_dead(worker->pool, worker);
+	}
 }
 
 /*!

Modified: team/mmichelson/threadpool/tests/test_threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/tests/test_threadpool.c?view=diff&rev=377474&r1=377473&r2=377474
==============================================================================
--- team/mmichelson/threadpool/tests/test_threadpool.c (original)
+++ team/mmichelson/threadpool/tests/test_threadpool.c Sun Dec  9 12:56:25 2012
@@ -842,6 +842,98 @@
 	return res;
 }
 
+AST_TEST_DEFINE(threadpool_more_destruction)
+{
+	struct ast_threadpool *pool = NULL;
+	struct ast_threadpool_listener *listener = NULL;
+	struct complex_task_data *ctd1 = NULL;
+	struct complex_task_data *ctd2 = NULL;
+	enum ast_test_result_state res = AST_TEST_FAIL;
+	struct test_listener_data *tld;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = "threadpool_more_destruction";
+		info->category = "/main/threadpool/";
+		info->summary = "Test that threads are destroyed as expected";
+		info->description =
+			"Push two tasks into a threadpool. Set the threadpool size to 4\n"
+			"Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
+			"threadpool down to 1 thread. Ensure that the thread leftove is active\n"
+			"and ensure that both tasks complete.\n";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	listener = ast_threadpool_listener_alloc(&test_callbacks);
+	if (!listener) {
+		return AST_TEST_FAIL;
+	}
+	tld = listener->private_data;
+
+	pool = ast_threadpool_create(listener, 0);
+	if (!pool) {
+		goto end;
+	}
+
+	ctd1 = complex_task_data_alloc();
+	ctd2 = complex_task_data_alloc();
+	if (!ctd1 || !ctd2) {
+		goto end;
+	}
+
+	ast_threadpool_push(pool, complex_task, ctd1);
+	ast_threadpool_push(pool, complex_task, ctd2);
+
+	ast_threadpool_set_size(pool, 4);
+
+	WAIT_WHILE(tld, tld->num_idle < 2);
+
+	res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
+	if (res == AST_TEST_FAIL) {
+		goto end;
+	}
+
+	ast_threadpool_set_size(pool, 1);
+
+	/* Shrinking the threadpool should kill off the two idle threads
+	 * and one of the active threads.
+	 */
+	WAIT_WHILE(tld, tld->num_idle > 0 || tld->num_active > 1);
+
+	res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
+	if (res == AST_TEST_FAIL) {
+		goto end;
+	}
+
+	/* The tasks are stalled until we poke them */
+	poke_worker(ctd1);
+	poke_worker(ctd2);
+
+	res = wait_for_complex_completion(ctd1);
+	if (res == AST_TEST_FAIL) {
+		goto end;
+	}
+	res = wait_for_complex_completion(ctd2);
+	if (res == AST_TEST_FAIL) {
+		goto end;
+	}
+
+	WAIT_WHILE(tld, tld->num_idle < 1);
+
+	res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
+
+end:
+	if (pool) {
+		ast_threadpool_shutdown(pool);
+	}
+	ao2_cleanup(listener);
+	ast_free(ctd1);
+	ast_free(ctd2);
+	return res;
+}
+
 static int unload_module(void)
 {
 	ast_test_unregister(threadpool_push);
@@ -852,6 +944,7 @@
 	ast_test_unregister(threadpool_one_thread_multiple_tasks);
 	ast_test_unregister(threadpool_reactivation);
 	ast_test_unregister(threadpool_task_distribution);
+	ast_test_unregister(threadpool_more_destruction);
 	return 0;
 }
 
@@ -865,6 +958,7 @@
 	ast_test_register(threadpool_one_thread_multiple_tasks);
 	ast_test_register(threadpool_reactivation);
 	ast_test_register(threadpool_task_distribution);
+	ast_test_register(threadpool_more_destruction);
 	return AST_MODULE_LOAD_SUCCESS;
 }
 




More information about the svn-commits mailing list