[asterisk-commits] mmichelson: branch mmichelson/threadpool r376121 - in /team/mmichelson/thread...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Fri Nov 9 16:28:13 CST 2012
Author: mmichelson
Date: Fri Nov 9 16:28:10 2012
New Revision: 376121
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=376121
Log:
Genericize the allocation and destruction of taskprocessor listeners.
The goal of this is to take the responsibility away from individual
listeners to be sure to properly unref the taskprocessor.
Modified:
team/mmichelson/threadpool/include/asterisk/taskprocessor.h
team/mmichelson/threadpool/main/astobj2.c
team/mmichelson/threadpool/main/taskprocessor.c
Modified: team/mmichelson/threadpool/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/include/asterisk/taskprocessor.h?view=diff&rev=376121&r1=376120&r2=376121
==============================================================================
--- team/mmichelson/threadpool/include/asterisk/taskprocessor.h (original)
+++ team/mmichelson/threadpool/include/asterisk/taskprocessor.h Fri Nov 9 16:28:10 2012
@@ -63,10 +63,14 @@
struct ast_taskprocessor_listener;
struct ast_taskprocessor_listener_callbacks {
+ /*! Allocate the listener's private data */
+ void *(*alloc)(struct ast_taskprocessor_listener *listener);
/*! Indicates a task was pushed to the processor */
void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty);
/*! Indicates the task processor has become empty */
void (*emptied)(struct ast_taskprocessor_listener *listener);
+ /*! Destroy the listener's private data */
+ void (*destroy)(void *private_data);
};
struct ast_taskprocessor_listener {
@@ -74,6 +78,9 @@
struct ast_taskprocessor *tps;
void *private_data;
};
+
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
+ struct ast_taskprocessor_listener_callbacks *callbacks);
/*!
* \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
Modified: team/mmichelson/threadpool/main/astobj2.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/astobj2.c?view=diff&rev=376121&r1=376120&r2=376121
==============================================================================
--- team/mmichelson/threadpool/main/astobj2.c (original)
+++ team/mmichelson/threadpool/main/astobj2.c Fri Nov 9 16:28:10 2012
@@ -431,6 +431,7 @@
int ret;
if (obj == NULL) {
+ ast_backtrace();
ast_assert(0);
return -1;
}
Modified: team/mmichelson/threadpool/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/mmichelson/threadpool/main/taskprocessor.c?view=diff&rev=376121&r1=376120&r2=376121
==============================================================================
--- team/mmichelson/threadpool/main/taskprocessor.c (original)
+++ team/mmichelson/threadpool/main/taskprocessor.c Fri Nov 9 16:28:10 2012
@@ -131,10 +131,62 @@
ast_cond_signal(&pvt->cond);
}
+static void listener_destroy(void *obj)
+{
+ struct ast_taskprocessor_listener *listener = obj;
+
+ listener->callbacks->destroy(listener->private_data);
+
+ ao2_ref(listener->tps, -1);
+ listener->tps = NULL;
+}
+
+static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
+{
+ SCOPED_MUTEX(lock, &pvt->lock);
+ while (!pvt->wake_up) {
+ ast_cond_wait(&pvt->cond, lock);
+ }
+ pvt->wake_up = 0;
+ return pvt->dead;
+}
+
+/* this is the task processing worker function */
+static void *tps_processing_function(void *data)
+{
+ struct ast_taskprocessor_listener *listener = data;
+ struct ast_taskprocessor *tps = listener->tps;
+ struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+ int dead = 0;
+
+ while (!dead) {
+ if (!ast_taskprocessor_execute(tps)) {
+ dead = default_tps_idle(pvt);
+ }
+ }
+ return NULL;
+}
+
+static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
+{
+ struct default_taskprocessor_listener_pvt *pvt;
+
+ pvt = ast_calloc(1, sizeof(*pvt));
+ if (!pvt) {
+ return NULL;
+ }
+ ast_cond_init(&pvt->cond, NULL);
+ ast_mutex_init(&pvt->lock);
+ pvt->poll_thread = AST_PTHREADT_NULL;
+ if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
+ return NULL;
+ }
+ return pvt;
+}
+
static void default_listener_destroy(void *obj)
{
- struct ast_taskprocessor_listener *listener = obj;
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+ struct default_taskprocessor_listener_pvt *pvt = obj;
default_tps_wake_up(pvt, 1);
pthread_join(pvt->poll_thread, NULL);
@@ -142,35 +194,6 @@
ast_mutex_destroy(&pvt->lock);
ast_cond_destroy(&pvt->cond);
ast_free(pvt);
-
- ao2_ref(listener->tps, -1);
- listener->tps = NULL;
-}
-
-static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
-{
- SCOPED_MUTEX(lock, &pvt->lock);
- while (!pvt->wake_up) {
- ast_cond_wait(&pvt->cond, lock);
- }
- pvt->wake_up = 0;
- return pvt->dead;
-}
-
-/* this is the task processing worker function */
-static void *tps_processing_function(void *data)
-{
- struct ast_taskprocessor_listener *listener = data;
- struct ast_taskprocessor *tps = listener->tps;
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
- int dead = 0;
-
- while (!dead) {
- if (!ast_taskprocessor_execute(tps)) {
- dead = default_tps_idle(pvt);
- }
- }
- return NULL;
}
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
@@ -188,8 +211,10 @@
}
static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
+ .alloc = default_listener_alloc,
.task_pushed = default_task_pushed,
.emptied = default_emptied,
+ .destroy = default_listener_destroy,
};
/*! \internal \brief Clean up resources on Asterisk shutdown */
@@ -432,29 +457,22 @@
return tps->name;
}
-static struct ast_taskprocessor_listener *default_listener_alloc(void)
-{
- struct ast_taskprocessor_listener *listener;
- struct default_taskprocessor_listener_pvt *pvt;
-
- listener = ao2_alloc(sizeof(*listener), default_listener_destroy);
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
+ struct ast_taskprocessor_listener_callbacks *callbacks)
+{
+ RAII_VAR(struct ast_taskprocessor_listener *, listener,
+ ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
+
if (!listener) {
return NULL;
}
- pvt = ast_calloc(1, sizeof(*pvt));
- if (!pvt) {
- ao2_ref(listener, -1);
- return NULL;
- }
- listener->callbacks = &default_listener_callbacks;
- listener->private_data = pvt;
- ast_cond_init(&pvt->cond, NULL);
- ast_mutex_init(&pvt->lock);
- pvt->poll_thread = AST_PTHREADT_NULL;
- if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
- ao2_ref(listener, -1);
- return NULL;
- }
+ listener->callbacks = callbacks;
+ listener->private_data = listener->callbacks->alloc(listener);
+ if (!listener->private_data) {
+ return NULL;
+ }
+
+ ao2_ref(listener, +1);
return listener;
}
@@ -480,9 +498,18 @@
return NULL;
}
/* Create a new taskprocessor. Start by creating a default listener */
- listener = default_listener_alloc();
+ listener = ast_taskprocessor_listener_alloc(p, &default_listener_callbacks);
+ if (!listener) {
+ return NULL;
+ }
p = ast_taskprocessor_create_with_listener(name, listener);
+ if (!p) {
+ ao2_ref(listener, -1);
+ return NULL;
+ }
+
+ /* Unref listener here since the taskprocessor has gained a reference to the listener */
ao2_ref(listener, -1);
return p;
More information about the asterisk-commits
mailing list