[svn-commits] mmichelson: branch mmichelson/threadpool r377036 - in /team/mmichelson/thread...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Mon Dec 3 10:59:29 CST 2012


Author: mmichelson
Date: Mon Dec  3 10:59:26 2012
New Revision: 377036

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=377036
Log:
This now compiles.

That's a milestone, of sorts. Things really need
arranging/documenting, and there's no function to
be able to push tasks to a threadpool.


Modified:
    team/mmichelson/threadpool/include/asterisk/threadpool.h
    team/mmichelson/threadpool/main/threadpool.c

Modified: team/mmichelson/threadpool/include/asterisk/threadpool.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/threadpool.h?view=diff&rev=377036&r1=377035&r2=377036
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/threadpool.h (original)
+++ team/mmichelson/threadpool/include/asterisk/threadpool.h Mon Dec  3 10:59:26 2012
@@ -72,15 +72,15 @@
 /*!
  * \brief Create a new threadpool
  *
- * This function creates a threadpool and returns a taskprocessor. Tasks pushed
- * to this taskprocessor will be handled by the threadpool and will be reported
- * on the threadpool's listener.
+ * This function creates a threadpool. Tasks may be pushed onto this thread pool
+ * in and will be automatically acted upon by threads within the pool.
  *
  * \param listener The listener the threadpool will notify of changes
+ * \param initial_size The number of threads for the pool to start with
  * \retval NULL Failed to create the threadpool
- * \retval non-NULL The associated taskprocessor
+ * \retval non-NULL The newly-created threadpool
  */
-struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener);
+struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size);
 
 /*!
  * \brief Set the number of threads for the thread pool
@@ -93,4 +93,10 @@
  */
 void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size);
 
+/*!
+ * \brief Shut down a threadpool and destroy it
+ *
+ * \param pool The pool to shut down
+ */
+void ast_threadpool_shutdown(struct ast_threadpool *pool);
 #endif /* ASTERISK_THREADPOOL_H */

Modified: team/mmichelson/threadpool/main/threadpool.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/threadpool.c?view=diff&rev=377036&r1=377035&r2=377036
==============================================================================
--- team/mmichelson/threadpool/main/threadpool.c (original)
+++ team/mmichelson/threadpool/main/threadpool.c Mon Dec  3 10:59:26 2012
@@ -21,17 +21,21 @@
 
 #include "asterisk/threadpool.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/utils.h"
 
 #define THREAD_BUCKETS 89
 
 static int id_counter;
 
 struct ast_threadpool {
-	struct ast_threadpool_listener *threadpool_listener;
+	struct ast_threadpool_listener *listener;
 	struct ao2_container *active_threads;
 	struct ao2_container *idle_threads;
 	struct ao2_container *zombie_threads;
-}
+	struct ast_taskprocessor *tps;
+	struct ast_taskprocessor *control_tps;
+};
 
 enum worker_state {
 	ALIVE,
@@ -49,9 +53,9 @@
 	int wake_up;
 };
 
-static int worker_thread_hash(const void *obj)
-{
-	struct worker_thread *worker= obj;
+static int worker_thread_hash(const void *obj, int flags)
+{
+	const struct worker_thread *worker = obj;
 
 	return worker->id;
 }
@@ -64,9 +68,26 @@
 	return worker1->id == worker2->id ? CMP_MATCH : 0;
 }
 
-static worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
-{
-	struct worker_thread *worker = ao2_alloc(1, sizeof(*worker));
+static void worker_thread_destroy(void *obj)
+{
+	struct worker_thread *worker = obj;
+	ast_mutex_destroy(&worker->lock);
+	ast_cond_destroy(&worker->cond);
+}
+
+static int worker_active(struct worker_thread *worker);
+
+static void *worker_start(void *arg)
+{
+	struct worker_thread *worker = arg;
+
+	worker_active(worker);
+	return NULL;
+}
+
+static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
+{
+	struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
 	if (!worker) {
 		/* XXX Dangit! */
 		return NULL;
@@ -77,7 +98,7 @@
 	worker->pool = pool;
 	worker->thread = AST_PTHREADT_NULL;
 	worker->state = ALIVE;
-	if (ast_pthread_create(&worker->thread, NULL, worker_active, worker) < 0) {
+	if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
 		/* XXX Poop! */
 		ao2_ref(worker, -1);
 		return NULL;
@@ -106,7 +127,7 @@
 	ao2_ref(pair->worker, -1);
 }
 
-struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool,
+static struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool,
 		struct worker_thread *worker)
 {
 	struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
@@ -114,8 +135,10 @@
 		/*XXX Crap */
 		return NULL;
 	}
-	pair->pool = ao2_ref(pool);
-	pair->worker = ao2_ref(worker);
+	ao2_ref(pool, +1);
+	pair->pool = pool;
+	ao2_ref(worker, +1);
+	pair->worker = worker;
 	return pair;
 }
 
@@ -140,7 +163,7 @@
 		/*XXX Crap */
 		return;
 	}
-	ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle(pair));
+	ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
 }
 
 static int queued_zombie_thread_dead(void *data)
@@ -162,21 +185,26 @@
 		/* XXX Crap */
 		return;
 	}
-	ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead(pair));
+	ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
 }
 
 static int worker_idle(struct worker_thread *worker)
 {
 	SCOPED_MUTEX(lock, &worker->lock);
 	if (worker->state != ALIVE) {
-		return false;
+		return 0;
 	}
 	threadpool_active_thread_idle(worker->pool, worker);
 	while (!worker->wake_up) {
 		ast_cond_wait(&worker->cond, lock);
 	}
-	worker->wake_up = false;
+	worker->wake_up = 0;
 	return worker->state == ALIVE;
+}
+
+static int threadpool_execute(struct ast_threadpool *pool)
+{
+	return ast_taskprocessor_execute(pool->tps);
 }
 
 static int worker_active(struct worker_thread *worker)
@@ -203,13 +231,47 @@
 	return 0;
 }
 
+static void worker_set_state(struct worker_thread *worker, enum worker_state state)
+{
+	SCOPED_MUTEX(lock, &worker->lock);
+	worker->state = state;
+	worker->wake_up = 1;
+	ast_cond_signal(&worker->cond);
+}
+
+static int worker_shutdown(void *obj, void *arg, int flags)
+{
+	struct worker_thread *worker = obj;
+
+	worker_set_state(worker, DEAD);
+	if (worker->thread != AST_PTHREADT_NULL) {
+		pthread_join(worker->thread, NULL);
+		worker->thread = AST_PTHREADT_NULL;
+	}
+	return 0;
+}
+
+static void threadpool_tps_listener_destroy(void *private_data)
+{
+	struct ast_threadpool *pool = private_data;
+	/* XXX Probably should let the listener know we're being destroyed? */
+
+	/* Threads should all be shut down by now, so this should be a painless
+	 * operation
+	 */
+	ao2_ref(pool->active_threads, -1);
+	ao2_ref(pool->idle_threads, -1);
+	ao2_ref(pool->zombie_threads, -1);
+	ao2_ref(pool->listener, -1);
+	ao2_ref(pool, -1);
+}
 
 static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener)
 {
-	RAII_VAR(ast_threadpool *, pool,
-			ao2_alloc(sizeof(*pool), threadpool_destroy), ao2_cleanup);
-
-	pool->control_tps = ast_taskprocessor_get(/* XXX ??? */, TPS_REF_DEFAULT);
+	RAII_VAR(struct ast_threadpool *, pool,
+			ao2_alloc(sizeof(*pool), threadpool_tps_listener_destroy), ao2_cleanup);
+
+	pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
 	if (!pool->control_tps) {
 		return NULL;
 	}
@@ -222,32 +284,115 @@
 		return NULL;
 	}
 	pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
-	if (!pool->zombie_thread) {
-		return NULL;
-	}
+	if (!pool->zombie_threads) {
+		return NULL;
+	}
+
+	pool->tps = listener->tps;
 
 	ao2_ref(pool, +1);
 	return pool;
 }
 
-static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener)
-{
-	/* XXX stub */
+struct task_pushed_data {
+	struct ast_threadpool *pool;
+	int was_empty;
+};
+
+static void task_pushed_data_destroy(void *obj)
+{
+	struct task_pushed_data *tpd = obj;
+	ao2_ref(tpd->pool, -1);
+}
+
+static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
+		int was_empty)
+{
+	struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd),
+			task_pushed_data_destroy);
+
+	if (!tpd) {
+		return NULL;
+	}
+	ao2_ref(pool, +1);
+	tpd->pool = pool;
+	tpd->was_empty = was_empty;
+	return tpd;
+}
+
+static int activate_threads(void *obj, void *arg, int flags)
+{
+	struct worker_thread *worker = obj;
+	struct ast_threadpool *pool = arg;
+
+	ao2_link(pool->active_threads, worker);
+	worker_set_state(worker, ALIVE);
+	return 0;
+}
+
+static int handle_task_pushed(void *data)
+{
+	struct task_pushed_data *tpd = data;
+	struct ast_threadpool *pool = tpd->pool;
+	int was_empty = tpd->was_empty;
+
+	pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty);
+	ao2_callback(pool->idle_threads, OBJ_UNLINK, activate_threads, pool);
+	ao2_ref(tpd, -1);
+	return 0;
+}
+
+static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
+		int was_empty)
+{
+	struct ast_threadpool *pool = listener->private_data;
+	struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
+
+	if (!tpd) {
+		/* XXX Drat! */
+		return;
+	}
+
+	ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
+}
+
+static int handle_emptied(void *data)
+{
+	struct ast_threadpool *pool = data;
+
+	pool->listener->callbacks->emptied(pool->listener);
+	ao2_ref(pool, -1);
+	return 0;
 }
 
 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
 {
-	/* XXX stub */
+	struct ast_threadpool *pool = listener->private_data;
+
+	ao2_ref(pool, +1);
+	ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
 }
 
 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
 {
-	/* XXX stub */
-}
-
-static void threadpool_tps_listener_destroy(struct ast_taskprocessor_listener *listener)
-{
-	/* XXX stub */
+	/*
+	 * The threadpool triggers the taskprocessor to shut down. As a result,
+	 * we have the freedom of shutting things down in three stages:
+	 *
+	 * 1) Before the tasprocessor is shut down
+	 * 2) During taskprocessor shutdown (here)
+	 * 3) After taskprocessor shutdown
+	 *
+	 * In the spirit of the taskprocessor shutdown, this would be
+	 * where we make sure that all the worker threads are no longer
+	 * executing. We could just do this before we even shut down
+	 * the taskprocessor, but this feels more "right".
+	 */
+
+	struct ast_threadpool *pool = listener->private_data;
+	ao2_callback(pool->active_threads, 0, worker_shutdown, NULL);
+	ao2_callback(pool->idle_threads, 0, worker_shutdown, NULL);
+	ao2_callback(pool->zombie_threads, 0, worker_shutdown, NULL);
 }
 
 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
@@ -257,25 +402,6 @@
 	.shutdown = threadpool_tps_shutdown,
 	.destroy = threadpool_tps_listener_destroy,
 };
-
-/*!
- * \brief Allocate the taskprocessor to be used for the threadpool
- *
- * We use a custom taskprocessor listener. We allocate our custom
- * listener and then create a taskprocessor.
- */
-static struct ast_taskprocessor_listener *threadpool_tps_alloc(void)
-{
-	RAII_VAR(struct threadpool_tps_listener *, tps_listener,
-			ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
-			ao2_cleanup);
-
-	if (!tps_listener) {
-		return NULL;
-	}
-
-	return ast_taskprocessor_create_with_listener(tps_listener);
-}
 
 static void grow(struct ast_threadpool *pool, int delta)
 {
@@ -292,9 +418,11 @@
 
 static int kill_threads(void *obj, void *arg, int flags)
 {
+	struct worker_thread *worker = obj;
 	int *num_to_kill = arg;
 
 	if ((*num_to_kill)-- > 0) {
+		worker_shutdown(worker, arg, flags);
 		return CMP_MATCH;
 	} else {
 		return CMP_STOP;
@@ -309,6 +437,7 @@
 
 	if ((*num_to_zombify)-- > 0) {
 		ao2_link(pool->zombie_threads, worker);
+		worker_set_state(worker, ZOMBIE);
 		return CMP_MATCH;
 	} else {
 		return CMP_STOP;
@@ -325,7 +454,6 @@
 	int idle_threads = ao2_container_count(pool->idle_threads);
 	int idle_threads_to_kill = MIN(delta, idle_threads);
 	int active_threads_to_zombify = delta - idle_threads_to_kill;
-	int i = 0;
 
 	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
 			kill_threads, &idle_threads_to_kill);
@@ -335,14 +463,29 @@
 }
 
 struct set_size_data {
-	struct threadpool *pool;
+	struct ast_threadpool *pool;
 	int size;
 };
 
-void set_size_data_destroy(void *obj)
+static void set_size_data_destroy(void *obj)
 {
 	struct set_size_data *ssd = obj;
 	ao2_ref(ssd->pool, -1);
+}
+
+static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
+		int size)
+{
+	struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);
+	if (!ssd) {
+		/* XXX Crap */
+		return NULL;
+	}
+
+	ao2_ref(pool, +1);
+	ssd->pool = pool;
+	ssd->size = size;
+	return ssd;
 }
 
 static int queued_set_size(void *data)
@@ -355,7 +498,7 @@
 	int current_size = ao2_container_count(pool->active_threads) +
 		ao2_container_count(pool->idle_threads);
 
-	if (current_size = num_threads) {
+	if (current_size == num_threads) {
 		return 0;
 	}
 
@@ -366,41 +509,52 @@
 	}
 
 	threadpool_send_state_changed(pool);
-	ao2_ref(set_size_data, -1);
-}
-
-void ast_threadpool_set_size(struct ast_threadpool *pool, int size)
+	ao2_ref(ssd, -1);
+	return 0;
+}
+
+void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
 {
 	struct set_size_data *ssd;
-	if (size < 0) {
-		ast_log(LOG_WARNING, "Invalid threadpool size used for resizing: %d\n", size);
+
+	ssd = set_size_data_alloc(pool, size);
+	if (!ssd) {
+		/* XXX *groan* */
 		return;
 	}
 
-	ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);
-	if (!ssd) {
-		/* XXX Crap */
-		return;
-	}
-
-	ssd->pool = ao2_ref(pool);
-	ssd->size = size;
-
 	ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
 }
 
 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
 {
 	struct ast_threadpool *pool;
-	RAII_VAR(ast_taskprocessor *, tps, threadpool_tps_alloc(), ast_taskprocessor_unreference);
+	struct ast_taskprocessor *tps;
+	RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
+			ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
+			ao2_cleanup);
+
+	if (!tps_listener) {
+		return NULL;
+	}
+
+	tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);
 
 	if (!tps) {
 		return NULL;
 	}
 
-	pool = tps->listener->private_data;
-	pool->tps = tps;
+	pool = tps_listener->private_data;
 	ast_threadpool_set_size(pool, initial_size);
-
 	return pool;
 }
+
+void ast_threadpool_shutdown(struct ast_threadpool *pool)
+{
+	/* Pretty simple really. We just shut down the
+	 * taskprocessors and everything else just
+	 * takes care of itself via the taskprocessor callbacks
+	 */
+	ast_taskprocessor_unreference(pool->control_tps);
+	ast_taskprocessor_unreference(pool->tps);
+}




More information about the svn-commits mailing list