[asterisk-commits] mmichelson: branch mmichelson/threadpool r376499 - in /team/mmichelson/thread...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Mon Nov 19 15:31:36 CST 2012


Author: mmichelson
Date: Mon Nov 19 15:31:32 2012
New Revision: 376499

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=376499
Log:
Reorganize code and change behavior of ast_taskprocessor_execute() when taskprocessor is shutting down.

Moved code around to be easier to follow.

ast_taskprocessor_execute() will now return 0 if the taskprocessor is being shut down.


Modified:
    team/mmichelson/threadpool/include/asterisk/taskprocessor.h
    team/mmichelson/threadpool/main/taskprocessor.c

Modified: team/mmichelson/threadpool/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/taskprocessor.h?view=diff&rev=376499&r1=376498&r2=376499
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/taskprocessor.h (original)
+++ team/mmichelson/threadpool/include/asterisk/taskprocessor.h Mon Nov 19 15:31:32 2012
@@ -22,28 +22,33 @@
  *
  * \author Dwayne M. Hubbard <dhubbard at digium.com>
  *
- * \note A taskprocessor is a named singleton containing a task queue that serializes tasks pushed
- * into it by [a] module(s) that reference the taskprocessor. A taskprocessor is created the first
- * time its name is requested via the ast_taskprocessor_get() function and destroyed when the
- * taskprocessor reference count reaches zero. A taskprocessor also contains an accompanying
- * listener that is told when changes in the task queue occur.
+ * \note A taskprocessor is a named singleton containing a task queue that
+ * serializes tasks pushed into it by [a] module(s) that reference the taskprocessor.
+ * A taskprocessor is created the first time its name is requested via the
+ * ast_taskprocessor_get() function or the ast_taskprocessor_create_with_listener()
+ * function and destroyed when the taskprocessor reference count reaches zero. A
+ * taskprocessor also contains an accompanying listener that is notified when changes
+ * in the task queue occur.
  *
  * A task is a wrapper around a task-handling function pointer and a data
  * pointer.  A task is pushed into a taskprocessor queue using the 
  * ast_taskprocessor_push(taskprocessor, taskhandler, taskdata) function and freed by the
- * taskprocessor after the task handling function returns.  A module releases its reference to a
- * taskprocessor using the ast_taskprocessor_unreference() function which may result in the
- * destruction of the taskprocessor if the taskprocessor's reference count reaches zero.  Tasks waiting
- * to be processed in the taskprocessor queue when the taskprocessor reference count reaches zero
- * will be purged and released from the taskprocessor queue without being processed.
- *
- * The taskprocessor listener has the flexibility of doling out tasks to best fit the module's
- * needs. For instance, a taskprocessor listener may have a single dispatch thread that handles
- * all tasks, or it may dispatch tasks to a thread pool.
- *
- * There is a default taskprocessor listener that will be used if a taskprocessor is created without
- * a listener. This default listener runs tasks sequentially in a single thread. The listener will
- * execute tasks as long as there are tasks to be processed.
+ * taskprocessor after the task handling function returns.  A module releases its
+ * reference to a taskprocessor using the ast_taskprocessor_unreference() function which
+ * may result in the destruction of the taskprocessor if the taskprocessor's reference
+ * count reaches zero. When the taskprocessor's reference count reaches zero, its
+ * listener's shutdown() callback will be called. Any further attempts to execute tasks
+ * will be denied.
+ *
+ * The taskprocessor listener has the flexibility of doling out tasks to best fit the
+ * module's needs. For instance, a taskprocessor listener may have a single dispatch
+ * thread that handles all tasks, or it may dispatch tasks to a thread pool.
+ *
+ * There is a default taskprocessor listener that will be used if a taskprocessor is
+ * created without any explicit listener. This default listener runs tasks sequentially
+ * in a single thread. The listener will execute tasks as long as there are tasks to be
+ * processed. When the taskprocessor is shut down, the default listener will stop
+ * processing tasks and join its execution thread.
  */
 
 #ifndef __AST_TASKPROCESSOR_H__

Modified: team/mmichelson/threadpool/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/taskprocessor.c?view=diff&rev=376499&r1=376498&r2=376499
==============================================================================
--- team/mmichelson/threadpool/main/taskprocessor.c (original)
+++ team/mmichelson/threadpool/main/taskprocessor.c Mon Nov 19 15:31:32 2012
@@ -76,6 +76,8 @@
 	/*! \brief Taskprocessor singleton list entry */
 	AST_LIST_ENTRY(ast_taskprocessor) list;
 	struct ast_taskprocessor_listener *listener;
+	/*! Indicates if the taskprocessor is in the process of shuting down */
+	unsigned int shutting_down:1;
 };
 #define TPS_MAX_BUCKETS 7
 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
@@ -123,6 +125,7 @@
 	int dead;
 };
 
+
 static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
 {
 	SCOPED_MUTEX(lock, &pvt->lock); 
@@ -131,20 +134,6 @@
 	ast_cond_signal(&pvt->cond);
 }
 
-static void listener_destroy(void *obj)
-{
-	struct ast_taskprocessor_listener *listener = obj;
-
-	listener->callbacks->destroy(listener->private_data);
-}
-
-static void listener_shutdown(struct ast_taskprocessor_listener *listener)
-{
-	listener->callbacks->shutdown(listener);
-	ao2_ref(listener->tps, -1);
-	listener->tps = NULL;
-}
-
 static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
 {
 	SCOPED_MUTEX(lock, &pvt->lock);
@@ -188,6 +177,20 @@
 	return pvt;
 }
 
+static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
+{
+	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+
+	if (was_empty) {
+		default_tps_wake_up(pvt, 0);
+	}
+}
+
+static void default_emptied(struct ast_taskprocessor_listener *listener)
+{
+	/* No-op */
+}
+
 static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
@@ -202,20 +205,6 @@
 	ast_mutex_destroy(&pvt->lock);
 	ast_cond_destroy(&pvt->cond);
 	ast_free(pvt);
-}
-
-static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
-{
-	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
-
-	if (was_empty) {
-		default_tps_wake_up(pvt, 0);
-	}
-}
-
-static void default_emptied(struct ast_taskprocessor_listener *listener)
-{
-	/* No-op */
 }
 
 static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
@@ -438,16 +427,15 @@
 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
 {
 	struct tps_task *task;
-
-	if (!tps) {
-		ast_log(LOG_ERROR, "missing taskprocessor\n");
-		return NULL;
-	}
-	ao2_lock(tps);
+	SCOPED_AO2LOCK(lock, tps);
+
+	if (tps->shutting_down) {
+		return NULL;
+	}
+
 	if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
 		tps->tps_queue_size--;
 	}
-	ao2_unlock(tps);
 	return task;
 }
 
@@ -464,6 +452,20 @@
 		return NULL;
 	}
 	return tps->name;
+}
+
+static void listener_destroy(void *obj)
+{
+	struct ast_taskprocessor_listener *listener = obj;
+
+	listener->callbacks->destroy(listener->private_data);
+}
+
+static void listener_shutdown(struct ast_taskprocessor_listener *listener)
+{
+	listener->callbacks->shutdown(listener);
+	ao2_ref(listener->tps, -1);
+	listener->tps = NULL;
 }
 
 struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks)




More information about the asterisk-commits mailing list