[asterisk-commits] mmichelson: branch mmichelson/threadpool r376121 - in /team/mmichelson/thread...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Fri Nov 9 16:28:13 CST 2012


Author: mmichelson
Date: Fri Nov  9 16:28:10 2012
New Revision: 376121

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=376121
Log:
Genericize the allocation and destruction of taskprocessor listeners.

The goal is to relieve individual listeners of the responsibility for
properly unreffing the taskprocessor.


Modified:
    team/mmichelson/threadpool/include/asterisk/taskprocessor.h
    team/mmichelson/threadpool/main/astobj2.c
    team/mmichelson/threadpool/main/taskprocessor.c

Modified: team/mmichelson/threadpool/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/taskprocessor.h?view=diff&rev=376121&r1=376120&r2=376121
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/taskprocessor.h (original)
+++ team/mmichelson/threadpool/include/asterisk/taskprocessor.h Fri Nov  9 16:28:10 2012
@@ -63,10 +63,14 @@
 struct ast_taskprocessor_listener;
 
 struct ast_taskprocessor_listener_callbacks {
+	/*! Allocate the listener's private data */
+	void *(*alloc)(struct ast_taskprocessor_listener *listener);
 	/*! Indicates a task was pushed to the processor */
 	void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty);
 	/*! Indicates the task processor has become empty */
 	void (*emptied)(struct ast_taskprocessor_listener *listener);
+	/*! Destroy the listener's private data */
+	void (*destroy)(void *private_data);
 };
 
 struct ast_taskprocessor_listener {
@@ -74,6 +78,9 @@
 	struct ast_taskprocessor *tps;
 	void *private_data;
 };
+
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
+		struct ast_taskprocessor_listener_callbacks *callbacks);
 
 /*!
  * \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary

Modified: team/mmichelson/threadpool/main/astobj2.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/astobj2.c?view=diff&rev=376121&r1=376120&r2=376121
==============================================================================
--- team/mmichelson/threadpool/main/astobj2.c (original)
+++ team/mmichelson/threadpool/main/astobj2.c Fri Nov  9 16:28:10 2012
@@ -431,6 +431,7 @@
 	int ret;
 
 	if (obj == NULL) {
+		ast_backtrace();
 		ast_assert(0);
 		return -1;
 	}

Modified: team/mmichelson/threadpool/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/taskprocessor.c?view=diff&rev=376121&r1=376120&r2=376121
==============================================================================
--- team/mmichelson/threadpool/main/taskprocessor.c (original)
+++ team/mmichelson/threadpool/main/taskprocessor.c Fri Nov  9 16:28:10 2012
@@ -131,10 +131,62 @@
 	ast_cond_signal(&pvt->cond);
 }
 
+static void listener_destroy(void *obj)
+{
+	struct ast_taskprocessor_listener *listener = obj;
+
+	listener->callbacks->destroy(listener->private_data);
+
+	ao2_ref(listener->tps, -1);
+	listener->tps = NULL;
+}
+
+static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
+{
+	SCOPED_MUTEX(lock, &pvt->lock);
+	while (!pvt->wake_up) {
+		ast_cond_wait(&pvt->cond, lock);
+	}
+	pvt->wake_up = 0;
+	return pvt->dead;
+}
+
+/* this is the task processing worker function */
+static void *tps_processing_function(void *data)
+{
+	struct ast_taskprocessor_listener *listener = data;
+	struct ast_taskprocessor *tps = listener->tps;
+	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+	int dead = 0;
+
+	while (!dead) {
+		if (!ast_taskprocessor_execute(tps)) {
+			dead = default_tps_idle(pvt);
+		}
+	}
+	return NULL;
+}
+
+static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
+{
+	struct default_taskprocessor_listener_pvt *pvt;
+
+	pvt = ast_calloc(1, sizeof(*pvt));
+	if (!pvt) {
+		return NULL;
+	}
+	ast_cond_init(&pvt->cond, NULL);
+	ast_mutex_init(&pvt->lock);
+	pvt->poll_thread = AST_PTHREADT_NULL;
+	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
+		return NULL;
+	}
+	return pvt;
+}
+
 static void default_listener_destroy(void *obj)
 {
-	struct ast_taskprocessor_listener *listener = obj;
-	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+	struct default_taskprocessor_listener_pvt *pvt = obj;
 
 	default_tps_wake_up(pvt, 1);
 	pthread_join(pvt->poll_thread, NULL);
@@ -142,35 +194,6 @@
 	ast_mutex_destroy(&pvt->lock);
 	ast_cond_destroy(&pvt->cond);
 	ast_free(pvt);
-
-	ao2_ref(listener->tps, -1);
-	listener->tps = NULL;
-}
-
-static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
-{
-	SCOPED_MUTEX(lock, &pvt->lock);
-	while (!pvt->wake_up) {
-		ast_cond_wait(&pvt->cond, lock);
-	}
-	pvt->wake_up = 0;
-	return pvt->dead;
-}
-
-/* this is the task processing worker function */
-static void *tps_processing_function(void *data)
-{
-	struct ast_taskprocessor_listener *listener = data;
-	struct ast_taskprocessor *tps = listener->tps;
-	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
-	int dead = 0;
-
-	while (!dead) {
-		if (!ast_taskprocessor_execute(tps)) {
-			dead = default_tps_idle(pvt);
-		}
-	}
-	return NULL;
 }
 
 static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
@@ -188,8 +211,10 @@
 }
 
 static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
+	.alloc = default_listener_alloc,
 	.task_pushed = default_task_pushed,
 	.emptied = default_emptied,
+	.destroy = default_listener_destroy,
 };
 
 /*! \internal \brief Clean up resources on Asterisk shutdown */
@@ -432,29 +457,22 @@
 	return tps->name;
 }
 
-static struct ast_taskprocessor_listener *default_listener_alloc(void)
-{
-	struct ast_taskprocessor_listener *listener;
-	struct default_taskprocessor_listener_pvt *pvt;
-
-	listener = ao2_alloc(sizeof(*listener), default_listener_destroy);
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
+		struct ast_taskprocessor_listener_callbacks *callbacks)
+{
+	RAII_VAR(struct ast_taskprocessor_listener *, listener,
+			ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
+	
 	if (!listener) {
 		return NULL;
 	}
-	pvt = ast_calloc(1, sizeof(*pvt));
-	if (!pvt) {
-		ao2_ref(listener, -1);
-		return NULL;
-	}
-	listener->callbacks = &default_listener_callbacks;
-	listener->private_data = pvt;
-	ast_cond_init(&pvt->cond, NULL);
-	ast_mutex_init(&pvt->lock);
-	pvt->poll_thread = AST_PTHREADT_NULL;
-	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
-		ao2_ref(listener, -1);
-		return NULL;
-	}
+	listener->callbacks = callbacks;
+	listener->private_data = listener->callbacks->alloc(listener);
+	if (!listener->private_data) {
+		return NULL;
+	}
+
+	ao2_ref(listener, +1);
 	return listener;
 }
 
@@ -480,9 +498,18 @@
 		return NULL;
 	}
 	/* Create a new taskprocessor. Start by creating a default listener */
-	listener = default_listener_alloc();
+	listener = ast_taskprocessor_listener_alloc(p, &default_listener_callbacks);
+	if (!listener) {
+		return NULL;
+	}
 
 	p = ast_taskprocessor_create_with_listener(name, listener);
+	if (!p) {
+		ao2_ref(listener, -1);
+		return NULL;
+	}
+
+	/* Unref listener here since the taskprocessor has gained a reference to the listener */
 	ao2_ref(listener, -1);
 	return p;
 




More information about the asterisk-commits mailing list