[asterisk-commits] threadpool: Handle worker thread transitioning to dead when ... (asterisk[13])

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Nov 12 06:09:01 CST 2015


Joshua Colp has submitted this change and it was merged.

Change subject: threadpool: Handle worker thread transitioning to dead when going active.
......................................................................


threadpool: Handle worker thread transitioning to dead when going active.

This change adds handling of dead worker threads when moving them
to be active. When this happens the worker thread is removed from
both the active and idle thread containers. If no threads can be
moved to active, the pool grows as configured.

A unit test has also been added which thrashes the idle timeout
and thread activation to expose any race conditions between the
two.

ASTERISK-25546 #close

Change-Id: I6c455f9a40de60d9e86458d447b548fb52ba1143
---
M main/threadpool.c
M tests/test_threadpool.c
2 files changed, 142 insertions(+), 8 deletions(-)

Approvals:
  Anonymous Coward #1000019: Verified
  Matt Jordan: Looks good to me, but someone else must approve
  Joshua Colp: Looks good to me, approved



diff --git a/main/threadpool.c b/main/threadpool.c
index d97a7ad..46de9b7 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -168,7 +168,7 @@
 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
 static int worker_thread_start(struct worker_thread *worker);
 static int worker_idle(struct worker_thread *worker);
-static void worker_set_state(struct worker_thread *worker, enum worker_state state);
+static int worker_set_state(struct worker_thread *worker, enum worker_state state);
 static void worker_shutdown(struct worker_thread *worker);
 
 /*!
@@ -482,7 +482,16 @@
 				worker->id);
 		return 0;
 	}
-	worker_set_state(worker, ALIVE);
+
+	if (worker_set_state(worker, ALIVE)) {
+		ast_debug(1, "Failed to activate thread %d. It is dead\n",
+				worker->id);
+		/* The worker thread will no longer exist in the active threads or
+		 * idle threads container after this.
+		 */
+		ao2_unlink(pool->active_threads, worker);
+	}
+
 	return CMP_MATCH;
 }
 
@@ -538,19 +547,32 @@
 	struct task_pushed_data *tpd = data;
 	struct ast_threadpool *pool = tpd->pool;
 	int was_empty = tpd->was_empty;
+	unsigned int existing_active;
 
 	if (pool->listener && pool->listener->callbacks->task_pushed) {
 		pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
 	}
-	if (ao2_container_count(pool->idle_threads) == 0) {
+
+	existing_active = ao2_container_count(pool->active_threads);
+
+	/* The first pass transitions any existing idle threads to be active, and
+	 * will also remove any worker threads that have recently entered the dead
+	 * state.
+	 */
+	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+			activate_thread, pool);
+
+	/* If no idle threads could be transitioned to active grow the pool as permitted. */
+	if (ao2_container_count(pool->active_threads) == existing_active) {
 		if (!pool->options.auto_increment) {
+			ao2_ref(tpd, -1);
 			return 0;
 		}
 		grow(pool, pool->options.auto_increment);
+		/* An optional second pass transitions any newly added threads. */
+		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+				activate_thread, pool);
 	}
-
-	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
-			activate_thread, pool);
 
 	threadpool_send_state_changed(pool);
 	ao2_ref(tpd, -1);
@@ -797,7 +819,7 @@
 
 	/* We don't count zombie threads as being "live" when potentially resizing */
 	unsigned int current_size = ao2_container_count(pool->active_threads) +
-		ao2_container_count(pool->idle_threads);
+			ao2_container_count(pool->idle_threads);
 
 	if (current_size == num_threads) {
 		ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
@@ -806,6 +828,12 @@
 	}
 
 	if (current_size < num_threads) {
+		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+				activate_thread, pool);
+
+		/* As the above may have altered the number of current threads update it */
+		current_size = ao2_container_count(pool->active_threads) +
+				ao2_container_count(pool->idle_threads);
 		grow(pool, num_threads - current_size);
 		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
 				activate_thread, pool);
@@ -1117,13 +1145,36 @@
  *
  * The threadpool calls into this function in order to let a worker know
  * how it should proceed.
+ *
+ * \retval -1 failure (state transition not permitted)
+ * \retval 0 success
  */
-static void worker_set_state(struct worker_thread *worker, enum worker_state state)
+static int worker_set_state(struct worker_thread *worker, enum worker_state state)
 {
 	SCOPED_MUTEX(lock, &worker->lock);
+
+	switch (state) {
+	case ALIVE:
+		/* This can occur due to a race condition between being told to go active
+		 * and an idle timeout happening.
+		 */
+		if (worker->state == DEAD) {
+			return -1;
+		}
+		ast_assert(worker->state != ZOMBIE);
+		break;
+	case DEAD:
+		break;
+	case ZOMBIE:
+		ast_assert(worker->state != DEAD);
+		break;
+	}
+
 	worker->state = state;
 	worker->wake_up = 1;
 	ast_cond_signal(&worker->cond);
+
+	return 0;
 }
 
 /*! Serializer group shutdown control object. */
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index f5b1073..42181a2 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -571,6 +571,87 @@
 	return res;
 }
 
+AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
+{
+	struct ast_threadpool *pool = NULL;
+	struct ast_threadpool_listener *listener = NULL;
+	enum ast_test_result_state res = AST_TEST_FAIL;
+	struct test_listener_data *tld = NULL;
+	struct ast_threadpool_options options = {
+		.version = AST_THREADPOOL_OPTIONS_VERSION,
+		.idle_timeout = 1,
+		.auto_increment = 1,
+		.initial_size = 0,
+		.max_size = 1,
+	};
+	int iteration;
+
+	switch (cmd) {
+	case TEST_INIT:
+		info->name = "thread_timeout_thrash";
+		info->category = "/main/threadpool/";
+		info->summary = "Thrash threadpool thread timeout";
+		info->description =
+			"Repeatedly queue a task when a threadpool thread should timeout.";
+		return AST_TEST_NOT_RUN;
+	case TEST_EXECUTE:
+		break;
+	}
+
+	tld = test_alloc();
+	if (!tld) {
+		return AST_TEST_FAIL;
+	}
+
+	listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
+	if (!listener) {
+		goto end;
+	}
+
+	pool = ast_threadpool_create(info->name, listener, &options);
+	if (!pool) {
+		goto end;
+	}
+
+	ast_threadpool_set_size(pool, 1);
+
+	for (iteration = 0; iteration < 30; ++iteration) {
+		struct simple_task_data *std = NULL;
+		struct timeval start = ast_tvnow();
+		struct timespec end = {
+			.tv_sec = start.tv_sec + options.idle_timeout,
+			.tv_nsec = start.tv_usec * 1000
+		};
+
+		std = simple_task_data_alloc();
+		if (!std) {
+			goto end;
+		}
+
+		/* Wait until the threadpool thread should timeout due to being idle */
+		ast_mutex_lock(&tld->lock);
+		while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
+			/* This purposely left empty as we want to loop waiting for a time out */
+		}
+		ast_mutex_unlock(&tld->lock);
+
+		ast_threadpool_push(pool, simple_task, std);
+	}
+
+	res = wait_until_thread_state(test, tld, 0, 0);
+	if (res == AST_TEST_FAIL) {
+		goto end;
+	}
+
+	res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
+
+end:
+	ast_threadpool_shutdown(pool);
+	ao2_cleanup(listener);
+	ast_free(tld);
+	return res;
+}
+
 AST_TEST_DEFINE(threadpool_one_task_one_thread)
 {
 	struct ast_threadpool *pool = NULL;
@@ -1610,6 +1691,7 @@
 	ast_test_unregister(threadpool_thread_creation);
 	ast_test_unregister(threadpool_thread_destruction);
 	ast_test_unregister(threadpool_thread_timeout);
+	ast_test_unregister(threadpool_thread_timeout_thrash);
 	ast_test_unregister(threadpool_one_task_one_thread);
 	ast_test_unregister(threadpool_one_thread_one_task);
 	ast_test_unregister(threadpool_one_thread_multiple_tasks);
@@ -1630,6 +1712,7 @@
 	ast_test_register(threadpool_thread_creation);
 	ast_test_register(threadpool_thread_destruction);
 	ast_test_register(threadpool_thread_timeout);
+	ast_test_register(threadpool_thread_timeout_thrash);
 	ast_test_register(threadpool_one_task_one_thread);
 	ast_test_register(threadpool_one_thread_one_task);
 	ast_test_register(threadpool_one_thread_multiple_tasks);

-- 
To view, visit https://gerrit.asterisk.org/1602
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I6c455f9a40de60d9e86458d447b548fb52ba1143
Gerrit-PatchSet: 4
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Anonymous Coward #1000019
Gerrit-Reviewer: Joshua Colp <jcolp at digium.com>
Gerrit-Reviewer: Mark Michelson <mmichelson at digium.com>
Gerrit-Reviewer: Matt Jordan <mjordan at digium.com>
Gerrit-Reviewer: Richard Mudgett <rmudgett at digium.com>



More information about the asterisk-commits mailing list