[asterisk-commits] dlee: branch dlee/tp-local r399778 - in /team/dlee/tp-local: include/asterisk...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Sep 25 13:09:30 CDT 2013


Author: dlee
Date: Wed Sep 25 13:09:28 2013
New Revision: 399778

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=399778
Log:
Taskprocessor local support

Modified:
    team/dlee/tp-local/include/asterisk/taskprocessor.h
    team/dlee/tp-local/main/taskprocessor.c

Modified: team/dlee/tp-local/include/asterisk/taskprocessor.h
URL: http://svnview.digium.com/svn/asterisk/team/dlee/tp-local/include/asterisk/taskprocessor.h?view=diff&rev=399778&r1=399777&r2=399778
==============================================================================
--- team/dlee/tp-local/include/asterisk/taskprocessor.h (original)
+++ team/dlee/tp-local/include/asterisk/taskprocessor.h Wed Sep 25 13:09:28 2013
@@ -175,6 +175,14 @@
  */
 struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener);
 
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,	/* taskprocessor whose local data pointer is replaced */
+	void *local_data);	/* stored as-is on the taskprocessor and handed to tasks queued with ast_taskprocessor_push_local() */
+
+struct ast_taskprocessor_local {	/* argument handed to callbacks queued with ast_taskprocessor_push_local() */
+	void *local_data;	/*!< pointer stored on the taskprocessor by ast_taskprocessor_set_local() */
+	void *data;	/*!< per-task pointer given when the task was pushed */
+};
+
 /*!
  * \brief Unreference the specified taskprocessor and its reference count will decrement.
  *
@@ -197,6 +205,8 @@
  */
 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
 
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *local), void *datap);	/* like ast_taskprocessor_push(), but task_exe receives an ast_taskprocessor_local carrying both datap and the taskprocessor's local data */
+
 /*!
  * \brief Pop a task off the taskprocessor and execute it.
  *

Modified: team/dlee/tp-local/main/taskprocessor.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/tp-local/main/taskprocessor.c?view=diff&rev=399778&r1=399777&r2=399778
==============================================================================
--- team/dlee/tp-local/main/taskprocessor.c (original)
+++ team/dlee/tp-local/main/taskprocessor.c Wed Sep 25 13:09:28 2013
@@ -48,11 +48,15 @@
  */
 struct tps_task {
 	/*! \brief The execute() task callback function pointer */
-	int (*execute)(void *datap);
+	union {	/* only one member is meaningful per task; wants_local says which */
+		int (*execute)(void *datap);	/* plain callback, called with datap */
+		int (*execute_local)(struct ast_taskprocessor_local *local);	/* local-aware callback, called with an ast_taskprocessor_local */
+	} callback;
 	/*! \brief The data pointer for the task execute() function */
 	void *datap;
 	/*! \brief AST_LIST_ENTRY overhead */
 	AST_LIST_ENTRY(tps_task) list;
+	unsigned int wants_local:1;	/*!< set when callback.execute_local is the member to invoke */
 };
 
 /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
@@ -69,6 +73,7 @@
 	const char *name;
 	/*! \brief Taskprocessor statistics */
 	struct tps_taskprocessor_stats *stats;
+	void *local_data;
 	/*! \brief Taskprocessor current queue size */
 	long tps_queue_size;
 	/*! \brief Taskprocessor queue */
@@ -282,10 +287,41 @@
 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
 {	/* Build a queue entry for a plain task; logs and returns NULL on failure. */
 	struct tps_task *t;
-	if ((t = ast_calloc(1, sizeof(*t)))) {
-		t->execute = task_exe;
-		t->datap = datap;
-	}
+	if (!task_exe) {	/* refuse to build a task that has nothing to run */
+		ast_log(LOG_ERROR, "task_exe is NULL!\n");
+		return NULL;
+	}
+
+	t = ast_calloc(1, sizeof(*t));
+	if (!t) {	/* allocation failed; NULL is returned to the caller */
+		ast_log(LOG_ERROR, "failed to allocate task!\n");
+		return NULL;
+	}
+
+	t->callback.execute = task_exe;	/* plain variant: wants_local is not set by this function */
+	t->datap = datap;
+
+	return t;
+}
+
+static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+{	/* Build a queue entry for a local-aware task; logs and returns NULL on failure. */
+	struct tps_task *t;
+	if (!task_exe) {	/* refuse to build a task that has nothing to run */
+		ast_log(LOG_ERROR, "task_exe is NULL!\n");
+		return NULL;
+	}
+
+	t = ast_calloc(1, sizeof(*t));
+	if (!t) {	/* allocation failed; NULL is returned to the caller */
+		ast_log(LOG_ERROR, "failed to allocate task!\n");
+		return NULL;
+	}
+
+	t->callback.execute_local = task_exe;
+	t->datap = datap;
+	t->wants_local = 1;	/* makes ast_taskprocessor_execute() call execute_local with an ast_taskprocessor_local */
+
 	return t;
 }
 
@@ -643,6 +679,13 @@
 	return __allocate_taskprocessor(name, listener);
 }
 
+/* Replace the taskprocessor's local data pointer while holding its ao2 lock,
+ * the same lock ast_taskprocessor_execute() holds when it reads the pointer. */
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data)
+{
+	ao2_lock(tps);
+	tps->local_data = local_data;
+	ao2_unlock(tps);
+}
+
 /* decrement the taskprocessor reference count and unlink from the container if necessary */
 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
 {
@@ -664,20 +707,16 @@
 }
 
 /* push the task into the taskprocessor queue */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
-{
-	struct tps_task *t;
+static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
+{
 	int previous_size;
 	int was_empty;
 
-	if (!tps || !task_exe) {
-		ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
+	if (!tps) {
+		ast_log(LOG_ERROR, "tps is NULL!\n");
 		return -1;
 	}
-	if (!(t = tps_task_alloc(task_exe, datap))) {
-		ast_log(LOG_ERROR, "failed to allocate task!  Can't push to '%s'\n", tps->name);
-		return -1;
-	}
+
 	ao2_lock(tps);
 	AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
 	previous_size = tps->tps_queue_size++;
@@ -688,8 +727,19 @@
 	return 0;
 }
 
+/*!
+ * \brief Queue a plain task (callback plus data pointer) on a taskprocessor.
+ *
+ * \param tps Taskprocessor to queue on; must not be NULL.
+ * \param task_exe Callback invoked with \a datap when the task is executed.
+ * \param datap Opaque pointer handed to \a task_exe.
+ * \retval 0 the task was queued
+ * \retval -1 \a tps was NULL or the task could not be created
+ */
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+{
+	struct tps_task *t;
+
+	/* Validate tps before allocating: taskprocessor_push() rejects a NULL
+	 * tps without freeing the task, which would leak it. */
+	if (!tps) {
+		ast_log(LOG_ERROR, "tps is NULL!\n");
+		return -1;
+	}
+
+	/* tps_task_alloc() logs and returns NULL for a NULL callback or a
+	 * failed allocation; a NULL task must never be linked into the queue. */
+	t = tps_task_alloc(task_exe, datap);
+	if (!t) {
+		return -1;
+	}
+
+	return taskprocessor_push(tps, t);
+}
+
+/*!
+ * \brief Queue a local-aware task on a taskprocessor.
+ *
+ * When executed, \a task_exe receives an ast_taskprocessor_local holding
+ * both \a datap and the taskprocessor's local data pointer.
+ *
+ * \param tps Taskprocessor to queue on; must not be NULL.
+ * \param task_exe Callback invoked when the task is executed.
+ * \param datap Opaque per-task pointer, delivered in the \c data member.
+ * \retval 0 the task was queued
+ * \retval -1 \a tps was NULL or the task could not be created
+ */
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+{
+	struct tps_task *t;
+
+	/* Validate tps before allocating: taskprocessor_push() rejects a NULL
+	 * tps without freeing the task, which would leak it. */
+	if (!tps) {
+		ast_log(LOG_ERROR, "tps is NULL!\n");
+		return -1;
+	}
+
+	/* tps_task_alloc_local() logs and returns NULL for a NULL callback or a
+	 * failed allocation; a NULL task must never be linked into the queue. */
+	t = tps_task_alloc_local(task_exe, datap);
+	if (!t) {
+		return -1;
+	}
+
+	return taskprocessor_push(tps, t);
+}
+
 int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
 {
+	struct ast_taskprocessor_local local;
 	struct tps_task *t;
 	int size;
 
@@ -701,9 +751,18 @@
 	}
 
 	tps->executing = 1;
+
+	if (t->wants_local) {
+		local.local_data = tps->local_data;
+		local.data = t->datap;
+	}
 	ao2_unlock(tps);
 
-	t->execute(t->datap);
+	if (t->wants_local) {
+		t->callback.execute_local(&local);
+	} else {
+		t->callback.execute(t->datap);
+	}
 	tps_task_free(t);
 
 	ao2_lock(tps);




More information about the asterisk-commits mailing list