[Asterisk-code-review] threadpool: Handle worker thread transitioning to dead when ... (asterisk[13])

Joshua Colp asteriskteam at digium.com
Wed Nov 11 11:18:15 CST 2015


Joshua Colp has uploaded a new change for review.

  https://gerrit.asterisk.org/1602

Change subject: threadpool: Handle worker thread transitioning to dead when going active.
......................................................................

threadpool: Handle worker thread transitioning to dead when going active.

This change adds handling of dead worker threads when moving them
to be active. When this happens the worker thread is removed from
both the active and idle threads containers. If no threads are able
to be moved to active then the pool grows as configured.

ASTERISK-25546 #close

Change-Id: I6c455f9a40de60d9e86458d447b548fb52ba1143
---
M main/threadpool.c
1 file changed, 47 insertions(+), 7 deletions(-)


  git pull ssh://gerrit.asterisk.org:29418/asterisk refs/changes/02/1602/1

diff --git a/main/threadpool.c b/main/threadpool.c
index d97a7ad..7ad9ad4 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -168,7 +168,7 @@
 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
 static int worker_thread_start(struct worker_thread *worker);
 static int worker_idle(struct worker_thread *worker);
-static void worker_set_state(struct worker_thread *worker, enum worker_state state);
+static int worker_set_state(struct worker_thread *worker, enum worker_state state);
 static void worker_shutdown(struct worker_thread *worker);
 
 /*!
@@ -482,7 +482,16 @@
 				worker->id);
 		return 0;
 	}
-	worker_set_state(worker, ALIVE);
+
+	if (worker_set_state(worker, ALIVE)) {
+		ast_debug(1, "Failed to activate thread %d. It is dead\n",
+				worker->id);
+		/* The worker thread will no longer exist in the active threads or
+		 * idle threads container after this.
+		 */
+		ao2_unlink(pool->active_threads, worker);
+	}
+
 	return CMP_MATCH;
 }
 
@@ -538,19 +547,32 @@
 	struct task_pushed_data *tpd = data;
 	struct ast_threadpool *pool = tpd->pool;
 	int was_empty = tpd->was_empty;
+	unsigned int existing_active;
 
 	if (pool->listener && pool->listener->callbacks->task_pushed) {
 		pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
 	}
-	if (ao2_container_count(pool->idle_threads) == 0) {
+
+	existing_active = ao2_container_count(pool->active_threads);
+
+	/* The first pass transitions any existing idle threads to be active, and
+	 * will also remove any worker threads that have recently entered the dead
+	 * state.
+	 */
+	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+			activate_thread, pool);
+
+	/* If no idle threads could be transitioned to active grow the pool as permitted. */
+	if (ao2_container_count(pool->active_threads) == existing_active) {
 		if (!pool->options.auto_increment) {
+			ao2_ref(tpd, -1);
 			return 0;
 		}
 		grow(pool, pool->options.auto_increment);
+		/* An optional second pass transitions any newly added threads. */
+		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+				activate_thread, pool);
 	}
-
-	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
-			activate_thread, pool);
 
 	threadpool_send_state_changed(pool);
 	ao2_ref(tpd, -1);
@@ -806,6 +828,13 @@
 	}
 
 	if (current_size < num_threads) {
+		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+				activate_thread, pool);
+
+		/* As the above may have altered the number of current threads update it */
+		current_size = ao2_container_count(pool->active_threads) +
+			ao2_container_count(pool->idle_threads);
+
 		grow(pool, num_threads - current_size);
 		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
 				activate_thread, pool);
@@ -1117,13 +1146,24 @@
  *
  * The threadpool calls into this function in order to let a worker know
  * how it should proceed.
+ *
+ * \retval -1 failure (state transition not permitted)
+ * \retval 0 success
  */
-static void worker_set_state(struct worker_thread *worker, enum worker_state state)
+static int worker_set_state(struct worker_thread *worker, enum worker_state state)
 {
 	SCOPED_MUTEX(lock, &worker->lock);
+
+	/* If the worker is already dead then no subsequent transitions are permitted */
+	if (worker->state == DEAD) {
+		return -1;
+	}
+
 	worker->state = state;
 	worker->wake_up = 1;
 	ast_cond_signal(&worker->cond);
+
+	return 0;
 }
 
 /*! Serializer group shutdown control object. */

-- 
To view, visit https://gerrit.asterisk.org/1602
To unsubscribe, visit https://gerrit.asterisk.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I6c455f9a40de60d9e86458d447b548fb52ba1143
Gerrit-PatchSet: 1
Gerrit-Project: asterisk
Gerrit-Branch: 13
Gerrit-Owner: Joshua Colp <jcolp at digium.com>



More information about the asterisk-code-review mailing list