[asterisk-commits] mmichelson: branch mmichelson/threadpool r378652 - in /team/mmichelson/thread...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Jan 7 16:16:09 CST 2013


Author: mmichelson
Date: Mon Jan  7 16:16:06 2013
New Revision: 378652

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=378652
Log:
Address review board feedback from Matt and Richard

* Remove extraneous whitespace
* Bump up debug levels of messages and add identifying info to messages.
* Account for potential failures of ao2_link()
* Add additional test and some more test data
* Add some comments in places where they could be useful
* Make threadpool listeners and their callbacks optional


Modified:
    team/mmichelson/threadpool/include/asterisk/taskprocessor.h
    team/mmichelson/threadpool/include/asterisk/threadpool.h
    team/mmichelson/threadpool/main/taskprocessor.c
    team/mmichelson/threadpool/main/threadpool.c
    team/mmichelson/threadpool/tests/test_threadpool.c

Modified: team/mmichelson/threadpool/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/taskprocessor.h?view=diff&rev=378652&r1=378651&r2=378652
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/taskprocessor.h (original)
+++ team/mmichelson/threadpool/include/asterisk/taskprocessor.h Mon Jan  7 16:16:06 2013
@@ -133,6 +133,8 @@
 /*!
  * \brief A listener for taskprocessors
  *
+ * \since 12.0.0
+ *
  * When a taskprocessor's state changes, the listener
  * is notified of the change. This allows for tasks
  * to be addressed in whatever way is appropriate for
@@ -148,7 +150,9 @@
 };
 
 /*!
- * Allocate a taskprocessor listener
+ * \brief Allocate a taskprocessor listener
+ *
+ * \since 12.0.0
  *
  * This will result in the listener being allocated with the specified
  * callbacks.
@@ -176,6 +180,8 @@
 /*!
  * \brief Create a taskprocessor with a custom listener
  *
+ * \since 12.0.0
+ *
  * The listener's alloc() and start() callbacks will be called during this function.
  *
  * \param name The name of the taskprocessor to create
@@ -209,6 +215,9 @@
 
 /*!
  * \brief Pop a task off the taskprocessor and execute it.
+ *
+ * \since 12.0.0
+ *
  * \param tps The taskprocessor from which to execute.
  * \retval 0 There is no further work to be done.
  * \retval 1 Tasks still remain in the taskprocessor queue.

Modified: team/mmichelson/threadpool/include/asterisk/threadpool.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/threadpool.h?view=diff&rev=378652&r1=378651&r2=378652
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/threadpool.h (original)
+++ team/mmichelson/threadpool/include/asterisk/threadpool.h Mon Jan  7 16:16:06 2013
@@ -55,8 +55,8 @@
 			struct ast_threadpool_listener *listener,
 			int was_empty);
 	/*!
-	 * \brief Indicates the threadpoo's taskprocessor has become empty
-	 * 
+	 * \brief Indicates the threadpool's taskprocessor has become empty
+	 *
 	 * \param listener The threadpool's listener
 	 */
 	void (*emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener);
@@ -139,7 +139,7 @@
  *
  * This number may be more or less than the current number of
  * threads in the threadpool.
- * 
+ *
  * \param threadpool The threadpool to adjust
  * \param size The new desired size of the threadpool
  */

Modified: team/mmichelson/threadpool/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/taskprocessor.c?view=diff&rev=378652&r1=378651&r2=378652
==============================================================================
--- team/mmichelson/threadpool/main/taskprocessor.c (original)
+++ team/mmichelson/threadpool/main/taskprocessor.c Mon Jan  7 16:16:06 2013
@@ -37,7 +37,6 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/cli.h"
 #include "asterisk/taskprocessor.h"
-
 
 /*!
  * \brief tps_task structure is queued to a taskprocessor
@@ -560,6 +559,9 @@
 	ao2_ref(p, +1);
 	listener->tps = p;
 
+	/* Allocation of private data must come after setting taskprocessor parameters
+	 * so that listeners who rely on taskprocessor data will have access to it.
+	 */
 	listener->private_data = listener->callbacks->alloc(listener);
 	if (!listener->private_data) {
 		return NULL;

Modified: team/mmichelson/threadpool/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/threadpool.c?view=diff&rev=378652&r1=378651&r2=378652
==============================================================================
--- team/mmichelson/threadpool/main/threadpool.c (original)
+++ team/mmichelson/threadpool/main/threadpool.c Mon Jan  7 16:16:06 2013
@@ -35,24 +35,24 @@
 struct ast_threadpool {
 	/*! Threadpool listener */
 	struct ast_threadpool_listener *listener;
-	/*! 
+	/*!
 	 * \brief The container of active threads.
 	 * Active threads are those that are currently running tasks
 	 */
 	struct ao2_container *active_threads;
-	/*! 
+	/*!
 	 * \brief The container of idle threads.
 	 * Idle threads are those that are currenly waiting to run tasks
 	 */
 	struct ao2_container *idle_threads;
-	/*! 
+	/*!
 	 * \brief The container of zombie threads.
 	 * Zombie threads may be running tasks, but they are scheduled to die soon
 	 */
 	struct ao2_container *zombie_threads;
-	/*! 
+	/*!
 	 * \brief The main taskprocessor
-	 * 
+	 *
 	 * Tasks that are queued in this taskprocessor are
 	 * doled out to the worker threads. Worker threads that
 	 * execute tasks from the threadpool are executing tasks
@@ -122,740 +122,6 @@
 	DEAD,
 };
 
-/* Worker thread forward declarations. See definitions for documentation */
-struct worker_thread;
-static int worker_thread_hash(const void *obj, int flags);
-static int worker_thread_cmp(void *obj, void *arg, int flags);
-static void worker_thread_destroy(void *obj);
-static void worker_active(struct worker_thread *worker);
-static void *worker_start(void *arg);
-static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
-static int worker_idle(struct worker_thread *worker);
-static void worker_set_state(struct worker_thread *worker, enum worker_state state);
-static void worker_shutdown(struct worker_thread *worker);
-
-/*!
- * \brief Notify the threadpool listener that the state has changed.
- *
- * This notifies the threadpool listener via its state_changed callback.
- * \param pool The threadpool whose state has changed
- */
-static void threadpool_send_state_changed(struct ast_threadpool *pool)
-{
-	int active_size = ao2_container_count(pool->active_threads);
-	int idle_size = ao2_container_count(pool->idle_threads);
-
-	pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
-}
-
-/*!
- * \brief Struct used for queued operations involving worker state changes
- */
-struct thread_worker_pair {
-	/*! Threadpool that contains the worker whose state has changed */
-	struct ast_threadpool *pool;
-	/*! Worker whose state has changed */
-	struct worker_thread *worker;
-};
-
-/*!
- * \brief Destructor for thread_worker_pair
- */
-static void thread_worker_pair_destructor(void *obj)
-{
-	struct thread_worker_pair *pair = obj;
-	ao2_ref(pair->worker, -1);
-}
-
-/*!
- * \brief Allocate and initialize a thread_worker_pair
- * \param pool Threadpool to assign to the thread_worker_pair
- * \param worker Worker thread to assign to the thread_worker_pair
- */
-static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
-		struct worker_thread *worker)
-{
-	struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
-	if (!pair) {
-		return NULL;
-	}
-	pair->pool = pool;
-	ao2_ref(worker, +1);
-	pair->worker = worker;
-	return pair;
-}
-
-/*!
- * \brief Move a worker thread from the active container to the idle container.
- *
- * This function is called from the threadpool's control taskprocessor thread.
- * \param data A thread_worker_pair containing the threadpool and the worker to move.
- * \return 0
- */
-static int queued_active_thread_idle(void *data)
-{
-	struct thread_worker_pair *pair = data;
-
-	ao2_link(pair->pool->idle_threads, pair->worker);
-	ao2_unlink(pair->pool->active_threads, pair->worker);
-
-	threadpool_send_state_changed(pair->pool);
-
-	ao2_ref(pair, -1);
-	return 0;
-}
-
-/*!
- * \brief Queue a task to move a thread from the active list to the idle list
- *
- * This is called by a worker thread when it runs out of tasks to perform and
- * goes idle.
- * \param pool The threadpool to which the worker belongs
- * \param worker The worker thread that has gone idle
- */
-static void threadpool_active_thread_idle(struct ast_threadpool *pool,
-		struct worker_thread *worker)
-{
-	struct thread_worker_pair *pair;
-	SCOPED_AO2LOCK(lock, pool);
-	if (pool->shutting_down) {
-		return;
-	}
-	pair = thread_worker_pair_alloc(pool, worker);
-	if (!pair) {
-		return;
-	}
-	ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
-}
-
-/*!
- * \brief Kill a zombie thread
- *
- * This runs from the threadpool's control taskprocessor thread.
- *
- * \param data A thread_worker_pair containing the threadpool and the zombie thread
- * \return 0
- */
-static int queued_zombie_thread_dead(void *data)
-{
-	struct thread_worker_pair *pair = data;
-
-	ao2_unlink(pair->pool->zombie_threads, pair->worker);
-	threadpool_send_state_changed(pair->pool);
-
-	ao2_ref(pair, -1);
-	return 0;
-}
-
-/*!
- * \brief Queue a task to kill a zombie thread
- *
- * This is called by a worker thread when it acknowledges that it is time for
- * it to die.
- */
-static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
-		struct worker_thread *worker)
-{
-	struct thread_worker_pair *pair;
-	SCOPED_AO2LOCK(lock, pool);
-	if (pool->shutting_down) {
-		return;
-	}
-	pair = thread_worker_pair_alloc(pool, worker);
-	if (!pair) {
-		return;
-	}
-	ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
-}
-
-static int queued_idle_thread_dead(void *data)
-{
-	struct thread_worker_pair *pair = data;
-
-	ao2_unlink(pair->pool->idle_threads, pair->worker);
-	threadpool_send_state_changed(pair->pool);
-
-	ao2_ref(pair, -1);
-	return 0;
-}
-
-static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
-		struct worker_thread *worker)
-{
-	struct thread_worker_pair *pair;
-	SCOPED_AO2LOCK(lock, pool);
-	if (pool->shutting_down) {
-		return;
-	}
-	pair = thread_worker_pair_alloc(pool, worker);
-	if (!pair) {
-		return;
-	}
-	ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
-}
-
-/*!
- * \brief Execute a task in the threadpool
- * 
- * This is the function that worker threads call in order to execute tasks
- * in the threadpool
- *
- * \param pool The pool to which the tasks belong.
- * \retval 0 Either the pool has been shut down or there are no tasks.
- * \retval 1 There are still tasks remaining in the pool.
- */
-static int threadpool_execute(struct ast_threadpool *pool)
-{
-	ao2_lock(pool);
-	if (!pool->shutting_down) {
-		ao2_unlock(pool);
-		return ast_taskprocessor_execute(pool->tps);
-	}
-	ao2_unlock(pool);
-	return 0;
-}
-
-/*!
- * \brief Destroy a threadpool's components.
- *
- * This is the destructor called automatically when the threadpool's
- * reference count reaches zero. This is not to be confused with
- * threadpool_destroy.
- *
- * By the time this actually gets called, most of the cleanup has already
- * been done in the pool. The only thing left to do is to release the
- * final reference to the threadpool listener.
- *
- * \param obj The pool to destroy
- */
-static void threadpool_destructor(void *obj)
-{
-	struct ast_threadpool *pool = obj;
-	ao2_cleanup(pool->listener);
-}
-
-/*
- * \brief Allocate a threadpool
- *
- * This is implemented as a taskprocessor listener's alloc callback. This
- * is because the threadpool exists as the private data on a taskprocessor
- * listener.
- *
- * \param listener The taskprocessor listener where the threadpool will live.
- * \retval NULL Could not initialize threadpool properly
- * \retval non-NULL The newly-allocated threadpool
- */
-static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
-{
-	RAII_VAR(struct ast_threadpool *, pool,
-			ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
-	struct ast_str *name = ast_str_create(64);
-
-	if (!name) {
-		return NULL;
-	}
-
-	ast_str_set(&name, 0, "%s-control", ast_taskprocessor_name(listener->tps));
-
-	pool->control_tps = ast_taskprocessor_get(ast_str_buffer(name), TPS_REF_DEFAULT);
-	if (!pool->control_tps) {
-		return NULL;
-	}
-	pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
-	if (!pool->active_threads) {
-		return NULL;
-	}
-	pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
-	if (!pool->idle_threads) {
-		return NULL;
-	}
-	pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
-	if (!pool->zombie_threads) {
-		return NULL;
-	}
-
-	ao2_ref(pool, +1);
-	return pool;
-}
-
-static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
-{
-	return 0;
-}
-
-/*!
- * \brief helper used for queued task when tasks are pushed
- */
-struct task_pushed_data {
-	/*! Pool into which a task was pushed */
-	struct ast_threadpool *pool;
-	/*! Indicator of whether the pool had no tasks prior to the new task being added */
-	int was_empty;
-};
-
-/*!
- * \brief Allocate and initialize a task_pushed_data
- * \param pool The threadpool to set in the task_pushed_data
- * \param was_empty The was_empty value to set in the task_pushed_data
- * \retval NULL Unable to allocate task_pushed_data
- * \retval non-NULL The newly-allocated task_pushed_data
- */
-static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
-		int was_empty)
-{
-	struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
-
-	if (!tpd) {
-		return NULL;
-	}
-	tpd->pool = pool;
-	tpd->was_empty = was_empty;
-	return tpd;
-}
-
-/*!
- * \brief Activate idle threads
- *
- * This function always returns CMP_MATCH because all workers that this
- * function acts on need to be seen as matches so they are unlinked from the
- * list of idle threads.
- *
- * Called as an ao2_callback in the threadpool's control taskprocessor thread.
- * \param obj The worker to activate
- * \param arg The pool where the worker belongs
- * \retval CMP_MATCH
- */
-static int activate_thread(void *obj, void *arg, int flags)
-{
-	struct worker_thread *worker = obj;
-	struct ast_threadpool *pool = arg;
-
-	ao2_link(pool->active_threads, worker);
-	worker_set_state(worker, ALIVE);
-	return CMP_MATCH;
-}
-
-/*!
- * \brief Add threads to the threadpool
- *
- * This function is called from the threadpool's control taskprocessor thread.
- * \param pool The pool that is expanding
- * \delta The number of threads to add to the pool
- */
-static void grow(struct ast_threadpool *pool, int delta)
-{
-	int i;
-
-	ast_debug(1, "Going to increase threadpool size by %d\n", delta);
-
-	for (i = 0; i < delta; ++i) {
-		struct worker_thread *worker = worker_thread_alloc(pool);
-		if (!worker) {
-			return;
-		}
-		ao2_link(pool->active_threads, worker);
-		ao2_ref(worker, -1);
-	}
-}
-
-/*!
- * \brief Queued task called when tasks are pushed into the threadpool
- *
- * This function first calls into the threadpool's listener to let it know
- * that a task has been pushed. It then wakes up all idle threads and moves
- * them into the active thread container.
- * \param data A task_pushed_data
- * \return 0
- */
-static int queued_task_pushed(void *data)
-{
-	struct task_pushed_data *tpd = data;
-	struct ast_threadpool *pool = tpd->pool;
-	int was_empty = tpd->was_empty;
-	int state_changed;
-
-	pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
-	if (ao2_container_count(pool->idle_threads) == 0) {
-		if (pool->options.auto_increment > 0) {
-			grow(pool, pool->options.auto_increment);
-			state_changed = 1;
-		}
-	} else {
-		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
-				activate_thread, pool);
-		state_changed = 1;
-	}
-	if (state_changed) {
-		threadpool_send_state_changed(pool);
-	}
-	ao2_ref(tpd, -1);
-	return 0;
-}
-
-/*!
- * \brief Taskprocessor listener callback called when a task is added
- *
- * The threadpool uses this opportunity to queue a task on its control taskprocessor
- * in order to activate idle threads and notify the threadpool listener that the
- * task has been pushed.
- * \param listener The taskprocessor listener. The threadpool is the listener's private data
- * \param was_empty True if the taskprocessor was empty prior to the task being pushed
- */
-static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
-		int was_empty)
-{
-	struct ast_threadpool *pool = listener->private_data;
-	struct task_pushed_data *tpd;
-	SCOPED_AO2LOCK(lock, pool);
-
-	if (pool->shutting_down) {
-		return;
-	}
-	tpd = task_pushed_data_alloc(pool, was_empty);
-	if (!tpd) {
-		return;
-	}
-
-	ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd);
-}
-
-/*!
- * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
- *
- * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
- * \param data The pool that has become empty
- * \return 0
- */
-static int queued_emptied(void *data)
-{
-	struct ast_threadpool *pool = data;
-
-	pool->listener->callbacks->emptied(pool, pool->listener);
-	return 0;
-}
-
-/*!
- * \brief Taskprocessor listener emptied callback
- *
- * The threadpool queues a task to let the threadpool listener know that
- * the threadpool no longer contains any tasks.
- * \param listener The taskprocessor listener. The threadpool is the listener's private data.
- */
-static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
-{
-	struct ast_threadpool *pool = listener->private_data;
-	SCOPED_AO2LOCK(lock, pool);
-
-	if (pool->shutting_down) {
-		return;
-	}
-
-	ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
-}
-
-/*!
- * \brief Taskprocessor listener shutdown callback
- *
- * The threadpool will shut down and destroy all of its worker threads when
- * this is called back. By the time this gets called, the taskprocessor's
- * control taskprocessor has already been destroyed. Therefore there is no risk
- * in outright destroying the worker threads here.
- * \param listener The taskprocessor listener. The threadpool is the listener's private data.
- */
-static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
-{
-	struct ast_threadpool *pool = listener->private_data;
-
-	ao2_cleanup(pool->active_threads);
-	ao2_cleanup(pool->idle_threads);
-	ao2_cleanup(pool->zombie_threads);
-}
-
-/*!
- * \brief Taskprocessor listener destroy callback
- *
- * Since the threadpool is an ao2 object, all that is necessary is to
- * decrease the refcount. Since the control taskprocessor should already
- * be destroyed by this point, this should be the final reference to the
- * threadpool.
- *
- * \param private_data The threadpool to destroy
- */
-static void threadpool_destroy(void *private_data)
-{
-	struct ast_threadpool *pool = private_data;
-	ao2_cleanup(pool);
-}
-
-/*!
- * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
- */
-static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
-	.alloc = threadpool_alloc,
-	.start = threadpool_tps_start,
-	.task_pushed = threadpool_tps_task_pushed,
-	.emptied = threadpool_tps_emptied,
-	.shutdown = threadpool_tps_shutdown,
-	.destroy = threadpool_destroy,
-};
-
-/*!
- * \brief ao2 callback to kill a set number of threads.
- *
- * Threads will be unlinked from the container as long as the
- * counter has not reached zero. The counter is decremented with
- * each thread that is removed.
- * \param obj The worker thread up for possible destruction
- * \param arg The counter
- * \param flags Unused
- * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
- * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
- */
-static int kill_threads(void *obj, void *arg, int flags)
-{
-	int *num_to_kill = arg;
-
-	if (*num_to_kill > 0) {
-		--(*num_to_kill);
-		return CMP_MATCH;
-	} else {
-		return CMP_STOP;
-	}
-}
-
-/*!
- * \brief ao2 callback to zombify a set number of threads.
- *
- * Threads will be zombified as long as as the counter has not reached
- * zero. The counter is decremented with each thread that is zombified.
- *
- * Zombifying a thread involves removing it from its current container,
- * adding it to the zombie container, and changing the state of the
- * worker to a zombie
- *
- * This callback is called from the threadpool control taskprocessor thread.
- *
- * \param obj The worker thread that may be zombified
- * \param arg The pool to which the worker belongs
- * \param data The counter
- * \param flags Unused
- * \retval CMP_MATCH The zombified thread should be removed from its current container
- * \retval CMP_STOP Stop attempting to zombify threads
- */
-static int zombify_threads(void *obj, void *arg, void *data, int flags)
-{
-	struct worker_thread *worker = obj;
-	struct ast_threadpool *pool = arg;
-	int *num_to_zombify = data;
-
-	if ((*num_to_zombify)-- > 0) {
-		ao2_link(pool->zombie_threads, worker);
-		worker_set_state(worker, ZOMBIE);
-		return CMP_MATCH;
-	} else {
-		return CMP_STOP;
-	}
-}
-
-/*!
- * \brief Remove threads from the threadpool
- *
- * The preference is to kill idle threads. However, if there are
- * more threads to remove than there are idle threads, then active
- * threads will be zombified instead.
- *
- * This function is called from the threadpool control taskprocessor thread.
- *
- * \param pool The threadpool to remove threads from
- * \param delta The number of threads to remove
- */
-static void shrink(struct ast_threadpool *pool, int delta)
-{
-	/* 
-	 * Preference is to kill idle threads, but
-	 * we'll move on to deactivating active threads
-	 * if we have to
-	 */
-	int idle_threads = ao2_container_count(pool->idle_threads);
-	int idle_threads_to_kill = MIN(delta, idle_threads);
-	int active_threads_to_zombify = delta - idle_threads_to_kill;
-
-	ast_debug(1, "Going to kill off %d idle threads\n", idle_threads_to_kill);
-
-	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
-			kill_threads, &idle_threads_to_kill);
-
-	ast_debug(1, "Going to kill off %d active threads\n", active_threads_to_zombify);
-
-	ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
-			zombify_threads, pool, &active_threads_to_zombify);
-}
-
-/*!
- * \brief Helper struct used for queued operations that change the size of the threadpool
- */
-struct set_size_data {
-	/*! The pool whose size is to change */
-	struct ast_threadpool *pool;
-	/*! The requested new size of the pool */
-	unsigned int size;
-};
-
-/*!
- * \brief Allocate and initialize a set_size_data
- * \param pool The pool for the set_size_data
- * \param size The size to store in the set_size_data
- */
-static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
-		unsigned int size)
-{
-	struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
-	if (!ssd) {
-		return NULL;
-	}
-
-	ssd->pool = pool;
-	ssd->size = size;
-	return ssd;
-}
-
-/*!
- * \brief Change the size of the threadpool
- *
- * This can either result in shrinking or growing the threadpool depending
- * on the new desired size and the current size.
- *
- * This function is run from the threadpool control taskprocessor thread
- *
- * \param data A set_size_data used for determining how to act
- * \return 0
- */
-static int queued_set_size(void *data)
-{
-	struct set_size_data *ssd = data;
-	struct ast_threadpool *pool = ssd->pool;
-	unsigned int num_threads = ssd->size;
-
-	/* We don't count zombie threads as being "live when potentially resizing */
-	unsigned int current_size = ao2_container_count(pool->active_threads) +
-		ao2_container_count(pool->idle_threads);
-
-	if (current_size == num_threads) {
-		ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
-				num_threads, current_size);
-		return 0;
-	}
-
-	if (current_size < num_threads) {
-		grow(pool, num_threads - current_size);
-	} else {
-		shrink(pool, current_size - num_threads);
-	}
-
-	threadpool_send_state_changed(pool);
-	ao2_ref(ssd, -1);
-	return 0;
-}
-
-void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
-{
-	struct set_size_data *ssd;
-	SCOPED_AO2LOCK(lock, pool);
-	if (pool->shutting_down) {
-		return;
-	}
-
-	ssd = set_size_data_alloc(pool, size);
-	if (!ssd) {
-		return;
-	}
-
-	ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
-}
-
-static void listener_destructor(void *obj)
-{
-	struct ast_threadpool_listener *listener = obj;
-
-	listener->callbacks->destroy(listener->private_data);
-}
-
-struct ast_threadpool_listener *ast_threadpool_listener_alloc(
-		const struct ast_threadpool_listener_callbacks *callbacks)
-{
-	struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), listener_destructor);
-	if (!listener) {
-		return NULL;
-	}
-	listener->callbacks = callbacks;
-	listener->private_data = listener->callbacks->alloc(listener);
-	if (!listener->private_data) {
-		ao2_ref(listener, -1);
-		return NULL;
-	}
-	return listener;
-}
-
-struct pool_options_pair {
-	struct ast_threadpool *pool;
-	struct ast_threadpool_options options;
-};
-
-struct ast_threadpool *ast_threadpool_create(const char *name,
-		struct ast_threadpool_listener *listener,
-		int initial_size, const struct ast_threadpool_options *options)
-{
-	struct ast_threadpool *pool;
-	struct ast_taskprocessor *tps;
-	RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
-			ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
-			ao2_cleanup);
-
-	if (!tps_listener) {
-		return NULL;
-	}
-
-	if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
-		ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
-		return NULL;
-	}
-
-	tps = ast_taskprocessor_create_with_listener(name, tps_listener);
-
-	if (!tps) {
-		return NULL;
-	}
-
-	pool = tps_listener->private_data;
-	pool->tps = tps;
-	ao2_ref(listener, +1);
-	pool->listener = listener;
-	pool->options = *options;
-	ast_threadpool_set_size(pool, initial_size);
-	return pool;
-}
-
-int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
-{
-	SCOPED_AO2LOCK(lock, pool);
-	if (!pool->shutting_down) {
-		return ast_taskprocessor_push(pool->tps, task, data);
-	}
-	return 0;
-}
-
-void ast_threadpool_shutdown(struct ast_threadpool *pool)
-{
-	/* Shut down the taskprocessors and everything else just
-	 * takes care of itself via the taskprocessor callbacks
-	 */
-	ao2_lock(pool);
-	pool->shutting_down = 1;
-	ao2_unlock(pool);
-	ast_taskprocessor_unreference(pool->control_tps);
-	ast_taskprocessor_unreference(pool->tps);
-}
-
 /*!
  * A thread that executes threadpool tasks
  */
@@ -878,6 +144,769 @@
 	struct ast_threadpool_options options;
 };
 
+/* Worker thread forward declarations. See definitions for documentation */
+static int worker_thread_hash(const void *obj, int flags);
+static int worker_thread_cmp(void *obj, void *arg, int flags);
+static void worker_thread_destroy(void *obj);
+static void worker_active(struct worker_thread *worker);
+static void *worker_start(void *arg);
+static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
+static int worker_thread_start(struct worker_thread *worker);
+static int worker_idle(struct worker_thread *worker);
+static void worker_set_state(struct worker_thread *worker, enum worker_state state);
+static void worker_shutdown(struct worker_thread *worker);
+
+/*!
+ * \brief Notify the threadpool listener that the state has changed.
+ *
+ * This notifies the threadpool listener via its state_changed callback.
+ * \param pool The threadpool whose state has changed
+ */
+static void threadpool_send_state_changed(struct ast_threadpool *pool)
+{
+	int active_size = ao2_container_count(pool->active_threads);
+	int idle_size = ao2_container_count(pool->idle_threads);
+
+	if (pool->listener && pool->listener->callbacks->state_changed) {
+		pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
+	}
+}
+
+/*!
+ * \brief Struct used for queued operations involving worker state changes
+ */
+struct thread_worker_pair {
+	/*! Threadpool that contains the worker whose state has changed */
+	struct ast_threadpool *pool;
+	/*! Worker whose state has changed */
+	struct worker_thread *worker;
+};
+
+/*!
+ * \brief Destructor for thread_worker_pair
+ */
+static void thread_worker_pair_destructor(void *obj)
+{
+	struct thread_worker_pair *pair = obj;
+	ao2_ref(pair->worker, -1);
+}
+
+/*!
+ * \brief Allocate and initialize a thread_worker_pair
+ * \param pool Threadpool to assign to the thread_worker_pair
+ * \param worker Worker thread to assign to the thread_worker_pair
+ */
+static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
+		struct worker_thread *worker)
+{
+	struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
+	if (!pair) {
+		return NULL;
+	}
+	pair->pool = pool;
+	ao2_ref(worker, +1);
+	pair->worker = worker;
+	return pair;
+}
+
+/*!
+ * \brief Move a worker thread from the active container to the idle container.
+ *
+ * This function is called from the threadpool's control taskprocessor thread.
+ * \param data A thread_worker_pair containing the threadpool and the worker to move.
+ * \return 0
+ */
+static int queued_active_thread_idle(void *data)
+{
+	struct thread_worker_pair *pair = data;
+
+	ao2_link(pair->pool->idle_threads, pair->worker);
+	ao2_unlink(pair->pool->active_threads, pair->worker);
+
+	threadpool_send_state_changed(pair->pool);
+
+	ao2_ref(pair, -1);
+	return 0;
+}
+
+/*!
+ * \brief Queue a task to move a thread from the active list to the idle list
+ *
+ * This is called by a worker thread when it runs out of tasks to perform and
+ * goes idle.
+ * \param pool The threadpool to which the worker belongs
+ * \param worker The worker thread that has gone idle
+ */
+static void threadpool_active_thread_idle(struct ast_threadpool *pool,
+		struct worker_thread *worker)
+{
+	struct thread_worker_pair *pair;
+	SCOPED_AO2LOCK(lock, pool);
+	if (pool->shutting_down) {
+		return;
+	}
+	pair = thread_worker_pair_alloc(pool, worker);
+	if (!pair) {
+		return;
+	}
+	ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
+}
+
+/*!
+ * \brief Kill a zombie thread
+ *
+ * This runs from the threadpool's control taskprocessor thread.
+ *
+ * \param data A thread_worker_pair containing the threadpool and the zombie thread
+ * \return 0
+ */
+static int queued_zombie_thread_dead(void *data)
+{
+	struct thread_worker_pair *pair = data;
+
+	ao2_unlink(pair->pool->zombie_threads, pair->worker);
+	threadpool_send_state_changed(pair->pool);
+
+	ao2_ref(pair, -1);
+	return 0;
+}
+
+/*!
+ * \brief Queue a task to kill a zombie thread
+ *
+ * This is called by a worker thread when it acknowledges that it is time for
+ * it to die.
+ */
+static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
+		struct worker_thread *worker)
+{
+	struct thread_worker_pair *pair;
+	SCOPED_AO2LOCK(lock, pool);
+	if (pool->shutting_down) {
+		return;
+	}
+	pair = thread_worker_pair_alloc(pool, worker);
+	if (!pair) {
+		return;
+	}
+	ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
+}
+
+static int queued_idle_thread_dead(void *data)
+{
+	struct thread_worker_pair *pair = data;
+
+	ao2_unlink(pair->pool->idle_threads, pair->worker);
+	threadpool_send_state_changed(pair->pool);
+
+	ao2_ref(pair, -1);
+	return 0;
+}
+
+static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
+		struct worker_thread *worker)
+{
+	struct thread_worker_pair *pair;
+	SCOPED_AO2LOCK(lock, pool);
+	if (pool->shutting_down) {
+		return;
+	}
+	pair = thread_worker_pair_alloc(pool, worker);
+	if (!pair) {
+		return;
+	}
+	ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
+}
+
+/*!
+ * \brief Execute a task in the threadpool
+ *
+ * This is the function that worker threads call in order to execute tasks
+ * in the threadpool
+ *
+ * \param pool The pool to which the tasks belong.
+ * \retval 0 Either the pool has been shut down or there are no tasks.
+ * \retval 1 There are still tasks remaining in the pool.
+ */
+static int threadpool_execute(struct ast_threadpool *pool)
+{
+	ao2_lock(pool);
+	if (!pool->shutting_down) {
+		ao2_unlock(pool);
+		return ast_taskprocessor_execute(pool->tps);
+	}
+	ao2_unlock(pool);
+	return 0;
+}
+
+/*!
+ * \brief Destroy a threadpool's components.
+ *
+ * This is the destructor called automatically when the threadpool's
+ * reference count reaches zero. This is not to be confused with
+ * threadpool_destroy.
+ *
+ * By the time this actually gets called, most of the cleanup has already
+ * been done in the pool. The only thing left to do is to release the
+ * final reference to the threadpool listener.
+ *
+ * \param obj The pool to destroy
+ */
+static void threadpool_destructor(void *obj)
+{
+	struct ast_threadpool *pool = obj;
+	ao2_cleanup(pool->listener);
+}
+
+/*
+ * \brief Allocate a threadpool
+ *
+ * This is implemented as a taskprocessor listener's alloc callback. This
+ * is because the threadpool exists as the private data on a taskprocessor
+ * listener.
+ *
+ * \param listener The taskprocessor listener where the threadpool will live.
+ * \retval NULL Could not initialize threadpool properly
+ * \retval non-NULL The newly-allocated threadpool
+ */
+static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
+{
+	RAII_VAR(struct ast_threadpool *, pool,
+			ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
+	struct ast_str *name = ast_str_create(64);
+
+	if (!name) {
+		return NULL;
+	}
+
+	ast_str_set(&name, 0, "%s-control", ast_taskprocessor_name(listener->tps));
+
+	pool->control_tps = ast_taskprocessor_get(ast_str_buffer(name), TPS_REF_DEFAULT);
+	if (!pool->control_tps) {
+		return NULL;
+	}
+	pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+	if (!pool->active_threads) {
+		return NULL;
+	}
+	pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+	if (!pool->idle_threads) {
+		return NULL;
+	}
+	pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+	if (!pool->zombie_threads) {
+		return NULL;
+	}
+
+	ao2_ref(pool, +1);
+	return pool;
+}
+
+static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
+{
+	return 0;
+}
+
+/*!
+ * \brief helper used for queued task when tasks are pushed
+ */
+struct task_pushed_data {
+	/*! Pool into which a task was pushed */
+	struct ast_threadpool *pool;
+	/*! Indicator of whether the pool had no tasks prior to the new task being added */
+	int was_empty;
+};
+
+/*!
+ * \brief Allocate and initialize a task_pushed_data
+ * \param pool The threadpool to set in the task_pushed_data
+ * \param was_empty The was_empty value to set in the task_pushed_data
+ * \retval NULL Unable to allocate task_pushed_data
+ * \retval non-NULL The newly-allocated task_pushed_data
+ */
+static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
+		int was_empty)
+{
+	struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
+
+	if (!tpd) {
+		return NULL;
+	}
+	tpd->pool = pool;
+	tpd->was_empty = was_empty;
+	return tpd;
+}
+
+/*!
+ * \brief Activate idle threads
+ *
+ * This function always returns CMP_MATCH because all workers that this
+ * function acts on need to be seen as matches so they are unlinked from the
+ * list of idle threads.
+ *
+ * Called as an ao2_callback in the threadpool's control taskprocessor thread.
+ * \param obj The worker to activate
+ * \param arg The pool where the worker belongs
+ * \retval CMP_MATCH
+ */
+static int activate_thread(void *obj, void *arg, int flags)
+{
+	struct worker_thread *worker = obj;
+	struct ast_threadpool *pool = arg;
+
+	if (!ao2_link(pool->active_threads, worker)) {
+		/* If we can't link the idle thread into the active container, then
+		 * we'll just leave the thread idle and not wake it up.
+		 */
+		ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
+				worker->id);
+		return 0;
+	}
+	worker_set_state(worker, ALIVE);
+	return CMP_MATCH;
+}
+
+/*!
+ * \brief Add threads to the threadpool
+ *
+ * This function is called from the threadpool's control taskprocessor thread.
+ * \param pool The pool that is expanding
+ * \delta The number of threads to add to the pool
+ */
+static void grow(struct ast_threadpool *pool, int delta)
+{
+	int i;
+
+	ast_debug(3, "Increasing threadpool %s's size by %d\n",
+			ast_taskprocessor_name(pool->tps), delta);
+
+	for (i = 0; i < delta; ++i) {
+		struct worker_thread *worker = worker_thread_alloc(pool);
+		if (!worker) {
+			return;
+		}
+		if (ao2_link(pool->active_threads, worker)) {
+			if (worker_thread_start(worker)) {
+				ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
+				ao2_unlink(pool->active_threads, worker);
+			}
+		} else {
+			ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
+		}
+		ao2_ref(worker, -1);
+	}
+}
+
+/*!
+ * \brief Queued task called when tasks are pushed into the threadpool
+ *
+ * This function first calls into the threadpool's listener to let it know
+ * that a task has been pushed. It then wakes up all idle threads and moves
+ * them into the active thread container.
+ * \param data A task_pushed_data
+ * \return 0
+ */
+static int queued_task_pushed(void *data)
+{
+	struct task_pushed_data *tpd = data;
+	struct ast_threadpool *pool = tpd->pool;
+	int was_empty = tpd->was_empty;
+	int state_changed;
+
+	if (pool->listener && pool->listener->callbacks->task_pushed) {
+		pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
+	}
+	if (ao2_container_count(pool->idle_threads) == 0) {
+		if (pool->options.auto_increment > 0) {
+			grow(pool, pool->options.auto_increment);
+			state_changed = 1;
+		}
+	} else {
+		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+				activate_thread, pool);
+		state_changed = 1;
+	}
+	if (state_changed) {
+		threadpool_send_state_changed(pool);
+	}
+	ao2_ref(tpd, -1);
+	return 0;
+}
+
+/*!
+ * \brief Taskprocessor listener callback called when a task is added
+ *
+ * The threadpool uses this opportunity to queue a task on its control taskprocessor
+ * in order to activate idle threads and notify the threadpool listener that the
+ * task has been pushed.
+ * \param listener The taskprocessor listener. The threadpool is the listener's private data
+ * \param was_empty True if the taskprocessor was empty prior to the task being pushed
+ */
+static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
+		int was_empty)
+{
+	struct ast_threadpool *pool = listener->private_data;
+	struct task_pushed_data *tpd;
+	SCOPED_AO2LOCK(lock, pool);
+
+	if (pool->shutting_down) {
+		return;
+	}
+	tpd = task_pushed_data_alloc(pool, was_empty);
+	if (!tpd) {
+		return;
+	}
+
+	ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd);
+}
+
+/*!
+ * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
+ *
+ * This simply lets the threadpool's listener know that the threadpool is devoid of tasks

[... 575 lines stripped ...]



More information about the asterisk-commits mailing list