[svn-commits] mmichelson: branch mmichelson/threadpool r376118 - in /team/mmichelson/thread...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Thu Nov 8 17:27:20 CST 2012


Author: mmichelson
Date: Thu Nov  8 17:27:16 2012
New Revision: 376118

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=376118
Log:
Move taskprocessors to use a listener model.

Taskprocessors are now divided into two units: the task queue
and their listeners.

When a task is added to the queue, the listener is notified and
can take whatever action is desired. This means that taskprocessors
are no longer confined to having their tasks executed within a 
single thread.

A default taskprocessor listener has been added that mirrors the
old taskprocessor behavior.

I've tested it by running Asterisk and placing calls. It appears
to work as expected. I'm going to do some cleaning up first and
then write some unit tests to be sure everything works as expected.


Modified:
    team/mmichelson/threadpool/include/asterisk/taskprocessor.h
    team/mmichelson/threadpool/main/taskprocessor.c

Modified: team/mmichelson/threadpool/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/taskprocessor.h?view=diff&rev=376118&r1=376117&r2=376118
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/taskprocessor.h (original)
+++ team/mmichelson/threadpool/include/asterisk/taskprocessor.h Thu Nov  8 17:27:16 2012
@@ -60,6 +60,21 @@
 	TPS_REF_IF_EXISTS = (1 << 0),
 };
 
+struct ast_taskprocessor_listener;
+
+struct ast_taskprocessor_listener_callbacks {
+	/*! Indicates a task was pushed to the processor */
+	void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty);
+	/*! Indicates the task processor has become empty */
+	void (*emptied)(struct ast_taskprocessor_listener *listener);
+};
+
+struct ast_taskprocessor_listener {
+	struct ast_taskprocessor_listener_callbacks *callbacks;
+	struct ast_taskprocessor *tps;
+	void *private_data;
+};
+
 /*!
  * \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
  *
@@ -73,6 +88,16 @@
  * \since 1.6.1
  */
 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create);
+
+/*!
+ * \brief Create a taskprocessor with a custom listener
+ *
+ * \param name The name of the taskprocessor to create
+ * \param listener The listener for operations on this taskprocessor
+ * \retval NULL Failure
+ * \reval non-NULL success
+ */
+struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener);
 
 /*!
  * \brief Unreference the specified taskprocessor and its reference count will decrement.
@@ -97,6 +122,14 @@
 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
 
 /*!
+ * \brief Pop a task off the taskprocessor and execute it.
+ * \param tps The taskprocessor from which to execute.
+ * \retval 0 There is no further work to be done.
+ * \retval 1 Tasks still remain in the taskprocessor queue.
+ */
+int ast_taskprocessor_execute(struct ast_taskprocessor *tps);
+
+/*!
  * \brief Return the name of the taskprocessor singleton
  * \since 1.6.1
  */

Modified: team/mmichelson/threadpool/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/taskprocessor.c?view=diff&rev=376118&r1=376117&r2=376118
==============================================================================
--- team/mmichelson/threadpool/main/taskprocessor.c (original)
+++ team/mmichelson/threadpool/main/taskprocessor.c Thu Nov  8 17:27:16 2012
@@ -83,6 +83,7 @@
 	AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
 	/*! \brief Taskprocessor singleton list entry */
 	AST_LIST_ENTRY(ast_taskprocessor) list;
+	struct ast_taskprocessor_listener *listener;
 };
 #define TPS_MAX_BUCKETS 7
 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
@@ -120,6 +121,83 @@
 static struct ast_cli_entry taskprocessor_clis[] = {
 	AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
 	AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
+};
+
+struct default_taskprocessor_listener_pvt {
+	pthread_t poll_thread;
+	ast_mutex_t lock;
+	ast_cond_t cond;
+	int wake_up;
+	int dead;
+};
+
+static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
+{
+	SCOPED_MUTEX(lock, &pvt->lock); 
+	pvt->wake_up = 1;
+	pvt->dead = should_die;
+	ast_cond_signal(&pvt->cond);
+}
+
+static void default_listener_destroy(void *obj)
+{
+	struct ast_taskprocessor_listener *listener = obj;
+	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+
+	default_tps_wake_up(pvt, 1);
+	pthread_join(pvt->poll_thread, NULL);
+	pvt->poll_thread = AST_PTHREADT_NULL;
+	ast_mutex_destroy(&pvt->lock);
+	ast_cond_destroy(&pvt->cond);
+	ast_free(pvt);
+
+	ao2_ref(listener->tps, -1);
+	listener->tps = NULL;
+}
+
+static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
+{
+	SCOPED_MUTEX(lock, &pvt->lock);
+	while (!pvt->wake_up) {
+		ast_cond_wait(&pvt->cond, lock);
+	}
+	pvt->wake_up = 0;
+	return pvt->dead;
+}
+
+/* this is the task processing worker function */
+static void *tps_processing_function(void *data)
+{
+	struct ast_taskprocessor_listener *listener = data;
+	struct ast_taskprocessor *tps = listener->tps;
+	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+	int dead = 0;
+
+	while (!dead) {
+		if (!ast_taskprocessor_execute(tps)) {
+			dead = default_tps_idle(pvt);
+		}
+	}
+	return NULL;
+}
+
+static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
+{
+	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+
+	if (was_empty) {
+		default_tps_wake_up(pvt, 0);
+	}
+}
+
+static void default_emptied(struct ast_taskprocessor_listener *listener)
+{
+	/* No-op */
+}
+
+static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
+	.task_pushed = default_task_pushed,
+	.emptied = default_emptied,
 };
 
 /*! \internal \brief Clean up resources on Asterisk shutdown */
@@ -286,75 +364,22 @@
 	return CLI_SUCCESS;
 }
 
-/* this is the task processing worker function */
-static void *tps_processing_function(void *data)
-{
-	struct ast_taskprocessor *i = data;
-	struct tps_task *t;
-	int size;
-
-	if (!i) {
-		ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
-		return NULL;
-	}
-
-	while (i->poll_thread_run) {
-		ast_mutex_lock(&i->taskprocessor_lock);
-		if (!i->poll_thread_run) {
-			ast_mutex_unlock(&i->taskprocessor_lock);
-			break;
-		}
-		if (!(size = tps_taskprocessor_depth(i))) {
-			ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
-			if (!i->poll_thread_run) {
-				ast_mutex_unlock(&i->taskprocessor_lock);
-				break;
-			}
-		}
-		ast_mutex_unlock(&i->taskprocessor_lock);
-		/* stuff is in the queue */
-		if (!(t = tps_taskprocessor_pop(i))) {
-			ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
-			continue;
-		}
-		if (!t->execute) {
-			ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
-			tps_task_free(t);
-			continue;
-		}
-		t->execute(t->datap);
-
-		ast_mutex_lock(&i->taskprocessor_lock);
-		if (i->stats) {
-			i->stats->_tasks_processed_count++;
-			if (size > i->stats->max_qsize) {
-				i->stats->max_qsize = size;
-			}
-		}
-		ast_mutex_unlock(&i->taskprocessor_lock);
-
-		tps_task_free(t);
-	}
-	while ((t = tps_taskprocessor_pop(i))) {
-		tps_task_free(t);
-	}
-	return NULL;
-}
-
 /* hash callback for astobj2 */
 static int tps_hash_cb(const void *obj, const int flags)
 {
 	const struct ast_taskprocessor *tps = obj;
-
-	return ast_str_case_hash(tps->name);
+	const char *name = flags & OBJ_KEY ? obj : tps->name;
+
+	return ast_str_case_hash(name);
 }
 
 /* compare callback for astobj2 */
 static int tps_cmp_cb(void *obj, void *arg, int flags)
 {
 	struct ast_taskprocessor *lhs = obj, *rhs = arg;
-
-	return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
+	const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
+
+	return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
 }
 
 /* destroy the taskprocessor */
@@ -368,20 +393,21 @@
 	}
 	ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
 	/* kill it */
-	ast_mutex_lock(&t->taskprocessor_lock);
-	t->poll_thread_run = 0;
-	ast_cond_signal(&t->poll_cond);
-	ast_mutex_unlock(&t->taskprocessor_lock);
-	pthread_join(t->poll_thread, NULL);
-	t->poll_thread = AST_PTHREADT_NULL;
 	ast_mutex_destroy(&t->taskprocessor_lock);
-	ast_cond_destroy(&t->poll_cond);
 	/* free it */
 	if (t->stats) {
 		ast_free(t->stats);
 		t->stats = NULL;
 	}
 	ast_free((char *) t->name);
+	if (t->listener) {
+		/* This code should not be reached since the listener
+		 * should have been destroyed before the taskprocessor could
+		 * be destroyed
+		 */
+		ao2_ref(t->listener, -1);
+		t->listener = NULL;
+	}
 }
 
 /* pop the front task and return it */
@@ -414,6 +440,32 @@
 		return NULL;
 	}
 	return tps->name;
+}
+
+static struct ast_taskprocessor_listener *default_listener_alloc(void)
+{
+	struct ast_taskprocessor_listener *listener;
+	struct default_taskprocessor_listener_pvt *pvt;
+
+	listener = ao2_alloc(sizeof(*listener), default_listener_destroy);
+	if (!listener) {
+		return NULL;
+	}
+	pvt = ast_calloc(1, sizeof(*pvt));
+	if (!pvt) {
+		ao2_ref(listener, -1);
+		return NULL;
+	}
+	listener->callbacks = &default_listener_callbacks;
+	listener->private_data = pvt;
+	ast_cond_init(&pvt->cond, NULL);
+	ast_mutex_init(&pvt->lock);
+	pvt->poll_thread = AST_PTHREADT_NULL;
+	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
+		ao2_ref(listener, -1);
+		return NULL;
+	}
+	return listener;
 }
 
 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
@@ -421,75 +473,89 @@
  * if it already exists */
 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
 {
-	struct ast_taskprocessor *p, tmp_tps = {
-		.name = name,
-	};
+	struct ast_taskprocessor *p;
+	struct ast_taskprocessor_listener *listener;
 
 	if (ast_strlen_zero(name)) {
 		ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
 		return NULL;
 	}
-	ao2_lock(tps_singletons);
-	p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
+	p = ao2_find(tps_singletons, name, OBJ_KEY);
 	if (p) {
 		ao2_unlock(tps_singletons);
 		return p;
 	}
 	if (create & TPS_REF_IF_EXISTS) {
 		/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
-		ao2_unlock(tps_singletons);
-		return NULL;
-	}
-	/* create a new taskprocessor */
-	if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
-		ao2_unlock(tps_singletons);
+		return NULL;
+	}
+	/* Create a new taskprocessor. Start by creating a default listener */
+	listener = default_listener_alloc();
+
+	p = ast_taskprocessor_create_with_listener(name, listener);
+	ao2_ref(listener, -1);
+	return p;
+
+}
+
+struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
+{
+	RAII_VAR(struct ast_taskprocessor *, p,
+			ao2_alloc(sizeof(*p), tps_taskprocessor_destroy),
+			ao2_cleanup);
+
+	if (!p) {
 		ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
 		return NULL;
 	}
 
-	ast_cond_init(&p->poll_cond, NULL);
-	ast_mutex_init(&p->taskprocessor_lock);
-
 	if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
-		ao2_unlock(tps_singletons);
 		ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
+		return NULL;
+	}
+	if (!(p->name = ast_strdup(name))) {
 		ao2_ref(p, -1);
 		return NULL;
 	}
-	if (!(p->name = ast_strdup(name))) {
-		ao2_unlock(tps_singletons);
-		ao2_ref(p, -1);
-		return NULL;
-	}
-	p->poll_thread_run = 1;
-	p->poll_thread = AST_PTHREADT_NULL;
-	if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) {
-		ao2_unlock(tps_singletons);
-		ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
-		ao2_ref(p, -1);
-		return NULL;
-	}
+
+	ao2_ref(listener, +1);
+	p->listener = listener;
+
+	ao2_ref(p, +1);
+	listener->tps = p;
+
 	if (!(ao2_link(tps_singletons, p))) {
-		ao2_unlock(tps_singletons);
 		ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
-		ao2_ref(p, -1);
-		return NULL;
-	}
-	ao2_unlock(tps_singletons);
+		return NULL;
+	}
+
+	/* RAII_VAR will decrement the refcount at the end of the function.
+	 * Since we want to pass back a reference to p, we bump the refcount
+	 */
+	ao2_ref(p, +1);
 	return p;
 }
 
 /* decrement the taskprocessor reference count and unlink from the container if necessary */
 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
 {
-	if (tps) {
-		ao2_lock(tps_singletons);
-		ao2_unlink(tps_singletons, tps);
-		if (ao2_ref(tps, -1) > 1) {
-			ao2_link(tps_singletons, tps);
-		}
-		ao2_unlock(tps_singletons);
-	}
+	struct ast_taskprocessor_listener *listener;
+	if (!tps) {
+		return NULL;
+	}
+
+	if (ao2_ref(tps, -1) > 3) {
+		return NULL;
+	}
+	/* If we're down to 3 references, then those must be:
+	 * 1. The reference we just got rid of
+	 * 2. The container
+	 * 3. The listener
+	 */
+	ao2_unlink(tps_singletons, tps);
+	listener = tps->listener;
+	tps->listener = NULL;
+	ao2_ref(listener, -1);
 	return NULL;
 }
 
@@ -497,6 +563,7 @@
 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
 {
 	struct tps_task *t;
+	int previous_size;
 
 	if (!tps || !task_exe) {
 		ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
@@ -508,9 +575,38 @@
 	}
 	ast_mutex_lock(&tps->taskprocessor_lock);
 	AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
-	tps->tps_queue_size++;
-	ast_cond_signal(&tps->poll_cond);
+	previous_size = tps->tps_queue_size++;
 	ast_mutex_unlock(&tps->taskprocessor_lock);
+	tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1);
 	return 0;
 }
 
+int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
+{
+	struct tps_task *t;
+	int size;
+
+	if (!(t = tps_taskprocessor_pop(tps))) {
+		return 0;
+	}
+
+	t->execute(t->datap);
+
+	tps_task_free(t);
+
+	ast_mutex_lock(&tps->taskprocessor_lock);
+	size = tps_taskprocessor_depth(tps);
+	if (tps->stats) {
+		tps->stats->_tasks_processed_count++;
+		if (size > tps->stats->max_qsize) {
+			tps->stats->max_qsize = size;
+		}
+	}
+	ast_mutex_unlock(&tps->taskprocessor_lock);
+
+	if (size == 0) {
+		tps->listener->callbacks->emptied(tps->listener);
+		return 0;
+	}
+	return 1;
+}




More information about the svn-commits mailing list