[svn-commits] dhubbard: branch dhubbard/named_processors r110244 - in /team/dhubbard/named_...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Thu Mar 20 00:24:04 CDT 2008


Author: dhubbard
Date: Thu Mar 20 00:24:03 2008
New Revision: 110244

URL: http://svn.digium.com/view/asterisk?view=rev&rev=110244
Log:
doxygenize, remove some old sandbox files, rename some functions

Removed:
    team/dhubbard/named_processors/res/res_testobserver.c
    team/dhubbard/named_processors/res/sandbox/
Modified:
    team/dhubbard/named_processors/apps/app_queue.c
    team/dhubbard/named_processors/include/asterisk/taskprocessor.h
    team/dhubbard/named_processors/main/asterisk.c
    team/dhubbard/named_processors/main/pbx.c
    team/dhubbard/named_processors/main/taskprocessor.c
    team/dhubbard/named_processors/res/Makefile

Modified: team/dhubbard/named_processors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/apps/app_queue.c?view=diff&rev=110244&r1=110243&r2=110244
==============================================================================
--- team/dhubbard/named_processors/apps/app_queue.c (original)
+++ team/dhubbard/named_processors/apps/app_queue.c Thu Mar 20 00:24:03 2008
@@ -6211,7 +6211,7 @@
 	res |= ast_custom_function_register(&queuewaitingcount_function);
 	res |= ast_custom_function_register(&queuememberpenalty_function);
 
-	tpsi = ast_taskprocessor_alloc("app_queue", 0);
+	tpsi = ast_taskprocessor_reference("app_queue", 0);
 	tpsp = ast_taskproducer_alloc(tpsi);
 
 	if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))

Modified: team/dhubbard/named_processors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/include/asterisk/taskprocessor.h?view=diff&rev=110244&r1=110243&r2=110244
==============================================================================
--- team/dhubbard/named_processors/include/asterisk/taskprocessor.h (original)
+++ team/dhubbard/named_processors/include/asterisk/taskprocessor.h Thu Mar 20 00:24:03 2008
@@ -67,18 +67,16 @@
 
 unsigned char _evtq_poll_thread_run;
 
+int ast_taskprocessor_init(void);
 struct a_task* ast_task_alloc(int (*task_exe)(struct a_task *task), void* datap, char* src);
 int ast_task_free(struct a_task* task);
 
 int ast_taskprocessor_push(struct taskprocessor_singleton_info* tp, struct a_task* t);
 struct a_task* ast_taskprocessor_pop(struct taskprocessor_singleton_info* tp);
-int ast_taskprocessor_size(struct taskprocessor_singleton_info* tp);
+int ast_taskprocessor_depth(struct taskprocessor_singleton_info* tp);
 
-struct taskprocessor_singleton_info *ast_taskprocessor_alloc(const char *name, void *(*func)(void*));
-int size_of_taskprocessor_singleton_list(void);
-
-int register_taskprocessor_clis(void);
-int unregister_taskprocessor_clis(void);
+struct taskprocessor_singleton_info *ast_taskprocessor_reference(const char *name, void *(*func)(void*));
+int ast_taskprocessor_count(void);
 
 struct taskproducer* ast_taskproducer_alloc(struct taskprocessor_singleton_info* processor);
 #endif

Modified: team/dhubbard/named_processors/main/asterisk.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/asterisk.c?view=diff&rev=110244&r1=110243&r2=110244
==============================================================================
--- team/dhubbard/named_processors/main/asterisk.c (original)
+++ team/dhubbard/named_processors/main/asterisk.c Thu Mar 20 00:24:03 2008
@@ -122,6 +122,7 @@
 #include "asterisk/devicestate.h"
 #include "asterisk/module.h"
 #include "asterisk/dsp.h"
+#include "asterisk/taskprocessor.h"
 
 #include "asterisk/doxyref.h"		/* Doxygen documentation */
 
@@ -2813,6 +2814,7 @@
 	ast_builtins_init();
 	ast_utils_init();
 	tdd_init();
+	ast_taskprocessor_init();
 
 	if (getenv("HOME")) 
 		snprintf(filename, sizeof(filename), "%s/.asterisk_history", getenv("HOME"));

Modified: team/dhubbard/named_processors/main/pbx.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/pbx.c?view=diff&rev=110244&r1=110243&r2=110244
==============================================================================
--- team/dhubbard/named_processors/main/pbx.c (original)
+++ team/dhubbard/named_processors/main/pbx.c Thu Mar 20 00:24:03 2008
@@ -7837,7 +7837,7 @@
 	/* Register manager application */
 	ast_manager_register2("ShowDialPlan", EVENT_FLAG_CONFIG | EVENT_FLAG_REPORTING, manager_show_dialplan, "List dialplan", mandescr_show_dialplan);
 
-	taskprocessor = ast_taskprocessor_alloc("pbx", 0);
+	taskprocessor = ast_taskprocessor_reference("pbx", 0);
 	taskproducer = ast_taskproducer_alloc(taskprocessor);
 
 	if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL,

Modified: team/dhubbard/named_processors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/taskprocessor.c?view=diff&rev=110244&r1=110243&r2=110244
==============================================================================
--- team/dhubbard/named_processors/main/taskprocessor.c (original)
+++ team/dhubbard/named_processors/main/taskprocessor.c Thu Mar 20 00:24:03 2008
@@ -15,9 +15,14 @@
  * the GNU General Public License Version 2. See the LICENSE file
  * at the top of the source tree.
  */
-/*
- * Maintain a container of taskprocessor threads that are uniquely named
- */
+/*! \file
+ *
+ * \brief Maintain a container of taskprocessor threads that are uniquely named
+ * and can be shared across modules.
+ *
+ * \author Dwayne Hubbard <dhubbard at digium.com>
+ */
+
 #include <asterisk.h>
 #include <asterisk/astobj2.h>
 #include <asterisk/cli.h>
@@ -32,15 +37,14 @@
 static int _global_killflag = 0;
 AST_MUTEX_DEFINE_STATIC(_global_killflag_lock);
 
-static int _global_clireg = 0;
-AST_MUTEX_DEFINE_STATIC(_global_clireg_lock);
-
 pthread_attr_t _attribute[100];
 AST_LIST_HEAD_STATIC(_taskprocessor_singletons, taskprocessor_singleton_info);
 static int _taskprocessor_singletons_list_size = 0;
 
-static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t);
-static void destroy_taskprocessor_singleton(void *tps);
+static void* default_taskprocessor_thread_function(void* data);
+static struct taskprocessor_singleton_info* tps_default_constructor(int poll_freq);
+static int tps_taskprocessor_add(struct taskprocessor_singleton_info* t);
+static void tps_taskprocessor_destroy(void *tps);
 static void destroy_taskproducer(void *tp);
 static int taskprocessor_ping(struct a_task* e);
 
@@ -52,6 +56,22 @@
 	AST_CLI_DEFINE(cli_taskprocessor_show_stats, "List instantiated task processors and statistics"),
 };
 
+/*! \brief Perform necessary taskprocessor subsystem initialization stuff
+ * \param void
+ * \return 0
+ */
+int ast_taskprocessor_init(void)
+{
+	ast_cli_register_multiple(taskprocessor_clis, sizeof(taskprocessor_clis)/sizeof(taskprocessor_clis[0]));
+	return 0;
+}
+
+/*! \brief Allocate a task structure
+ * \param task_exe function for the taskprocessor to execute
+ * \param datap data pointer for the task
+ * \param src where the task came from, this is going to be debug only
+ * \return A task prepared to be queued into a taskprocessor
+ */
 struct a_task* ast_task_alloc(int (*task_exe)(struct a_task *task), void* datap, char* src)
 {
 	struct a_task *t;
@@ -66,15 +86,24 @@
 	return t;
 }
 	
-int ast_task_free(struct a_task* t)
-{
-	if (t->_datap) {
-		ast_free(t->_datap);
+/*! \brief Release the task resources
+ * \param task the task to be freed
+ * \return 0 on success
+ */
+int ast_task_free(struct a_task* task)
+{
+	if (task->_datap) {
+		ast_free(task->_datap);
 	}
 	return 0;
 }
 
-static char *complete_taskprocessor_ping(struct taskprocessor_singleton_info *p, struct ast_cli_args *a) {
+/*! \brief Tab completion function
+ * \param p taskprocessor_singleton_info structure
+ * \param a CLI arguments
+ * \return NULL
+ */
+static char *tps_taskprocessor_tab_complete(struct taskprocessor_singleton_info *p, struct ast_cli_args *a) {
 	int tklen;
 	int wordnum = 0;
 
@@ -93,6 +122,12 @@
 	return NULL;
 }
 
+/*! \brief CLI 'taskprocessor ping' operation queues a ping task to the specified taskprocessor
+ * \param e CLI entries
+ * \param cmd 
+ * \param a CLI arguments
+ * \return CLI_SUCCESS on success, CLI_SHOWUSAGE on error 
+ */
 static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
 	int found = 0;
@@ -107,7 +142,7 @@
 			"	Displays the time required for a processor to deliver a task\n";
 		return NULL;
 	case CLI_GENERATE:
-		return complete_taskprocessor_ping(p, a);
+		return tps_taskprocessor_tab_complete(p, a);
 	}
 
 	if (a->argc != 3)
@@ -134,9 +169,15 @@
 		ast_cli(a->fd, "\n%s failed: could not push task to %s\n", e->command, a->argv[2]);
 		ast_task_free(t);
 	}
-	return RESULT_SUCCESS;	
-}
-
+	return CLI_SUCCESS;	
+}
+
+/*! \brief CLI 'taskprocessor show stats' operation lists all taskprocessors and their statistics
+ * \param e CLI entries
+ * \param cmd
+ * \param a CLI arguments
+ * \return CLI_SUCCESS on success, CLI_SHOWUSAGE on error
+ */
 static char *cli_taskprocessor_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
 	char name[256];
@@ -172,9 +213,14 @@
 	}
 	AST_LIST_UNLOCK(&_taskprocessor_singletons);
 	ast_cli(a->fd, "\n\n");
-	return RESULT_SUCCESS;	
-}
-
+	return CLI_SUCCESS;	
+}
+
+/*! \brief The default taskprocessing thread pops tasks off its queue and executes it
+ * \param data taskprocessor_singleton_info structure for the named thread
+ * \note The default taskprocessor thread function can be overridden via ast_taskprocessor_reference()
+ * \return NULL
+ */
 static void* default_taskprocessor_thread_function(void* data)
 {
 	struct taskprocessor_singleton_info* i;
@@ -191,7 +237,7 @@
 	}
 
 	while ((!killflag) && (i->_poll_thread_run)) {
-		if ((size = ast_taskprocessor_size(i)) > 0) {
+		if ((size = ast_taskprocessor_depth(i)) > 0) {
 			/* stuff is in the queue */
 			t = ast_taskprocessor_pop(i);
 			if (!t) {
@@ -232,7 +278,7 @@
 		killflag = _global_killflag;
 		ast_mutex_unlock(&_global_killflag_lock);
 	}
-	while (ast_taskprocessor_size(i)) {
+	while (ast_taskprocessor_depth(i)) {
 		/* stuff is in the queue */
 		t = ast_taskprocessor_pop(i);
 		if (t) {
@@ -244,33 +290,28 @@
 	return NULL;
 }
 
-static int taskprocessor_ping(struct a_task* e)
-{
-	if (!e) {
-		ast_log(LOG_ERROR, "Huh? No event!!\n");
+/*! \brief CLI 'taskprocessor ping <blah>' operation handler
+ * \param task the ping task queued by the CLI operation
+ * \return 0 on success, -1 on error
+ */
+static int taskprocessor_ping(struct a_task* task)
+{
+	if (!task) {
+		ast_log(LOG_ERROR, "Huh?  There is no task?  This is totally unexpected!\n");
 		return -1;
 	}
-	ast_log(LOG_NOTICE, "[TASKPROCESSOR_CLI_PING] %s\n", e->_source);
+	ast_log(LOG_NOTICE, "[TASKPROCESSOR_CLI_PING] %s\n", task->_source);
 	return 0;
 }
 
-int register_taskprocessor_clis(void)
-{
-	ast_cli_register_multiple(taskprocessor_clis, sizeof(taskprocessor_clis)/sizeof(taskprocessor_clis[0]));
-	return 0;
-}
-
-int unregister_taskprocessor_clis(void)
-{
-	ast_cli_unregister_multiple(taskprocessor_clis, sizeof(taskprocessor_clis)/sizeof(taskprocessor_clis[0]));
-	return 0;
-}
-
-
+/*! \brief The default taskprocessor constructor creates an initialized taskprocessor structure
+ * \param poll_freq The polling frequency of the taskprocessor
+ * \return taskprocessor_singleton_info structure on success, NULL on error
+ */
 static struct taskprocessor_singleton_info* tps_default_constructor(int poll_freq)
 {
 	struct taskprocessor_singleton_info* tps;
-	tps = ao2_alloc(sizeof(*tps), destroy_taskprocessor_singleton);
+	tps = ao2_alloc(sizeof(*tps), tps_taskprocessor_destroy);
 	if (!tps) {
 		ast_log(LOG_ERROR, "cannot allocate memory for a taskprocessor_singleton_info structure.\n");
 		return NULL;
@@ -290,19 +331,17 @@
 	}
 	return tps;
 }
-	
-struct taskprocessor_singleton_info *ast_taskprocessor_alloc(const char *name, void *(*func)(void*))
-{
-	int index;
+
+/*! \brief Return a pointer to the taskprocessor singleton structure and create it if necessary.
+ * \param name the name of the taskprocessor singleton
+ * \param custom_func the function executed by the taskprocessor thread
+ * \return taskprocessor_singleton_info pointer on success, NULL on error
+ */
+struct taskprocessor_singleton_info *ast_taskprocessor_reference(const char *name, void *(*custom_func)(void*))
+{
+	int rc, index;
 	struct taskprocessor_singleton_info *p;
 		
-	ast_mutex_lock(&_global_clireg_lock);
-	if (!_global_clireg) {
-		register_taskprocessor_clis();
-		_global_clireg = 1;	
-	}
-	ast_mutex_unlock(&_global_clireg_lock);
-	
 	if (!name) {
 		ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
 		return NULL;
@@ -323,12 +362,12 @@
 	}
 	snprintf(p->_name, sizeof(p->_name), "%s", name);
 	p->_poll_thread_run = 1;
-	if (add_taskprocessor_singleton(p) < 0) {
+	if (tps_taskprocessor_add(p) < 0) {
 		ast_log(LOG_ERROR, "can't add taskprocessor_singleton \'%s\' with ID: 0x%X\n", p->_name, (unsigned int)p->_id);
 		ao2_ref(p, -1);
 		return NULL;
 	}
-	index = size_of_taskprocessor_singleton_list();
+	index = ast_taskprocessor_count();
 	ast_log(LOG_DEBUG, "creating taskprocessor \'%s\' at index %d\n", name, index);
 	pthread_attr_init(&_attribute[index]);
 	/* stay stopped if we are supposed to be stopped */
@@ -348,8 +387,13 @@
 		/* wake it up */
 		pthread_kill(p->_poll_thread, SIGURG);
 	} else {
-		/* create it */
-		if (ast_pthread_create(&p->_poll_thread, &_attribute[index], func?func:default_taskprocessor_thread_function, p) < 0) {
+		/* create the thread.  This may seem silly, but it results in nicer 'core show threads' output */
+		if (custom_func) {
+			rc = ast_pthread_create(&p->_poll_thread, &_attribute[index], custom_func, p);
+		} else {
+			rc = ast_pthread_create(&p->_poll_thread, &_attribute[index], default_taskprocessor_thread_function, p);
+		}
+		if (rc < 0) {
 			ast_mutex_unlock(&p->_taskprocessor_lock);
 			ast_log(LOG_ERROR, "failed to create thread \'%s\'.\n", p->_name);
 			ao2_ref(p, -1);
@@ -360,8 +404,12 @@
 	ast_mutex_unlock(&p->_taskprocessor_lock);
 	return p;
 }
-
-static void destroy_taskprocessor_singleton(void *tps)
+	
+/*! \brief The taskprocessor destructor is called by astobj2 when the reference count reaches zero
+ * \param tps taskprocessor_singleton_info structure
+ * \return void
+ */
+static void tps_taskprocessor_destroy(void *tps)
 {
 	struct taskprocessor_singleton_info *n;
 	struct taskprocessor_singleton_info *t = (struct taskprocessor_singleton_info *)tps;
@@ -376,7 +424,6 @@
 		if (n == t) {
 			AST_LIST_REMOVE(&_taskprocessor_singletons, n, list);
 			_taskprocessor_singletons_list_size -= 1;
-			//AST_LIST_UNLOCK(&_taskprocessor_singletons);
 			ast_log(LOG_DEBUG, "taskprocessor_singleton \'%s\' removed.\n", t->_name);
 			break;
 		}
@@ -395,7 +442,11 @@
 	return;
 }
 
-static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t)
+/*! \brief Add a new taskprocessor_singleton_info structure to the taskprocessor container
+ * \param t taskprocessor_singleton_info structure to add to the taskprocessor container
+ * \return 0 on success, -1 on error
+ */
+static int tps_taskprocessor_add(struct taskprocessor_singleton_info* t)
 {
 	struct taskprocessor_singleton_info* n = NULL;
 
@@ -413,7 +464,11 @@
 	return 0;
 }
 
-int size_of_taskprocessor_singleton_list(void)
+/*! \brief Return the number of taskprocessors in the taskprocessor container
+ * \param void
+ * \return The number of taskprocessors in the taskprocessor container
+ */
+int ast_taskprocessor_count(void)
 {
 	int size;
 	AST_LIST_LOCK(&_taskprocessor_singletons);
@@ -422,12 +477,17 @@
 	return size;
 }
 
+/*! \brief Push a task into the taskprocessor queue
+ * \param tps taskprocessor to push task
+ * \param t task to push to taskprocessor
+ * \return 0 on success, -1 on error
+ */
 int ast_taskprocessor_push(struct taskprocessor_singleton_info* tps, struct a_task* t)
 {
 	int lock_failures = 0;
 
 	if (!tps || !t) {
-		ast_log(LOG_ERROR, "a taskprocessor (0x%ld) and a task (0x%ld) are required and missing.\n", (long)tps, (long)t);
+		ast_log(LOG_ERROR, "A %s is required for this function.\n", (tps)?"task":"taskprocessor");
 		return -1;
 	}
 	AST_LIST_LOCK(&tps->_queue);
@@ -443,12 +503,15 @@
 	}
 	AST_LIST_INSERT_TAIL(&tps->_queue, t, list);
 	tps->_queue_size+=1;
-	ast_cond_signal(&tps->_poll_cond);
 	ast_mutex_unlock(&tps->_taskprocessor_lock);
 	AST_LIST_UNLOCK(&tps->_queue);
 	return 0;
 }
 
+/*! \brief Pop the front task off the taskprocessor queue and return it to the caller
+ * \param tps taskprocessor to pop task from
+ * \return task on success, NULL on error
+ */
 struct a_task* ast_taskprocessor_pop(struct taskprocessor_singleton_info* tps)
 {
 	struct a_task* t=NULL;
@@ -474,7 +537,11 @@
 	return t;
 }
 
-int ast_taskprocessor_size(struct taskprocessor_singleton_info* tps)
+/*! \brief Return the number of tasks waiting in the taskprocessor queue
+ * \param tps taskprocessor to return queue depth
+ * \return depth on success, -1 on error
+ */
+int ast_taskprocessor_depth(struct taskprocessor_singleton_info* tps)
 {
 	int size = -1;
 	int lock_failures = 0;
@@ -496,8 +563,11 @@
 	return size;
 }
 
-/* provide a default implementation of a queue() command */
-static int default_queue_task(struct taskproducer* producer, struct a_task* task)
+/*! \brief Provide a default implementation of a queue() helper function for a taskprocessor
+ * \param producer wrapper around a taskprocessor and a queue() function
+ * \return 0 on success, -1 on error
+ */
+static int tps_default_queue_task(struct taskproducer* producer, struct a_task* task)
 {
 	if ((!producer) || (!task)) {
 		ast_log(LOG_ERROR, "a taskproducer: 0x%ld and a task: 0x%ld are required for this operation.\n", (unsigned long)producer, (unsigned long)task);
@@ -511,10 +581,14 @@
 	ast_mutex_lock(&producer->_taskprocessor->_taskprocessor_lock);
 	ast_cond_signal(&producer->_taskprocessor->_poll_cond);
 	ast_mutex_unlock(&producer->_taskprocessor->_taskprocessor_lock);
+	ast_log(LOG_DEBUG, "task queued to taskprocessor\n");
 	return 0;
 }
 
-/* create and initialize a task producer */
+/*! \brief Allocate and initialize a taskproducer structure with a default queue_task helper function
+ * \param processor taskprocessor to queue tasks
+ * \return taskproducer on success, NULL on error
+ */
 struct taskproducer* ast_taskproducer_alloc(struct taskprocessor_singleton_info* processor)
 {
 	struct taskproducer* p;
@@ -525,10 +599,15 @@
 	}
 	p->_taskprocessor = processor;
 	ao2_ref(processor, 1);
-	p->queue_task = default_queue_task;
+	p->queue_task = tps_default_queue_task;
+	ast_log(LOG_DEBUG, "taskproducer created\n");
 	return p;
 }
 
+/*! \brief Free taskproducer resources
+ * \param tp taskproducer
+ * \return void
+ */
 static void destroy_taskproducer(void *tp)
 {
 	struct taskproducer *p;

Modified: team/dhubbard/named_processors/res/Makefile
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/Makefile?view=diff&rev=110244&r1=110243&r2=110244
==============================================================================
--- team/dhubbard/named_processors/res/Makefile (original)
+++ team/dhubbard/named_processors/res/Makefile Thu Mar 20 00:24:03 2008
@@ -48,14 +48,6 @@
 
 ael/pval.o: ael/pval.c
 
-sandbox/taskconsumer.o: sandbox/taskconsumer.c
-sandbox/taskconsumer.o: ASTCFLAGS+=-I. -Isandbox
-
-sandbox/simobject.o: sandbox/simobject.c
-sandbox/simobject.o: ASTCFLAGS+=-I. -Isandbox
-
-$(if $(filter res_testobserver,$(EMBEDDED_MODS)),modules.link,res_testobserver.so): sandbox/taskconsumer.o sandbox/simobject.o
-
 clean::
 	rm -f snmp/*.o
 	rm -f ael/*.o




More information about the svn-commits mailing list