[asterisk-commits] dlee: branch dlee/tp-local r399778 - in /team/dlee/tp-local: include/asterisk...
SVN commits to the Asterisk project
asterisk-commits at lists.digium.com
Wed Sep 25 13:09:30 CDT 2013
Author: dlee
Date: Wed Sep 25 13:09:28 2013
New Revision: 399778
URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399778
Log:
Taskprocessor local support
Modified:
team/dlee/tp-local/include/asterisk/taskprocessor.h
team/dlee/tp-local/main/taskprocessor.c
Modified: team/dlee/tp-local/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/tp-local/include/asterisk/taskprocessor.h?view=diff&rev=399778&r1=399777&r2=399778
==============================================================================
--- team/dlee/tp-local/include/asterisk/taskprocessor.h (original)
+++ team/dlee/tp-local/include/asterisk/taskprocessor.h Wed Sep 25 13:09:28 2013
@@ -175,6 +175,14 @@
*/
struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener);
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
+ void *local_data); /*!< Store local_data on tps; it is handed to tasks queued with ast_taskprocessor_push_local() when they execute */
+
+struct ast_taskprocessor_local { /*!< Argument bundle passed to callbacks queued via ast_taskprocessor_push_local() */
+ void *local_data; /*!< Copied from the taskprocessor's local_data at the time the task is executed */
+ void *data; /*!< The datap pointer that was supplied when the task was pushed */
+};
+
/*!
* \brief Unreference the specified taskprocessor and its reference count will decrement.
*
@@ -197,6 +205,8 @@
*/
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *local), void *datap); /*!< Like ast_taskprocessor_push(), but task_exe receives a struct ast_taskprocessor_local carrying datap plus the taskprocessor's local data */
+
/*!
* \brief Pop a task off the taskprocessor and execute it.
*
Modified: team/dlee/tp-local/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/tp-local/main/taskprocessor.c?view=diff&rev=399778&r1=399777&r2=399778
==============================================================================
--- team/dlee/tp-local/main/taskprocessor.c (original)
+++ team/dlee/tp-local/main/taskprocessor.c Wed Sep 25 13:09:28 2013
@@ -48,11 +48,15 @@
*/
struct tps_task {
/*! \brief The execute() task callback function pointer */
- int (*execute)(void *datap);
+ union { /* exactly one member is valid; wants_local says which */
+ int (*execute)(void *datap); /*!< Called with datap when wants_local is 0 */
+ int (*execute_local)(struct ast_taskprocessor_local *local); /*!< Called with a local-data bundle when wants_local is 1 */
+ } callback;
/*! \brief The data pointer for the task execute() function */
void *datap;
/*! \brief AST_LIST_ENTRY overhead */
AST_LIST_ENTRY(tps_task) list;
+ unsigned int wants_local:1; /*!< Set by tps_task_alloc_local(); makes ast_taskprocessor_execute() use callback.execute_local */
};
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
@@ -69,6 +73,7 @@
const char *name;
/*! \brief Taskprocessor statistics */
struct tps_taskprocessor_stats *stats;
+ void *local_data;
/*! \brief Taskprocessor current queue size */
long tps_queue_size;
/*! \brief Taskprocessor queue */
@@ -282,10 +287,41 @@
static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
{
struct tps_task *t;
- if ((t = ast_calloc(1, sizeof(*t)))) {
- t->execute = task_exe;
- t->datap = datap;
- }
+ if (!task_exe) { /* a task with no callback could never be executed, so refuse to build one */
+ ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ return NULL;
+ }
+
+ t = ast_calloc(1, sizeof(*t));
+ if (!t) { /* allocation failed; the caller receives NULL and must not queue it */
+ ast_log(LOG_ERROR, "failed to allocate task!\n");
+ return NULL;
+ }
+
+ t->callback.execute = task_exe; /* wants_local is not set here, so the plain execute() path is used (presumably zeroed by ast_calloc - confirm) */
+ t->datap = datap;
+
+ return t; /* caller owns the task until it is queued or freed */
+}
+
+static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+{
+ struct tps_task *t;
+ if (!task_exe) { /* a task with no callback could never be executed, so refuse to build one */
+ ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ return NULL;
+ }
+
+ t = ast_calloc(1, sizeof(*t));
+ if (!t) { /* allocation failed; the caller receives NULL and must not queue it */
+ ast_log(LOG_ERROR, "failed to allocate task!\n");
+ return NULL;
+ }
+
+ t->callback.execute_local = task_exe; /* stored in the union's local-aware member */
+ t->datap = datap; /* surfaces later as ast_taskprocessor_local.data */
+ t->wants_local = 1; /* tells ast_taskprocessor_execute() to call callback.execute_local */
+
return t;
}
@@ -643,6 +679,13 @@
return __allocate_taskprocessor(name, listener);
}
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
+ void *local_data)
+{
+ SCOPED_AO2LOCK(lock, tps); /* presumably holds the tps lock until this function returns - confirm against the SCOPED_AO2LOCK definition */
+ tps->local_data = local_data; /* read back by ast_taskprocessor_execute() for tasks that have wants_local set */
+}
+
/* decrement the taskprocessor reference count and unlink from the container if necessary */
void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
{
@@ -664,20 +707,16 @@
}
/* push the task into the taskprocessor queue */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
-{
- struct tps_task *t;
+static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
+{
int previous_size;
int was_empty;
- if (!tps || !task_exe) {
- ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
+ if (!tps) {
+ ast_log(LOG_ERROR, "tps is NULL!\n");
return -1;
}
- if (!(t = tps_task_alloc(task_exe, datap))) {
- ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
- return -1;
- }
+
ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
@@ -688,8 +727,19 @@
return 0;
}
+/*!
+ * \brief Queue a task on a taskprocessor.
+ *
+ * \param tps The taskprocessor to push to
+ * \param task_exe Callback invoked with \a datap when the task runs
+ * \param datap Opaque pointer handed to \a task_exe
+ * \retval 0 the task was queued
+ * \retval -1 \a tps was NULL, \a task_exe was NULL, or the task could not be allocated
+ */
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+{
+	struct tps_task *t;
+
+	/* Check tps before allocating so a rejected push never leaks a task. */
+	if (!tps) {
+		ast_log(LOG_ERROR, "tps is NULL!\n");
+		return -1;
+	}
+
+	t = tps_task_alloc(task_exe, datap);
+	if (!t) {
+		/*
+		 * tps_task_alloc() has already logged the reason. A NULL task must
+		 * not reach taskprocessor_push(), which would queue it unchecked.
+		 */
+		return -1;
+	}
+
+	return taskprocessor_push(tps, t);
+}
+
+/*!
+ * \brief Queue a task whose callback also receives the taskprocessor's local data.
+ *
+ * \param tps The taskprocessor to push to
+ * \param task_exe Callback invoked with a struct ast_taskprocessor_local when the task runs
+ * \param datap Opaque pointer delivered to the callback as the \c data member
+ * \retval 0 the task was queued
+ * \retval -1 \a tps was NULL, \a task_exe was NULL, or the task could not be allocated
+ */
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+{
+	struct tps_task *t;
+
+	/* Check tps before allocating so a rejected push never leaks a task. */
+	if (!tps) {
+		ast_log(LOG_ERROR, "tps is NULL!\n");
+		return -1;
+	}
+
+	t = tps_task_alloc_local(task_exe, datap);
+	if (!t) {
+		/*
+		 * tps_task_alloc_local() has already logged the reason. A NULL task
+		 * must not reach taskprocessor_push(), which would queue it unchecked.
+		 */
+		return -1;
+	}
+
+	return taskprocessor_push(tps, t);
+}
+
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
+ struct ast_taskprocessor_local local;
struct tps_task *t;
int size;
@@ -701,9 +751,18 @@
}
tps->executing = 1;
+
+ if (t->wants_local) {
+ local.local_data = tps->local_data;
+ local.data = t->datap;
+ }
ao2_unlock(tps);
- t->execute(t->datap);
+ if (t->wants_local) {
+ t->callback.execute_local(&local);
+ } else {
+ t->callback.execute(t->datap);
+ }
tps_task_free(t);
ao2_lock(tps);
More information about the asterisk-commits
mailing list