[asterisk-commits] dhubbard: branch dhubbard/named_processors r109011 - in /team/dhubbard/named_...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Sun Mar 16 22:51:09 CDT 2008


Author: dhubbard
Date: Sun Mar 16 22:51:08 2008
New Revision: 109011

URL: http://svn.digium.com/view/asterisk?view=rev&rev=109011
Log:
A lot of changes with interfaces, astobj2, code cleanup, and general stuff...  res_testobserver, which is an unimportant sample client, crashes when unloaded, maybe due to simobject stuff, which doesn't really matter either.  I also want to send an eeep out to file in the land of Canadians.

Modified:
    team/dhubbard/named_processors/apps/app_queue.c
    team/dhubbard/named_processors/include/asterisk/taskprocessor.h
    team/dhubbard/named_processors/main/taskprocessor.c
    team/dhubbard/named_processors/res/res_testobserver.c
    team/dhubbard/named_processors/res/sandbox/include/simobject.h
    team/dhubbard/named_processors/res/sandbox/include/taskconsumer.h
    team/dhubbard/named_processors/res/sandbox/simobject.c
    team/dhubbard/named_processors/res/sandbox/taskconsumer.c

Modified: team/dhubbard/named_processors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/apps/app_queue.c?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/apps/app_queue.c (original)
+++ team/dhubbard/named_processors/apps/app_queue.c Sun Mar 16 22:51:08 2008
@@ -6196,6 +6196,7 @@
 	clear_and_free_interfaces();
 
 	ao2_ref(queues, -1);
+	ao2_ref(tpsp, -1);
 	ao2_ref(tpsi, -1);
 	return res;
 }
@@ -6242,11 +6243,8 @@
 	res |= ast_custom_function_register(&queuewaitingcount_function);
 	res |= ast_custom_function_register(&queuememberpenalty_function);
 
-	if (!exists_taskprocessor_singleton("app_queue")) {
-		create_taskprocessor_singleton("app_queue", 0);
-	}
-	tpsi = get_taskprocessor_singleton("app_queue");
-	tpsp = construct_taskproducer(tpsi);
+	tpsi = ast_taskprocessor_alloc("app_queue", 0);
+	tpsp = ast_taskproducer_alloc(tpsi);
 
 	if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
 		res = -1;

Modified: team/dhubbard/named_processors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/include/asterisk/taskprocessor.h?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/include/asterisk/taskprocessor.h (original)
+++ team/dhubbard/named_processors/include/asterisk/taskprocessor.h Sun Mar 16 22:51:08 2008
@@ -78,16 +78,14 @@
 int stop_taskpool(void);
 int destroy_task_pool(void);
 
-int noop_task_execute(struct a_task* t);
+int noop_task_execute(struct a_task *t);
 
-int create_taskprocessor_singleton(const char* name, void* (*func)(void*));
+struct taskprocessor_singleton_info *ast_taskprocessor_alloc(const char *name, void *(*func)(void*));
 int size_of_taskprocessor_singleton_list(void);
-struct taskprocessor_singleton_info* get_taskprocessor_singleton(const char* name);
-int exists_taskprocessor_singleton(char *name);
 
 int register_taskprocessor_clis(void);
 int unregister_taskprocessor_clis(void);
 
-struct taskproducer* construct_taskproducer(struct taskprocessor_singleton_info* processor);
+struct taskproducer* ast_taskproducer_alloc(struct taskprocessor_singleton_info* processor);
 #endif
 

Modified: team/dhubbard/named_processors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/taskprocessor.c?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/main/taskprocessor.c (original)
+++ team/dhubbard/named_processors/main/taskprocessor.c Sun Mar 16 22:51:08 2008
@@ -47,7 +47,7 @@
 
 static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t);
 static void destroy_taskprocessor_singleton(void *tps);
-static int remove_taskprocessor_singleton(const char* name);
+static void destroy_taskproducer(void *tp);
 static int taskprocessor_ping(struct a_task* e);
 
 static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -373,7 +373,7 @@
 }
 
 
-static struct taskprocessor_singleton_info* construct_default_taskprocessor(int poll_freq)
+static struct taskprocessor_singleton_info* tps_default_constructor(int poll_freq)
 {
 	struct taskprocessor_singleton_info* tps;
 	tps = ao2_alloc(sizeof(*tps), destroy_taskprocessor_singleton);
@@ -391,16 +391,16 @@
 	tps->_stats = ast_calloc(1, sizeof(*tps->_stats));
 	if (!tps->_stats) {
 		ast_log(LOG_ERROR, "cannot allocate memory for a taskprocessor_singleton_stats structure.\n");
-		ast_free(tps);
+		ao2_ref(tps, -1);
 		return NULL;
 	}
 	return tps;
 }
 	
-int create_taskprocessor_singleton(const char* name, void* (*func)(void*))
+struct taskprocessor_singleton_info *ast_taskprocessor_alloc(const char *name, void *(*func)(void*))
 {
 	int index;
-	struct taskprocessor_singleton_info* p = NULL;
+	struct taskprocessor_singleton_info *p;
 		
 	ast_mutex_lock(&_global_clireg_lock);
 	if (!_global_clireg) {
@@ -408,31 +408,47 @@
 		_global_clireg = 1;	
 	}
 	ast_mutex_unlock(&_global_clireg_lock);
-
-	p = construct_default_taskprocessor(1);
-	if (!p) {
+	
+	if (!name) {
+		ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
+		return NULL;
+	}
+	AST_LIST_LOCK(&_taskprocessor_singletons);
+	AST_LIST_TRAVERSE(&_taskprocessor_singletons, p, list) {
+		if (!strcasecmp(p->_name, name)) {
+			AST_LIST_UNLOCK(&_taskprocessor_singletons);
+			ao2_ref(p, 1);
+			ast_log(LOG_DEBUG, "taskprocessor_singleton \'%s\' already exists!.\n", p->_name);
+			return p;
+		}
+	}
+	AST_LIST_UNLOCK(&_taskprocessor_singletons);
+	if ((p = tps_default_constructor(DEFAULT_POLL_FREQUENCY)) == NULL) {
 		ast_log(LOG_ERROR, "we can't create a taskprocessor_singleton because the default constructor failed.\n");
-		return -1;
+		return NULL;
 	}
 	snprintf(p->_name, sizeof(p->_name), "%s", name);
 	p->_poll_thread_run = 1;
 	if (add_taskprocessor_singleton(p) < 0) {
 		ast_log(LOG_ERROR, "can't add taskprocessor_singleton \'%s\' with ID: 0x%X\n", p->_name, (unsigned int)p->_id);
-		return -1;
+		ao2_ref(p, -1);
+		return NULL;
 	}
 	index = size_of_taskprocessor_singleton_list();
-	ast_log(LOG_DEBUG, "found taskprocessor %s at index %d\n", name, index);
+	ast_log(LOG_DEBUG, "creating taskprocessor \'%s\' at index %d\n", name, index);
 	pthread_attr_init(&_attribute[index]);
 	/* stay stopped if we are supposed to be stopped */
 	if (p->_poll_thread == AST_PTHREADT_STOP) {
 		ast_log(LOG_DEBUG, "poll thread == AST_PTHREADT_STOP.\n");
-		return 0;
+		ao2_ref(p, -1);
+		return NULL;
 	}
 	ast_mutex_lock(&p->_taskprocessor_lock);
 	if (p->_poll_thread == pthread_self()) {
 		ast_mutex_unlock(&p->_taskprocessor_lock);
 		ast_log(LOG_DEBUG, "cannot kill myself.\n");
-		return -1;
+		ao2_ref(p, -1);
+		return NULL;
 	}
 	if (p->_poll_thread != AST_PTHREADT_NULL) {
 		/* wake it up */
@@ -442,52 +458,47 @@
 		if (ast_pthread_create(&p->_poll_thread, &_attribute[index], func?func:default_taskprocessor_thread_function, p) < 0) {
 			ast_mutex_unlock(&p->_taskprocessor_lock);
 			ast_log(LOG_ERROR, "failed to create thread \'%s\'.\n", p->_name);
-			return -1;
+			ao2_ref(p, -1);
+			return NULL;
 		}
 	}
 	pthread_attr_destroy(&_attribute[index]);
 	ast_mutex_unlock(&p->_taskprocessor_lock);
-	return 0;
+	return p;
 }
 
 static void destroy_taskprocessor_singleton(void *tps)
 {
+	struct taskprocessor_singleton_info *n;
 	struct taskprocessor_singleton_info *t = (struct taskprocessor_singleton_info *)tps;
+
+	ast_log(LOG_DEBUG, "destroying taskprocessor \'%s\'\n", t->_name);
 	if (!t) {
 		ast_log(LOG_ERROR, "can't destruct a NULL taskprocessor_singleton.\n");
 		return;
 	}
+	AST_LIST_LOCK(&_taskprocessor_singletons);
+	AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
+		if (n == t) {
+			AST_LIST_REMOVE(&_taskprocessor_singletons, n, list);
+			_taskprocessor_singletons_list_size -= 1;
+			//AST_LIST_UNLOCK(&_taskprocessor_singletons);
+			ast_log(LOG_DEBUG, "taskprocessor_singleton \'%s\' removed.\n", t->_name);
+			break;
+		}
+	}
+	AST_LIST_UNLOCK(&_taskprocessor_singletons);
 	t->_poll_thread_run = 0;
 	ast_mutex_lock(&t->_taskprocessor_lock);
 	ast_cond_signal(&t->_poll_cond);
 	ast_mutex_unlock(&t->_taskprocessor_lock);
 	pthread_join(t->_poll_thread, NULL);
 	t->_poll_thread = AST_PTHREADT_NULL;
-
-	if (remove_taskprocessor_singleton(t->_name) < 0) {
-		ast_log(LOG_WARNING, "cannot remove taskprocessor_singleton \'%s\'.\n", t->_name);
-	}
 	if (t->_stats) {
 		ast_free(t->_stats);
 		t->_stats = NULL;
 	}
 	return;
-}
-
-int exists_taskprocessor_singleton(char *name)
-{
-	int found=0;
-	struct taskprocessor_singleton_info* n = NULL;
-
-	AST_LIST_LOCK(&_taskprocessor_singletons);
-	AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
-		if (!strcasecmp(n->_name, name)) {
-			found=1;
-			break;
-		}
-	}
-	AST_LIST_UNLOCK(&_taskprocessor_singletons);
-	return found;
 }
 
 static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t)
@@ -508,46 +519,6 @@
 	return 0;
 }
 
-struct taskprocessor_singleton_info* get_taskprocessor_singleton(const char* name)
-{
-	struct taskprocessor_singleton_info* n;
-
-	if (!name) {
-		ast_log(LOG_WARNING, "requesting a nameless taskprocessor!!!\n");
-		return NULL;
-	}
-	AST_LIST_LOCK(&_taskprocessor_singletons);
-	AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
-		if (!strcasecmp(n->_name, name)) {
-			AST_LIST_UNLOCK(&_taskprocessor_singletons);
-			ast_log(LOG_DEBUG, "taskprocessor_singleton \'%s\' located.\n", n->_name);
-			return n;
-		}
-	}
-	AST_LIST_UNLOCK(&_taskprocessor_singletons);
-	ast_log(LOG_WARNING, "could not find the taskprocessor named \'%s\'\n", name);
-	return NULL; 
-}
-
-static int remove_taskprocessor_singleton(const char* name)
-{
-	struct taskprocessor_singleton_info* n = NULL;
-
-	AST_LIST_LOCK(&_taskprocessor_singletons);
-	AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
-		if (!strcasecmp(n->_name, name)) {
-			AST_LIST_REMOVE(&_taskprocessor_singletons, n, list);
-			_taskprocessor_singletons_list_size -= 1;
-			AST_LIST_UNLOCK(&_taskprocessor_singletons);
-			ast_log(LOG_NOTICE, "taskprocessor_singleton \'%s\' removed.\n", name);
-			return 0;
-		}
-	}
-	AST_LIST_UNLOCK(&_taskprocessor_singletons);
-	ast_log(LOG_WARNING, "did not find a taskprocessor_singleton \'%s\'\n", name);
-	return -1; 
-}
-
 int size_of_taskprocessor_singleton_list(void)
 {
 	int size;
@@ -557,30 +528,30 @@
 	return size;
 }
 
-int ast_taskprocessor_push(struct taskprocessor_singleton_info* tp, struct a_task* t)
+int ast_taskprocessor_push(struct taskprocessor_singleton_info* tps, struct a_task* t)
 {
 	int lock_failures = 0;
 
-	if (!tp || !t) {
-		ast_log(LOG_ERROR, "a taskprocessor (0x%ld) and a task (0x%ld) are required and missing.\n", (long)tp, (long)t);
+	if (!tps || !t) {
+		ast_log(LOG_ERROR, "a taskprocessor (0x%ld) and a task (0x%ld) are required and missing.\n", (long)tps, (long)t);
 		return -1;
 	}
-	AST_LIST_LOCK(&tp->_queue);
-	while (ast_mutex_trylock(&tp->_taskprocessor_lock)) {
+	AST_LIST_LOCK(&tps->_queue);
+	while (ast_mutex_trylock(&tps->_taskprocessor_lock)) {
 		lock_failures++;
-		AST_LIST_UNLOCK(&tp->_queue);
+		AST_LIST_UNLOCK(&tps->_queue);
 		usleep(1);
 		if (lock_failures > 10) {
-			ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
+			ast_log(LOG_ERROR, "cannot lock taskprocessor \'%s\'.\n", tps->_name);
 			return -1;
 		}
-		AST_LIST_LOCK(&tp->_queue);
-	}
-	AST_LIST_INSERT_TAIL(&tp->_queue, t, list);
-	tp->_queue_size+=1;
-	ast_cond_signal(&tp->_poll_cond);
-	ast_mutex_unlock(&tp->_taskprocessor_lock);
-	AST_LIST_UNLOCK(&tp->_queue);
+		AST_LIST_LOCK(&tps->_queue);
+	}
+	AST_LIST_INSERT_TAIL(&tps->_queue, t, list);
+	tps->_queue_size+=1;
+	ast_cond_signal(&tps->_poll_cond);
+	ast_mutex_unlock(&tps->_taskprocessor_lock);
+	AST_LIST_UNLOCK(&tps->_queue);
 	return 0;
 }
 
@@ -650,15 +621,25 @@
 }
 
 /* create and initialize a task producer */
-struct taskproducer* construct_taskproducer(struct taskprocessor_singleton_info* processor)
+struct taskproducer* ast_taskproducer_alloc(struct taskprocessor_singleton_info* processor)
 {
 	struct taskproducer* p;
-	p = ast_calloc(1, sizeof(*p));
+	p = ao2_alloc(sizeof(*p), destroy_taskproducer);
 	if (!p) {
 		ast_log(LOG_ERROR, "cannot allocate memory for a taskproducer structure.\n");
 		return NULL;
 	}
 	p->_taskprocessor = processor;
+	ao2_ref(processor, 1);
 	p->queue_task = default_queue_task;
 	return p;
 }
+
+static void destroy_taskproducer(void *tp)
+{
+	struct taskproducer *p;
+	p = (struct taskproducer *)tp;
+	ast_log(LOG_DEBUG, "destroying taskproducer\n");
+	ao2_ref(p->_taskprocessor, -1);
+}
+

Modified: team/dhubbard/named_processors/res/res_testobserver.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/res_testobserver.c?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/res/res_testobserver.c (original)
+++ team/dhubbard/named_processors/res/res_testobserver.c Sun Mar 16 22:51:08 2008
@@ -64,6 +64,7 @@
 #define MAX_TEST_PROCESSORS 4
 #define MAX_TEST_PRODUCERS  20
 struct simobject* simprod[MAX_TEST_PRODUCERS];
+struct taskprocessor_singleton_info *simprocessors[MAX_TEST_PROCESSORS];
 
 AST_MUTEX_DEFINE_STATIC(_global_killflag_lock);
 static int _global_killflag = 0;
@@ -191,7 +192,6 @@
 			tv = ast_tvadd(ast_tvnow(), ast_samp2tv(i->_poll_freq, 1));
 			ts.tv_sec = tv.tv_sec;
 			ts.tv_nsec = tv.tv_usec * 1000;
-			ast_log(LOG_DEBUG, "\'%s\' is waiting for a signal (or timeout), loops: %ld\n", i->_name, loop_count);
 			ast_mutex_lock(&i->_taskprocessor_lock);
 			ast_cond_timedwait(&i->_poll_cond, &i->_taskprocessor_lock, &ts);
 			ast_mutex_unlock(&i->_taskprocessor_lock);
@@ -216,21 +216,23 @@
 static int unload_module(void)
 {
 	int res, count;
-	char tbuf[128];
-	struct simobject* s = NULL;
+	struct simobject *s;
 	
 	ast_log(LOG_NOTICE, "-- testobserver -- unloading.\n");
+	ast_log(LOG_NOTICE, "STOPPING %d PRODUCERS\n", MAX_TEST_PRODUCERS);
 	for (count=0; count < MAX_TEST_PRODUCERS; count++) {
-		s = (struct simobject *)simprod[count];
-		if ( s ) { 
-			s->stop(s);
-		}
-	}
+		s = simprod[count];
+		if (s && s->_producer) {
+			ao2_ref(s->_producer, -1);
+		}
+		if (s && s->_consumer) {
+			ao2_ref(s->_consumer, -1);
+		}
+		ao2_ref(s, -1);
+	}
+	ast_log(LOG_NOTICE, "STOPPING %d PROCESSORS\n", MAX_TEST_PROCESSORS);
 	for (count=0; count < MAX_TEST_PROCESSORS; count++) {
-		snprintf(tbuf, sizeof(tbuf), "sillyprocessor-%d", count);
-		if (exists_taskprocessor_singleton(tbuf)) {
-			ao2_ref(get_taskprocessor_singleton(tbuf), -1);
-		}	
+		ao2_ref(simprocessors[count], -1);
 	}
 	ast_log(LOG_NOTICE, "-- testobserver -- unload complete.\n");
 	usleep(100);
@@ -243,33 +245,30 @@
 {
 	int count;
 	char tbuf[128], tproc[128];
-	struct simobject* s = NULL;
-
+	struct simobject* s;
+	struct taskprocessor_singleton_info *t;
+
+	ast_log(LOG_NOTICE, "CREATING %d PROCESSORS\n", MAX_TEST_PROCESSORS);
 	for (count=0; count < MAX_TEST_PROCESSORS; count++) {
 		snprintf(tbuf, sizeof(tbuf), "sillyprocessor-%d", count);
 		ast_log(LOG_DEBUG, "%s\n", tbuf);
-		if (!exists_taskprocessor_singleton(tbuf)) {
-			if (create_taskprocessor_singleton(tbuf, _evtq_poll_thread_function) < 0) {
-				ast_log(LOG_ERROR, "can't create \'%s\' taskprocessor singleton\n", tbuf);
-				return AST_MODULE_LOAD_DECLINE;
-			}
-		}
-	}
+		t = ast_taskprocessor_alloc(tbuf, 0);
+		simprocessors[count] = t;
+	}
+	usleep(100);
+	ast_log(LOG_NOTICE, "CREATING %d PRODUCERS\n", MAX_TEST_PRODUCERS);
 	for (count=0; count < MAX_TEST_PRODUCERS; count++) {
 		snprintf(tbuf, sizeof(tbuf), "producer-%d", count);
 		snprintf(tproc, sizeof(tproc), "sillyprocessor-%d", count%MAX_TEST_PROCESSORS);
-		ast_log(LOG_DEBUG, "%s will use %s\n", tbuf, tproc);
-		simprod[count] = construct_simproducer(tbuf, tproc);
-	}
-
-	ast_log(LOG_NOTICE, "started %d processors, task pool size: %d\n"
-		, size_of_taskprocessor_singleton_list(), ast_task_poolsize());
+		simprod[count] = (struct simobject *)ast_simproducer_alloc(tbuf, tproc);
+	}
+	ast_log(LOG_NOTICE, "started %d processors, task pool size: %d\n", size_of_taskprocessor_singleton_list(), ast_task_poolsize());
 	
 	if (ast_register_application(app, app_exec, synopsis, descrip))
 		return AST_MODULE_LOAD_DECLINE;
 	
 	for (count=0; count < MAX_TEST_PRODUCERS; count++) {
-		s = simprod[count];
+		s = (struct simobject *)simprod[count];
 		if (s && s->start)
 			s->start(s);
 	}

Modified: team/dhubbard/named_processors/res/sandbox/include/simobject.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/include/simobject.h?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/include/simobject.h (original)
+++ team/dhubbard/named_processors/res/sandbox/include/simobject.h Sun Mar 16 22:51:08 2008
@@ -31,11 +31,10 @@
 	struct taskprocessor_singleton_info *_taskgenerator;
 
 	int (*start)(struct simobject *sim);
-	int (*stop)(struct simobject *sim);
 }; 
 
-struct simobject* construct_simobject(const char *name, const char *named_taskprocessor); 
-struct simobject* construct_simproducer(const char *name, const char *named_taskprocessor); 
-struct simobject* construct_simconsumer(const char *name, const char *named_taskprocessor); 
+struct simobject* ast_simobject_alloc(const char *name, const char *named_taskprocessor); 
+struct simobject* ast_simproducer_alloc(const char *name, const char *named_taskprocessor); 
+struct simobject* ast_simconsumer_alloc(const char *name, const char *named_taskprocessor); 
 
 #endif

Modified: team/dhubbard/named_processors/res/sandbox/include/taskconsumer.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/include/taskconsumer.h?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/include/taskconsumer.h (original)
+++ team/dhubbard/named_processors/res/sandbox/include/taskconsumer.h Sun Mar 16 22:51:08 2008
@@ -22,12 +22,10 @@
 #define __taskconsumer_h__
 
 struct taskconsumer {
-	void* _owner;
 	struct taskprocessor_singleton_info* _taskprocessor;
-	
 	int (* handle_task)(struct taskconsumer* consumer, struct a_task* task);
 }; 
 
-struct taskconsumer* construct_taskconsumer(void* owner, struct taskprocessor_singleton_info* processor);
+struct taskconsumer* ast_taskconsumer_alloc(struct taskprocessor_singleton_info* processor);
 #endif
 

Modified: team/dhubbard/named_processors/res/sandbox/simobject.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/simobject.c?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/simobject.c (original)
+++ team/dhubbard/named_processors/res/sandbox/simobject.c Sun Mar 16 22:51:08 2008
@@ -16,6 +16,7 @@
  * at the top of the source tree.
  */
 #include <asterisk.h>
+#include <asterisk/astobj2.h>
 #include <asterisk/taskprocessor.h>
 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
@@ -23,99 +24,105 @@
 
 static int start_simobject(struct simobject* s);
 static int start_simproducer(struct simobject* s);
-static int stop_simobject(struct simobject* s);
 static int simobject_taskhandler(struct a_task* t);
 static void* _simproducer_thread_function(void* data);
-
-struct simobject* construct_simobject(const char* name, const char* named_taskprocessor)
-{
-	struct simobject* s = NULL;
-	struct taskprocessor_singleton_info* t = NULL;
-
-	t = get_taskprocessor_singleton(named_taskprocessor);
+static void destroy_simobject(void *sim);
+
+struct simobject* ast_simobject_alloc(const char* name, const char* named_taskprocessor)
+{
+	struct simobject *s;
+	struct taskprocessor_singleton_info *t;
+
+	t = ast_taskprocessor_alloc(named_taskprocessor, 0);
 	if (!t) {
-		ast_log(LOG_ERROR, "could not locate taskprocessor named \'%s\' for simobject \'%s\' construction.\n"
-			, named_taskprocessor, name);
-		return NULL;
-	}
-	s = ast_calloc(1, sizeof(*s));
+		ast_log(LOG_ERROR, "could not locate taskprocessor named \'%s\' for simobject \'%s\' construction.\n", named_taskprocessor, name);
+		return NULL;
+	}
+	s = ao2_alloc(sizeof(*s), destroy_simobject);
 	if (!s) {
-		ast_log(LOG_ERROR, "cannot allocate memory for a simobject structure.\n");
+		ast_log(LOG_ERROR, "Failed to allocate simobject \'%s\'\n", name);
 		return NULL;
 	}
 	snprintf(s->_name, sizeof(s->_name), "%s", name);
-	s->_producer = construct_taskproducer(t);
-	s->_consumer = construct_taskconsumer(s, t);
+	s->_producer = ast_taskproducer_alloc(t);
+	s->_consumer = ast_taskconsumer_alloc(t);
 	s->start = start_simobject;
-	s->stop = stop_simobject;
-	ast_log(LOG_DEBUG, "created simobject \'%s\' at 0x%ld using taskprocessor \'%s\'.\n", name, (unsigned long)s, named_taskprocessor);
 	return s;
 }
 
-struct simobject* construct_simproducer(const char* name, const char* named_taskprocessor)
-{
-	char tpg[256]; /*generator name, there is probably a definition for this size */
-	struct simobject* s = NULL;
-	struct taskprocessor_singleton_info* t = NULL;
-
-	t = get_taskprocessor_singleton(named_taskprocessor);
+struct simobject* ast_simproducer_alloc(const char* name, const char* named_taskprocessor)
+{
+	char tpg[256];
+	struct simobject *s;
+	struct taskprocessor_singleton_info *t;
+
+	t = ast_taskprocessor_alloc(named_taskprocessor, 0);
 	if (!t) {
-		ast_log(LOG_ERROR, "could not locate taskprocessor named \'%s\' for sim-producer \'%s\' construction.\n"
-			, named_taskprocessor, name);
-		return NULL;
-	}
-	s = ast_calloc(1, sizeof(*s));
+		ast_log(LOG_ERROR, "could not allocate taskprocessor \'%s\' for sim-producer \'%s\' construction.\n", named_taskprocessor, name);
+		return NULL;
+	}
+	s = ao2_alloc(sizeof(*s), destroy_simobject);
 	if (!s) {
-		ast_log(LOG_ERROR, "cannot allocate memory for a simobject structure.\n");
+		ast_log(LOG_ERROR, "Failed to allocate simproducer \'%s\'\n", name);
 		return NULL;
 	}
 	snprintf(s->_name, sizeof(s->_name), "%s", name);
 	snprintf(tpg, sizeof(tpg), "%s-generator", s->_name);
-	s->_producer = construct_taskproducer(t);
-	s->_consumer = construct_taskconsumer(s, t);
-	if (!exists_taskprocessor_singleton(tpg)) {
-		if (create_taskprocessor_singleton(tpg, _simproducer_thread_function) < 0) {
-			ast_log(LOG_ERROR, "can't create \'%s\' taskprocessor singleton\n", tpg);
-			ast_free(s);
-			return NULL;
-		}
-	}
-	s->_taskgenerator = get_taskprocessor_singleton(tpg);
+	s->_producer = ast_taskproducer_alloc(t);
+	s->_consumer = ast_taskconsumer_alloc(t);
+	s->_taskgenerator = ast_taskprocessor_alloc(tpg, _simproducer_thread_function);
 	if (!s->_taskgenerator) {
 		ast_log(LOG_ERROR, "Huh? we don't have a task generator called \'%s\' ??\n", tpg);
-		ast_free(s);
-		return NULL;
-	}
-	ast_log(LOG_DEBUG, "simobject at 0x%ld (taskgenerator: 0x%ld) setting private to 0x%ld\n", (unsigned long)&s, (unsigned long)s->_taskgenerator, (unsigned long)&s);
+		ao2_ref(s, -1);
+		return NULL;
+	}
 	s->_taskgenerator->_private = s;
 	s->start = start_simproducer;
-	s->stop = stop_simobject;
 	return s;
 
 }
 
-struct simobject* construct_simconsumer(const char* name, const char* named_taskprocessor)
-{
-	struct simobject* s = NULL;
-	struct taskprocessor_singleton_info* t = NULL;
-
-	t = get_taskprocessor_singleton(named_taskprocessor);
+struct simobject* ast_simconsumer_alloc(const char* name, const char* named_taskprocessor)
+{
+	struct simobject* s;
+	struct taskprocessor_singleton_info* t;
+
+	t = ast_taskprocessor_alloc(named_taskprocessor, 0);
 	if (!t) {
-		ast_log(LOG_ERROR, "could not locate taskprocessor named \'%s\' for sim-consumer \'%s\' construction.\n"
-			, named_taskprocessor, name);
-		return NULL;
-	}
-	s = ast_calloc(1, sizeof(*s));
+		ast_log(LOG_ERROR, "could not allocate taskprocessor \'%s\' for sim-producer \'%s\' construction.\n", named_taskprocessor, name);
+		return NULL;
+	}
+	s = ao2_alloc(sizeof(*s), destroy_simobject);
 	if (!s) {
-		ast_log(LOG_ERROR, "cannot allocate memory for a simobject structure.\n");
+		ast_log(LOG_ERROR, "Failed to allocate simconsumer \'%s\'\n", name);
 		return NULL;
 	}
 	snprintf(s->_name, sizeof(s->_name), "%s", name);
-	s->_consumer = construct_taskconsumer(s, t);
+	s->_consumer = ast_taskconsumer_alloc(t);
 	s->start = start_simobject;
-	s->stop = stop_simobject;
-	ast_log(LOG_DEBUG, "created simobject \'%s\' with task production capabilities, using taskprocessor \'%s\'.\n", name, named_taskprocessor);
 	return s;
+}
+
+static void destroy_simobject(void *sim)
+{
+	struct simobject *s = (struct simobject *)sim;
+	if (s && s->_taskgenerator) {
+		ast_log(LOG_DEBUG, "simobject \'%s\' stopping\n", s->_name);
+		s->_taskgenerator->_poll_thread_run = 0;
+		ast_mutex_lock(&s->_taskgenerator->_taskprocessor_lock);
+		ast_cond_signal(&s->_taskgenerator->_poll_cond);
+		ast_mutex_unlock(&s->_taskgenerator->_taskprocessor_lock);
+		pthread_join(s->_taskgenerator->_poll_thread, NULL);
+		s->_taskgenerator->_poll_thread = AST_PTHREADT_NULL;
+	}
+	if (s->_producer) {
+		ao2_ref(s->_producer, -1);
+	}
+	if (s->_consumer) {
+		ao2_ref(s->_consumer, -1);
+	}
+	ast_log(LOG_DEBUG, "simobject destroyed!\n");
+	return;
 }
 
 
@@ -136,7 +143,6 @@
 			ast_task_free(t);
 			return -1;
 		}
-		ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_producer->_taskprocessor->_name);
 	}
 	return 0;
 }
@@ -157,20 +163,9 @@
 			ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task!\n", s->_name);
 			return -1;
 		}
-		ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_producer->_taskprocessor->_name);
 	}
 	return 0;
 }
-
-int stop_simobject(struct simobject* s)
-{
-	if (s && s->_taskgenerator) {
-		ast_log(LOG_DEBUG, "simobject \'%s\' stopping\n", s->_name);
-		s->_taskgenerator->_poll_thread_run = 0;
-	}
-	return 0;
-}
-
 
 static int simobject_taskhandler(struct a_task* e)
 {
@@ -209,7 +204,6 @@
 
 		ast_log(LOG_DEBUG, "waiting to start \'%s\'\n", i->_name);
 	} while (!i->_poll_thread_run);
-		
 	ast_mutex_lock(&i->_taskprocessor_lock);
 	killflag = !i->_poll_thread_run;
 	freq = i->_poll_freq;
@@ -224,7 +218,6 @@
 		 *
 		 * start by sleeping for the poll frequency, then produce a task 
 		 */
-		usleep((rand()%10)+freq);
 		if (!s) {
 			ast_log(LOG_ERROR, "Huh? we have no private simobject pointer?  This is bad!\n");
 			break;
@@ -236,19 +229,22 @@
 		}
 		t->_p_producer = s->_producer;
 		t->_p_consumer = s->_consumer;
-		if (s->_producer && s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t) == 0)) {
-			ast_log(LOG_DEBUG, "simobject \'%s\' queued task to processor \'%s\'\n", s->_name, s->_consumer->_taskprocessor->_name);
-		} else {
+		if (s->_producer && s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t))) {
 			ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task (producer: 0x%ld, generator: 0x%ld)!  Releasing task to task pool\n"
 				, s->_name, (unsigned long)s->_producer, (unsigned long)s->_taskgenerator);
 			ast_task_free(t);
 			break;
 		}
 		ast_mutex_lock(&i->_taskprocessor_lock);
+		ast_cond_timedwait(&i->_poll_cond, &i->_taskprocessor_lock, &ts);
+		ast_mutex_unlock(&i->_taskprocessor_lock);
+		usleep(1);
+		ast_mutex_lock(&i->_taskprocessor_lock);
 		killflag = !i->_poll_thread_run;
 		ast_mutex_unlock(&i->_taskprocessor_lock);
 	}
 	i->_is_purged = 1;
+	ao2_ref(i, -1);
 	return NULL;
 }
 

Modified: team/dhubbard/named_processors/res/sandbox/taskconsumer.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/taskconsumer.c?view=diff&rev=109011&r1=109010&r2=109011
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/taskconsumer.c (original)
+++ team/dhubbard/named_processors/res/sandbox/taskconsumer.c Sun Mar 16 22:51:08 2008
@@ -22,35 +22,40 @@
  *  its event.
  */
 #include <asterisk.h>
+#include <asterisk/astobj2.h>
 #include <asterisk/taskprocessor.h>
 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
 #include "include/taskconsumer.h"
+static void destroy_taskconsumer(void *tc);
 
 /* the default handler function is a failure */
 static int invalid_handle_task(struct taskconsumer* consumer, struct a_task* task)
 {
-	ast_log(LOG_ERROR, "handle_task() not overidden.  consumer: 0x%ld, tasksource: \'%s\'\n"
-		, (unsigned long)consumer
-		, (task)?task->_source:"unknown");
+	ast_log(LOG_ERROR, "handle_task() not overidden.  consumer: 0x%ld, tasksource: \'%s\'\n", (unsigned long)consumer, (task)?task->_source:"unknown");
 	return -1;
 }
 
 /* create and initialize the default consumer object */
-struct taskconsumer* construct_taskconsumer(void* owner, struct taskprocessor_singleton_info* processor)
+struct taskconsumer* ast_taskconsumer_alloc(struct taskprocessor_singleton_info* processor)
 {
 	struct taskconsumer* c = NULL;
-	c = ast_calloc(1, sizeof(*c));
+	c = ao2_alloc(sizeof(*c), destroy_taskconsumer);
 	if (!c) {
-		ast_log(LOG_ERROR, "cannot allocate memory for a taskconsumer structure.\n");
+		ast_log(LOG_ERROR, "cannot allocate memory for a taskproducer structure.\n");
 		return NULL;
 	}
-	c->_owner = owner;
 	c->_taskprocessor = processor;
 	c->handle_task = invalid_handle_task;
-	ast_log(LOG_DEBUG, "created default_taskconsumer for owner: 0x%ld using taskprocessor \'%s\'\n"
-		, (unsigned long)owner, (processor)?processor->_name:"<null>");
+	ast_log(LOG_DEBUG, "created taskconsumer using taskprocessor \'%s\'\n", (processor)?processor->_name:"<null>");
 	return c;
 }
 
+static void destroy_taskconsumer(void *tc)
+{
+	struct taskconsumer *c;
+	ast_log(LOG_DEBUG, "destroying taskconsumer\n");
+	c = (struct taskconsumer *)tc;
+	ao2_ref(c->_taskprocessor, -1);
+}
 




More information about the asterisk-commits mailing list