[svn-commits] mmichelson: trunk r392318 - /trunk/main/threadpool.c

SVN commits to the Digium repositories svn-commits at lists.digium.com
Thu Jun 20 11:29:42 CDT 2013


Author: mmichelson
Date: Thu Jun 20 11:29:35 2013
New Revision: 392318

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=392318
Log:
Fix threadpool rapid growth problem.

When a threadpool is set to autoincrement its threadcount, an issue
may arise when multiple tasks are queued at once into the threadpool. Since
threads start active, each new task would result in autoincrementing the
thread count. So if all threads were active, and a thread's autoincrement
value were 5, then 3 new tasks would result in 15 threads being created even
though the initial autoincrement was sufficient to handle the number of tasks.

This change introduces three behavior changes:

1) New threads in the threadpool start idle instead of active.
2) When a threadpool autoincrements, one thread is activated after the growth.
3) When a threadpool's size is incremented manually, all added threads are activated.

For a more detailed explanation about the changes, please see the Review Board link
at the bottom of this commit.

Review: https://reviewboard.asterisk.org/r/2629


Modified:
    trunk/main/threadpool.c

Modified: trunk/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/trunk/main/threadpool.c?view=diff&rev=392318&r1=392317&r2=392318
==============================================================================
--- trunk/main/threadpool.c (original)
+++ trunk/main/threadpool.c Thu Jun 20 11:29:35 2013
@@ -510,7 +510,7 @@
 		if (!worker) {
 			return;
 		}
-		if (ao2_link(pool->active_threads, worker)) {
+		if (ao2_link(pool->idle_threads, worker)) {
 			if (worker_thread_start(worker)) {
 				ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
 				ao2_unlink(pool->active_threads, worker);
@@ -536,24 +536,21 @@
 	struct task_pushed_data *tpd = data;
 	struct ast_threadpool *pool = tpd->pool;
 	int was_empty = tpd->was_empty;
-	int state_changed;
 
 	if (pool->listener && pool->listener->callbacks->task_pushed) {
 		pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
 	}
 	if (ao2_container_count(pool->idle_threads) == 0) {
-		if (pool->options.auto_increment > 0) {
-			grow(pool, pool->options.auto_increment);
-			state_changed = 1;
+		if (!pool->options.auto_increment) {
+			return 0;
 		}
-	} else {
-		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
-				activate_thread, pool);
-		state_changed = 1;
-	}
-	if (state_changed) {
-		threadpool_send_state_changed(pool);
-	}
+		grow(pool, pool->options.auto_increment);
+	}
+
+	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+			activate_thread, pool);
+
+	threadpool_send_state_changed(pool);
 	ao2_ref(tpd, -1);
 	return 0;
 }
@@ -808,6 +805,8 @@
 
 	if (current_size < num_threads) {
 		grow(pool, num_threads - current_size);
+		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+				activate_thread, pool);
 	} else {
 		shrink(pool, current_size - num_threads);
 	}
@@ -986,61 +985,18 @@
 	if (worker->options.thread_start) {
 		worker->options.thread_start();
 	}
-	worker_active(worker);
-	if (worker->options.thread_end) {
-		worker->options.thread_end();
-	}
-	return NULL;
-}
-
-/*!
- * \brief Allocate and initialize a new worker thread
- *
- * This will create, initialize, and start the thread.
- *
- * \param pool The threadpool to which the worker will be added
- * \retval NULL Failed to allocate or start the worker thread
- * \retval non-NULL The newly-created worker thread
- */
-static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
-{
-	struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
-	if (!worker) {
-		return NULL;
-	}
-	worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
-	ast_mutex_init(&worker->lock);
-	ast_cond_init(&worker->cond, NULL);
-	worker->pool = pool;
-	worker->thread = AST_PTHREADT_NULL;
-	worker->state = ALIVE;
-	worker->options = pool->options;
-	return worker;
-}
-
-static int worker_thread_start(struct worker_thread *worker)
-{
-	return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
-}
-
-/*!
- * \brief Active loop for worker threads
- *
- * The worker will stay in this loop for its lifetime,
- * executing tasks as they become available. If there
- * are no tasks currently available, then the thread
- * will go idle.
- *
- * \param worker The worker thread executing tasks.
- */
-static void worker_active(struct worker_thread *worker)
-{
-	int alive = 1;
-	while (alive) {
-		if (!threadpool_execute(worker->pool)) {
-			alive = worker_idle(worker);
+
+	ast_mutex_lock(&worker->lock);
+	while (worker_idle(worker)) {
+		ast_mutex_unlock(&worker->lock);
+		worker_active(worker);
+		ast_mutex_lock(&worker->lock);
+		if (worker->state != ALIVE) {
+			break;
 		}
-	}
+		threadpool_active_thread_idle(worker->pool, worker);
+	}
+	ast_mutex_unlock(&worker->lock);
 
 	/* Reaching this portion means the thread is
 	 * on death's door. It may have been killed while
@@ -1053,6 +1009,68 @@
 	if (worker->state == ZOMBIE) {
 		threadpool_zombie_thread_dead(worker->pool, worker);
 	}
+
+	if (worker->options.thread_end) {
+		worker->options.thread_end();
+	}
+	return NULL;
+}
+
+/*!
+ * \brief Allocate and initialize a new worker thread
+ *
+ * This will create, initialize, and start the thread.
+ *
+ * \param pool The threadpool to which the worker will be added
+ * \retval NULL Failed to allocate or start the worker thread
+ * \retval non-NULL The newly-created worker thread
+ */
+static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
+{
+	struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
+	if (!worker) {
+		return NULL;
+	}
+	worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
+	ast_mutex_init(&worker->lock);
+	ast_cond_init(&worker->cond, NULL);
+	worker->pool = pool;
+	worker->thread = AST_PTHREADT_NULL;
+	worker->state = ALIVE;
+	worker->options = pool->options;
+	return worker;
+}
+
+static int worker_thread_start(struct worker_thread *worker)
+{
+	return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
+}
+
+/*!
+ * \brief Active loop for worker threads
+ *
+ * The worker will stay in this loop for its lifetime,
+ * executing tasks as they become available. If there
+ * are no tasks currently available, then the thread
+ * will go idle.
+ *
+ * \param worker The worker thread executing tasks.
+ */
+static void worker_active(struct worker_thread *worker)
+{
+	int alive;
+
+	/* The following is equivalent to 
+	 *
+	 * while (threadpool_execute(worker->pool));
+	 *
+	 * However, reviewers have suggested in the past
+	 * doing that can cause optimizers to (wrongly)
+	 * optimize the code away.
+	 */
+	do {
+		alive = threadpool_execute(worker->pool);
+	} while (alive);
 }
 
 /*!
@@ -1060,6 +1078,8 @@
  *
  * The worker waits here until it gets told by the threadpool
  * to wake up.
+ *
+ * worker is locked before entering this function.
  *
  * \param worker The idle worker
  * \retval 0 The thread is being woken up so that it can conclude.
@@ -1072,15 +1092,10 @@
 		.tv_sec = start.tv_sec + worker->options.idle_timeout,
 		.tv_nsec = start.tv_usec * 1000,
 	};
-	SCOPED_MUTEX(lock, &worker->lock);
-	if (worker->state != ALIVE) {
-		return 0;
-	}
-	threadpool_active_thread_idle(worker->pool, worker);
 	while (!worker->wake_up) {
 		if (worker->options.idle_timeout <= 0) {
-			ast_cond_wait(&worker->cond, lock);
-		} else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
+			ast_cond_wait(&worker->cond, &worker->lock);
+		} else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
 			break;
 		}
 	}




More information about the svn-commits mailing list