[svn-commits] dhubbard: branch dhubbard/named_processors r107992 - in /team/dhubbard/named_...

SVN commits to the Digium repositories svn-commits at lists.digium.com
Wed Mar 12 01:40:40 CDT 2008


Author: dhubbard
Date: Wed Mar 12 01:40:40 2008
New Revision: 107992

URL: http://svn.digium.com/view/asterisk?view=rev&rev=107992
Log:
This is the initial proof-of-concept integration of named taskprocessors into app_queue.
I did this with Mark Michelson, and a lot of changes were made.  A lot more changes are coming.

Modified:
    team/dhubbard/named_processors/apps/app_queue.c
    team/dhubbard/named_processors/res/res_testobserver.c
    team/dhubbard/named_processors/res/sandbox/include/task.h
    team/dhubbard/named_processors/res/sandbox/include/taskproducer.h
    team/dhubbard/named_processors/res/sandbox/simobject.c
    team/dhubbard/named_processors/res/sandbox/task.c
    team/dhubbard/named_processors/res/sandbox/taskconsumer.c
    team/dhubbard/named_processors/res/sandbox/taskprocessor.c
    team/dhubbard/named_processors/res/sandbox/taskproducer.c

Modified: team/dhubbard/named_processors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/apps/app_queue.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/apps/app_queue.c (original)
+++ team/dhubbard/named_processors/apps/app_queue.c Wed Mar 12 01:40:40 2008
@@ -92,6 +92,9 @@
 #include "asterisk/astobj2.h"
 #include "asterisk/strings.h"
 #include "asterisk/global_datastores.h"
+#include "asterisk/task.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/taskproducer.h"
 
 /*!
  * \par Please read before modifying this file.
@@ -130,6 +133,9 @@
 	{ QUEUE_STRATEGY_LINEAR, "linear" },
 	{ QUEUE_STRATEGY_WRANDOM, "wrandom"},
 };
+
+struct taskproducer *tpsp;
+struct taskprocessor_singleton_info *tpsi;
 
 #define DEFAULT_RETRY		5
 #define DEFAULT_TIMEOUT		15
@@ -674,7 +680,7 @@
  * Lock interface list find sc, iterate through each queues queue_member list for member to
  * update state inside queues
 */
-static void *handle_statechange(struct statechange *sc)
+static int handle_statechange(struct a_task* task)
 {
 	struct call_queue *q;
 	struct member *cur;
@@ -683,13 +689,16 @@
 	struct ao2_iterator queue_iter;
 	char *loc;
 	char *technology;
+	struct statechange *sc = task->_datap;
+
+	ast_log(LOG_NOTICE, "device %s received state change %s\n", sc->dev, devstate2str(sc->state));
 
 	technology = ast_strdupa(sc->dev);
 	loc = strchr(technology, '/');
 	if (loc) {
 		*loc++ = '\0';
 	} else {
-		return NULL;
+		return 0;
 	}
 
 	AST_LIST_LOCK(&interfaces);
@@ -708,7 +717,7 @@
 
 	if (!curint) {
 		ast_debug(3, "Device '%s/%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", technology, loc, sc->state, devstate2str(sc->state));
-		return NULL;
+		return 0;
 	}
 
 	ast_debug(1, "Device '%s/%s' changed to state '%d' (%s)\n", technology, loc, sc->state, devstate2str(sc->state));
@@ -755,7 +764,7 @@
 		ao2_unlock(q);
 	}
 
-	return NULL;
+	return 0;
 }
 
 /*! \brief Data used by the device state thread */
@@ -774,64 +783,13 @@
 	.thread = AST_PTHREADT_NULL,
 };
 
-/*! \brief Consumer of the statechange queue */
-static void *device_state_thread(void *data)
-{
-	struct statechange *sc = NULL;
-
-	while (!device_state.stop) {
-		ast_mutex_lock(&device_state.lock);
-		if (!(sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry))) {
-			ast_cond_wait(&device_state.cond, &device_state.lock);
-			sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry);
-		}
-		ast_mutex_unlock(&device_state.lock);
-
-		/* Check to see if we were woken up to see the request to stop */
-		if (device_state.stop)
-			break;
-
-		if (!sc)
-			continue;
-
-		handle_statechange(sc);
-
-		ast_free(sc);
-		sc = NULL;
-	}
-
-	if (sc)
-		ast_free(sc);
-
-	while ((sc = AST_LIST_REMOVE_HEAD(&device_state.state_change_q, entry)))
-		ast_free(sc);
-
-	return NULL;
-}
-
-/*! \brief Producer of the statechange queue */
-static int statechange_queue(const char *dev, enum ast_device_state state)
-{
-	struct statechange *sc;
-
-	if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(dev) + 1)))
-		return 0;
-
-	sc->state = state;
-	strcpy(sc->dev, dev);
-
-	ast_mutex_lock(&device_state.lock);
-	AST_LIST_INSERT_TAIL(&device_state.state_change_q, sc, entry);
-	ast_cond_signal(&device_state.cond);
-	ast_mutex_unlock(&device_state.lock);
-
-	return 0;
-}
-
 static void device_state_cb(const struct ast_event *event, void *unused)
 {
 	enum ast_device_state state;
 	const char *device;
+	struct statechange *sc;
+	struct a_task* t;
+	size_t datapsize;
 
 	state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
 	device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
@@ -840,8 +798,20 @@
 		ast_log(LOG_ERROR, "Received invalid event that had no device IE\n");
 		return;
 	}
-
-	statechange_queue(device, state);
+	datapsize = sizeof(*sc) + strlen(device) + 1;
+	if (!(sc = ast_calloc(1, datapsize))) {
+		ast_log(LOG_ERROR, "failed to calloc a state change struct\n");
+		return;
+	}
+	sc->state = state;
+	strcpy(sc->dev, device);
+	t = get_available_task(handle_statechange, sc, datapsize, "app_queue-device_state_cb");
+	if (tpsp->queue_task(tpsp, t) < 0) {
+		ast_log(LOG_WARNING, "queue_task failed!!\n");
+		release_task(t);
+	}
+	ast_log(LOG_NOTICE, "queued statechange for %s\n", device);
+	ast_free(sc); /* maybe alloca ?? */
 }
 
 /*! \brief allocate space for new queue member and set fields based on parameters passed */
@@ -6276,7 +6246,6 @@
 
 	ast_mutex_init(&device_state.lock);
 	ast_cond_init(&device_state.cond, NULL);
-	ast_pthread_create(&device_state.thread, NULL, device_state_thread, NULL);
 
 	ast_cli_register_multiple(cli_queue, sizeof(cli_queue) / sizeof(struct ast_cli_entry));
 	res = ast_register_application(app, queue_exec, synopsis, descrip);
@@ -6300,6 +6269,13 @@
 	res |= ast_custom_function_register(&queuememberlist_function);
 	res |= ast_custom_function_register(&queuewaitingcount_function);
 	res |= ast_custom_function_register(&queuememberpenalty_function);
+
+	if (!exists_taskprocessor_singleton("app_queue")) {
+		create_taskprocessor_singleton("app_queue", 0);
+	}
+	tpsi = get_taskprocessor_singleton("app_queue");
+	tpsp = construct_taskproducer(tpsi);
+
 	if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
 		res = -1;
 

Modified: team/dhubbard/named_processors/res/res_testobserver.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/res_testobserver.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/res_testobserver.c (original)
+++ team/dhubbard/named_processors/res/res_testobserver.c Wed Mar 12 01:40:40 2008
@@ -39,15 +39,16 @@
 #include <unistd.h>
 #include <string.h>
 
-#include "asterisk/file.h"
-#include "asterisk/logger.h"
-#include "asterisk/channel.h"
-#include "asterisk/pbx.h"
-#include "asterisk/module.h"
-#include "asterisk/lock.h"
-#include "asterisk/app.h"
-#include "asterisk/options.h"
-#include "asterisk/config.h"
+#include <asterisk/file.h>
+#include <asterisk/logger.h>
+#include <asterisk/channel.h>
+#include <asterisk/pbx.h>
+#include <asterisk/module.h>
+#include <asterisk/lock.h>
+#include <asterisk/app.h>
+#include <asterisk/options.h>
+#include <asterisk/config.h>
+#include <asterisk/astobj2.h>
 
 #include "sandbox/include/sandbox.h"
 #include "sandbox/include/configuration.h"

Modified: team/dhubbard/named_processors/res/sandbox/include/task.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/include/task.h?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/include/task.h (original)
+++ team/dhubbard/named_processors/res/sandbox/include/task.h Wed Mar 12 01:40:40 2008
@@ -27,14 +27,13 @@
 #define __task_h__
 
 struct a_task {
-	unsigned long _id;
-	char _source[256];
+	int (* execute)(struct a_task* t);
+	void *_datap;
+	size_t _datapsize;
 	void* _p_producer;
 	void* _p_consumer;
-	
-	int (* execute)(struct a_task* t);
-	
 	time_t _timestamp;
+	char _source[256];
 	AST_LIST_ENTRY(a_task) list;
 };
 
@@ -42,8 +41,7 @@
  * get an available a_task structure from the pool of available
  * structures.  If no structures are available, allocate one
  */
-struct a_task* get_available_task(unsigned long id, char* src);
-struct a_task* get_available_task_with_handler(unsigned long id, char* src, int (*f)(struct a_task* t));
+struct a_task* get_available_task(int (*task_exe)(struct a_task *task), void* datap, size_t size, char* src);
 int task_pool_size(void);
 /*
  * release a_task structure back to the pool
@@ -53,7 +51,6 @@
 int stop_taskpool(void);
 int destroy_task_pool(void);
 
-int invalid_task_execute(struct a_task* t);
 int noop_task_execute(struct a_task* t);
 
 #endif

Modified: team/dhubbard/named_processors/res/sandbox/include/taskproducer.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/include/taskproducer.h?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/include/taskproducer.h (original)
+++ team/dhubbard/named_processors/res/sandbox/include/taskproducer.h Wed Mar 12 01:40:40 2008
@@ -23,13 +23,12 @@
 #define __taskproducer_h__
 
 struct taskproducer {
-	void* _owner;
 	struct taskprocessor_singleton_info* _taskprocessor;
 	unsigned long _tasks_produced;
 
 	int (* queue_task)(struct taskproducer* producer, struct a_task* task);
 }; 
 
-struct taskproducer* construct_taskproducer(void* owner, struct taskprocessor_singleton_info* processor);
+struct taskproducer* construct_taskproducer(struct taskprocessor_singleton_info* processor);
 #endif
 

Modified: team/dhubbard/named_processors/res/sandbox/simobject.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/simobject.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/simobject.c (original)
+++ team/dhubbard/named_processors/res/sandbox/simobject.c Wed Mar 12 01:40:40 2008
@@ -42,7 +42,7 @@
 	}
 	memset(s, 0, sizeof(struct simobject));
 	snprintf(s->_name, sizeof(s->_name), "%s", name);
-	s->_producer = construct_taskproducer(s, t);
+	s->_producer = construct_taskproducer(t);
 	s->_consumer = construct_taskconsumer(s, t);
 	s->start = start_simobject;
 	s->stop = stop_simobject;
@@ -70,7 +70,7 @@
 	memset(s, 0, sizeof(struct simobject));
 	snprintf(s->_name, sizeof(s->_name), "%s", name);
 	snprintf(tpg, sizeof(tpg), "%s-generator", s->_name);
-	s->_producer = construct_taskproducer(s, t);
+	s->_producer = construct_taskproducer(t);
 	s->_consumer = construct_taskconsumer(s, t);
 	if (!exists_taskprocessor_singleton(tpg)) {
 		if (create_taskprocessor_singleton(tpg, _simproducer_thread_function) < 0) {
@@ -124,20 +124,19 @@
 	struct a_task* t = NULL;
 	
 	if (s && s->_producer) {
-		t = get_available_task(SIM_PRODUCER_EVENT, "start_simobject");
+		t = get_available_task(simobject_taskhandler, 0, 0, "start_simobject");
 		if (!t) {
 			ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
 			return -1;
 		}
 		t->_p_producer = s->_producer;
 		t->_p_consumer = s->_consumer;
-		t->execute = simobject_taskhandler;
 		if (s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t) < 0)) {
 			ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task!  Releasing task back to task pool\n", s->_name);
 			release_task(t);
 			return -1;
 		}
-		ast_log(LOG_DEBUG, "simobject \'%s\' queued task: 0x%ld to processor \'%s\'\n", s->_name, (unsigned long)t->_id, s->_producer->_taskprocessor->_name);
+		ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_producer->_taskprocessor->_name);
 	}
 	return 0;
 }
@@ -147,19 +146,18 @@
 	struct a_task* t = NULL;
 	
 	if (s && s->_producer) {
-		t = get_available_task(SIM_PRODUCER_SLEEP, "start_simproducer");
+		t = get_available_task(simobject_taskhandler, 0, 0, "start_simproducer");
 		if (!t) {
 			ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
 			return -1;
 		}
 		t->_p_producer = s->_producer;
 		t->_p_consumer = s->_consumer;
-		t->execute = simobject_taskhandler;
 		if (s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t) < 0)) {
 			ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task!\n", s->_name);
 			return -1;
 		}
-		ast_log(LOG_DEBUG, "simobject \'%s\' queued task: 0x%ld to processor \'%s\'\n", s->_name, (unsigned long)t->_id, s->_producer->_taskprocessor->_name);
+		ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_producer->_taskprocessor->_name);
 	}
 	return 0;
 }
@@ -176,26 +174,11 @@
 
 static int simobject_taskhandler(struct a_task* e)
 {
-	int rc = 0;
 	if (!e) {
 		ast_log(LOG_ERROR, "Huh? No event!!\n");
 		return -1;
 	}
-	switch (e->_id) {
-	case SIM_PRODUCER_EVENT:
-		ast_log(LOG_DEBUG, "[SIM_PRODUCER_EVENT] source: \'%s\', producer: 0x%ld, consumer: 0x%ld\n"
-			, e->_source, (unsigned long)e->_p_producer, (unsigned long)e->_p_consumer);
-		break;
-	case SIM_PRODUCER_SLEEP:
-		ast_log(LOG_DEBUG, "[SIM_PRODUCER_SLEEP] source: \'%s\', producer: 0x%ld, consumer: 0x%ld\n"
-			, e->_source, (unsigned long)e->_p_producer, (unsigned long)e->_p_consumer);
-		break;
-	default:
-		ast_log(LOG_WARNING, "[UNHANDLED TASK: 0x%ld] source: \'%s\', producer: 0x%ld, consumer: 0x%ld\n"
-			, (unsigned long)e->_id, e->_source, (unsigned long)e->_p_producer, (unsigned long)e->_p_consumer);
-		rc = -1;
-	}
-	return rc;
+	return 0;
 }
 
 void* _simproducer_thread_function(void* data)
@@ -203,9 +186,6 @@
 	struct taskprocessor_singleton_info* i = NULL;
 	struct simobject* s = NULL;
 	struct a_task* t = NULL;
-	//unsigned long max_qsize = 0;
-	//unsigned long task_count = 0;
-	//long qsize = 0;
 	int killflag = 0;
 	int freq = 1;
 	struct timespec ts = { 0, };
@@ -235,7 +215,6 @@
 	freq = i->_poll_freq;
 	s = (struct simobject*)i->_private;
 	ast_mutex_unlock(&i->_taskprocessor_lock);
-	//ast_log(LOG_NOTICE, "producer: 0x%ld, consumer: 0x%ld, taskgenerator: 0x%ld\n", (unsigned long)s->_producer, (unsigned long)s->_consumer, (unsigned long)s->_taskgenerator);
 
 	while (!killflag) {	
 
@@ -245,25 +224,23 @@
 		 *
 		 * start by sleeping for the poll frequency, then produce a task 
 		 */
-		//usleep((rand()%10)+freq);
-		usleep(1000);
+		usleep((rand()%10)+freq);
 		if (!s) {
 			ast_log(LOG_ERROR, "Huh? we have no private simobject pointer?  This is bad!\n");
 			break;
 		}
-		t = get_available_task(SIM_PRODUCER_SLEEP, "_simproducer_thread_function");
+		t = get_available_task(simobject_taskhandler, 0, 0, "_simproducer_thread_function");
 		if (!t) {
 			ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
 			break;
 		}
 		t->_p_producer = s->_producer;
 		t->_p_consumer = s->_consumer;
-		t->execute = simobject_taskhandler;
 		if (s->_producer && s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t) == 0)) {
-			ast_log(LOG_DEBUG, "simobject \'%s\' queued task: 0x%ld to processor \'%s\'\n", s->_name, (unsigned long)t->_id, s->_consumer->_taskprocessor->_name);
+			ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_consumer->_taskprocessor->_name);
 		} else {
-			ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task 0x%ld (producer: 0x%ld, generator: 0x%ld)!  Releasing task to task pool\n"
-				, s->_name, (unsigned long)t->_id, (unsigned long)s->_producer, (unsigned long)s->_taskgenerator);
+			ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task (producer: 0x%ld, generator: 0x%ld)!  Releasing task to task pool\n"
+				, s->_name, (unsigned long)s->_producer, (unsigned long)s->_taskgenerator);
 			release_task(t);
 			break;
 		}

Modified: team/dhubbard/named_processors/res/sandbox/task.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/task.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/task.c (original)
+++ team/dhubbard/named_processors/res/sandbox/task.c Wed Mar 12 01:40:40 2008
@@ -22,17 +22,9 @@
 AST_MUTEX_DEFINE_STATIC(_global_kill_taskpool_lock);
 static int _global_kill_taskpool = 0;
 
-static struct a_task* __get_available_task(unsigned long id, char* src, int (*f)(struct a_task* t));
-
-int invalid_task_execute(struct a_task* t)
-{
-	ast_log(LOG_ERROR, "Unable to execute task \'%d\' for \'%s\' - No execute() handler available\n", (t)?(int)t->_id:-1, (t)?t->_source:"<task-structure-missing>");
-	return -1;
-}
-
 int noop_task_execute(struct a_task* t)
 {
-	ast_log(LOG_DEBUG, "noop (task \'%d\') for \'%s\'\n", (t)?(int)t->_id:-1, (t)?t->_source:"<task-structure-missing>");
+	ast_log(LOG_DEBUG, "noop for \'%s\'\n", (t)?t->_source:"<task-structure-missing>");
 	return 0;
 }
 
@@ -40,28 +32,22 @@
  * get an available a_task structure from the pool of available
  * structures.  If no structures are available, allocate one
  */
-struct a_task* get_available_task(unsigned long id, char* src)
+struct a_task* get_available_task(int (*task_exe)(struct a_task *task), void* datap, size_t size, char* src)
 {
-	return __get_available_task(id, src, 0);
-}
+	struct a_task *t;
+	void *tdata;
+	int lock_failures = 0;
 
-
-struct a_task* get_available_task_with_handler(unsigned long id, char* src, int (*f)(struct a_task* t))
-{
-	return __get_available_task(id, src, f);
-}
-
-static struct a_task* __get_available_task(unsigned long id, char* src, int (*f)(struct a_task* t)) 
-{
-	struct a_task* t = NULL;
-	int lock_failures = 0;
-	
+	tdata = ast_calloc(1, size);
+	if (tdata) {
+		memcpy(tdata, datap, size);
+	}
 	AST_LIST_LOCK(&_task_pool);
 	while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
 		lock_failures++;
 		AST_LIST_UNLOCK(&_task_pool);
 		usleep(1);
-		if (lock_failures > 10000) {
+		if (lock_failures > 10) {
 			ast_log(LOG_ERROR, "\n\tcannot lock task pool.\n");
 			return NULL;
 		}
@@ -75,17 +61,17 @@
 		return NULL;
 	}
 	if (AST_LIST_EMPTY(&_task_pool)) {
-		t = (void*)malloc(sizeof(struct a_task));
-		if (t) {
-			memset(t, 0, sizeof(struct a_task));
-			if (f == 0) {
-				t->execute = &invalid_task_execute;
-			} else {
-				t->execute = f;
-			}
-			t->_id = id;
-			strncpy(t->_source, src, sizeof(t->_source));
+		t = ast_calloc(1,sizeof(*t));
+		if (!t) {
+			ast_log(LOG_ERROR, "Poop.  ast_calloc failed to get a task for \'%s\'\n", src);
+			ast_mutex_unlock(&_global_kill_taskpool_lock);
+			AST_LIST_UNLOCK(&_task_pool);
+			return NULL;
 		}
+		t->execute = task_exe;
+		t->_datap = tdata;
+		t->_datapsize = size;
+		strncpy(t->_source, src, sizeof(t->_source));
 		ast_mutex_unlock(&_global_kill_taskpool_lock);
 		AST_LIST_UNLOCK(&_task_pool);
 		return t;
@@ -95,12 +81,9 @@
 	ast_mutex_unlock(&_global_kill_taskpool_lock);
 	AST_LIST_UNLOCK(&_task_pool);
 	if (t) {
-		if (f == 0) {
-			t->execute = &invalid_task_execute;
-		} else {
-			t->execute = f;
-		}
-		t->_id = id;
+		t->execute = task_exe;
+		t->_datap = tdata;
+		t->_datapsize = size;
 		strncpy(t->_source, src, sizeof(t->_source));
 	}
 	return t;
@@ -132,6 +115,9 @@
 int release_task(struct a_task* t)
 {
 	int lock_failures = 0;
+	if (t->_datap) {
+		free(t->_datap);
+	}
 	memset(t, 0, sizeof(struct a_task));
 	AST_LIST_LOCK(&_task_pool);
 	while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {

Modified: team/dhubbard/named_processors/res/sandbox/taskconsumer.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/taskconsumer.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/taskconsumer.c (original)
+++ team/dhubbard/named_processors/res/sandbox/taskconsumer.c Wed Mar 12 01:40:40 2008
@@ -28,9 +28,8 @@
 /* the default handler function is a failure */
 static int invalid_handle_task(struct taskconsumer* consumer, struct a_task* task)
 {
-	ast_log(LOG_ERROR, "handle_task() not overidden.  consumer: 0x%ld, task: 0x%ld, tasksource: \'%s\'\n"
+	ast_log(LOG_ERROR, "handle_task() not overidden.  consumer: 0x%ld, tasksource: \'%s\'\n"
 		, (unsigned long)consumer
-		, (task)?(unsigned long)task->_id:0
 		, (task)?task->_source:"unknown");
 	return -1;
 }

Modified: team/dhubbard/named_processors/res/sandbox/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/taskprocessor.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/taskprocessor.c (original)
+++ team/dhubbard/named_processors/res/sandbox/taskprocessor.c Wed Mar 12 01:40:40 2008
@@ -26,13 +26,17 @@
 #include "include/taskdefinitions.h"
 
 #define DEFAULT_POLL_FREQUENCY 1 
+
+AST_MUTEX_DEFINE_STATIC(_global_killflag_lock);
+static int _global_killflag = 0;
+
 pthread_attr_t _attribute[100];
 AST_LIST_HEAD_STATIC(_taskprocessor_singletons, taskprocessor_singleton_info);
 static int _taskprocessor_singletons_list_size = 0;
 
 static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t);
 static int remove_taskprocessor_singleton(const char* name);
-static int taskprocessor_taskhandler(struct a_task* e);
+static int taskprocessor_ping(struct a_task* e);
 
 /*********
  *  CLI
@@ -81,12 +85,11 @@
 		return CLI_SUCCESS;
 	}
 
-	t = get_available_task(TASKPROCESSOR_CLI_PING, "cli_taskprocessor_ping");
+	t = get_available_task(taskprocessor_ping, 0, 0, "cli_taskprocessor_ping");
 	if (!t) {
 		ast_cli(a->fd, "\n%s failed: could not retrieve task from pool\n", e->command);
 		return RESULT_SUCCESS;	
 	}
-	t->execute = taskprocessor_taskhandler;
 	if (push_task_to_taskprocessor_singleton(p, t) < 0) {
 		ast_cli(a->fd, "\n%s failed: could not push task to %s\n", e->command, a->argv[2]);
 		release_task(t);
@@ -134,24 +137,89 @@
 	return RESULT_SUCCESS;	
 }
 
-static int taskprocessor_taskhandler(struct a_task* e)
-{
-	int rc = 0;
-
+static void* default_taskprocessor_thread_function(void* data)
+{
+	struct taskprocessor_singleton_info* i = NULL;
+	struct a_task* t = NULL;
+	unsigned long size = 0;
+	unsigned long task_count = 0;
+	long qsize = 0;
+	int killflag = 0;
+	struct timespec ts = { 0, };
+	struct timeval tv;
+
+	i = (struct taskprocessor_singleton_info*)data;
+	if (!i) {
+		ast_log(LOG_ERROR, "cannot start thread_function loop without a taskprocessor_singleton_info structure.\n");
+		return NULL;
+	}
+
+	while ((!killflag) && (i->_poll_thread_run)) {
+		if ((size = size_of_taskprocessor_singleton_queue(i)) > 0) {
+			/* stuff is in the queue */
+			t = pop_task_from_taskprocessor_singleton(i);
+			if (!t) {
+				ast_log(LOG_ERROR, "Huh?? size of queue is not zero(%ld), but the queue popped a NULL!\n", qsize);
+				continue;
+			}
+			if (!t->execute) {
+				ast_log(LOG_ERROR, "task is missing its execute callback.\n");
+				release_task(t);
+				continue;
+			}
+			if (t->execute(t) < 0) {
+				ast_log(LOG_ERROR, "execute() returned failure.\n");
+			}
+			ast_mutex_lock(&i->_taskprocessor_lock);
+			if (i->_stats) {
+				i->_stats->_tasks_processed_count++;
+				if (size > i->_stats->_max_qsize) {
+					i->_stats->_max_qsize = size;
+				}
+				task_count = i->_stats->_tasks_processed_count;
+			}
+			ast_mutex_unlock(&i->_taskprocessor_lock);
+			release_task(t);
+			t = NULL;
+		
+			ast_mutex_lock(&_global_killflag_lock);
+			killflag = _global_killflag;
+			ast_mutex_unlock(&_global_killflag_lock);
+			if (--size) continue;
+		}
+		if (!killflag) {
+			tv = ast_tvadd(ast_tvnow(), ast_samp2tv(i->_poll_freq, 1));
+			ts.tv_sec = tv.tv_sec;
+			ts.tv_nsec = tv.tv_usec * 1000;
+			ast_log(LOG_DEBUG, "\'%s\' is waiting for a signal (or timeout)\n", i->_name);
+			ast_mutex_lock(&i->_taskprocessor_lock);
+			ast_cond_timedwait(&i->_poll_cond, &i->_taskprocessor_lock, &ts);
+			ast_mutex_unlock(&i->_taskprocessor_lock);
+		}	
+		ast_mutex_lock(&_global_killflag_lock);
+		killflag = _global_killflag;
+		ast_mutex_unlock(&_global_killflag_lock);
+	}
+	while (size_of_taskprocessor_singleton_queue(i)) {
+		/* stuff is in the queue */
+		t = pop_task_from_taskprocessor_singleton(i);
+		if (t) {
+			release_task(t);
+			t = NULL;
+		}
+	}
+	i->_is_purged = TRUE;
+	return NULL;
+}
+
+static int taskprocessor_ping(struct a_task* e)
+{
 	if (!e) {
 		ast_log(LOG_ERROR, "Huh? No event!!\n");
 		return -1;
 	}
-	switch (e->_id) {
-	case TASKPROCESSOR_CLI_PING:
-		ast_log(LOG_NOTICE, "[TASKPROCESSOR_CLI_PING] %s\n", e->_source);
-		break;
-	default:
-		ast_log(LOG_WARNING, "[UNHANDLED TASK: 0x%ld] source: \'%s\', producer: 0x%ld, consumer: 0x%ld\n"
-			, (unsigned long)e->_id, e->_source, (unsigned long)e->_p_producer, (unsigned long)e->_p_consumer);
-		rc = -1;
-	}
-	return rc;
+	ast_log(LOG_NOTICE, "[TASKPROCESSOR_CLI_PING] %s\n", e->_source);
+	return 0;
 }
 
 int register_sandbox_taskprocessor_clis(void)
@@ -229,7 +297,7 @@
 		pthread_kill(p->_poll_thread, SIGURG);
 	} else {
 		/* create it */
-		if (ast_pthread_create(&p->_poll_thread, &_attribute[index], func, p) < 0) {
+		if (ast_pthread_create(&p->_poll_thread, &_attribute[index], func?func:default_taskprocessor_thread_function, p) < 0) {
 			ast_mutex_unlock(&p->_taskprocessor_lock);
 			ast_log(LOG_ERROR, "failed to create thread \'%s\'.\n", p->_name);
 			return -1;

Modified: team/dhubbard/named_processors/res/sandbox/taskproducer.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/taskproducer.c?view=diff&rev=107992&r1=107991&r2=107992
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/taskproducer.c (original)
+++ team/dhubbard/named_processors/res/sandbox/taskproducer.c Wed Mar 12 01:40:40 2008
@@ -28,29 +28,22 @@
 static int default_queue_task(struct taskproducer* producer, struct a_task* task)
 {
 	if ((!producer) || (!task)) {
-		ast_log(LOG_ERROR, "a taskproducer: 0x%ld and a task: 0x%ld are required for this operation.\n"
-			, (unsigned long)producer
-			, (unsigned long)task
-			);
+		ast_log(LOG_ERROR, "a taskproducer: 0x%ld and a task: 0x%ld are required for this operation.\n", (unsigned long)producer, (unsigned long)task);
 		return -1;
 	}
 	if (push_task_to_taskprocessor_singleton(producer->_taskprocessor, task) < 0) {
-		ast_log(LOG_ERROR, "we failed to push task: 0x%ld to taskprocessor \'%s\'.\n"
-			, (unsigned long)task->_id
-			, (producer->_taskprocessor)?producer->_taskprocessor->_name:"<null>"
-			);
+		ast_log(LOG_ERROR, "we failed to push task to taskprocessor \'%s\'.\n", (producer->_taskprocessor)?producer->_taskprocessor->_name:"<null>");
 		return -1;
 	}
 	producer->_tasks_produced++;
 	ast_mutex_lock(&producer->_taskprocessor->_taskprocessor_lock);
 	ast_cond_signal(&producer->_taskprocessor->_poll_cond);
 	ast_mutex_unlock(&producer->_taskprocessor->_taskprocessor_lock);
-	ast_log(LOG_DEBUG, "task: 0x%ld pushed to \'%s\' and signal sent\n", task->_id, producer->_taskprocessor->_name);
 	return 0;
 }
 
 /* create and initialize a task producer */
-struct taskproducer* construct_taskproducer(void* owner, struct taskprocessor_singleton_info* processor)
+struct taskproducer* construct_taskproducer(struct taskprocessor_singleton_info* processor)
 {
 	struct taskproducer* p = NULL;
 	p = malloc(sizeof(struct taskproducer));
@@ -59,11 +52,8 @@
 		return NULL;
 	}
 	memset(p, 0, sizeof(struct taskproducer));
-	p->_owner = owner;
 	p->_taskprocessor = processor;
 	p->queue_task = default_queue_task;
-	ast_log(LOG_DEBUG, "created default_taskproducer for owner: 0x%ld using taskprocessor \'%s\'\n"
-		, (unsigned long)owner, (processor)?processor->_name:"<null>");
 	return p;
 }
 




More information about the svn-commits mailing list