[asterisk-commits] dhubbard: branch dhubbard/named_processors r109906 - in /team/dhubbard/named_...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Wed Mar 19 01:11:58 CDT 2008


Author: dhubbard
Date: Wed Mar 19 01:11:51 2008
New Revision: 109906

URL: http://svn.digium.com/view/asterisk?view=rev&rev=109906
Log:
The taskproducer has come back to life, but the taskpool is now gone for good: in discussion with Russell and Mark M., it came up that obtaining access to the task pool could become a bottleneck.

Modified:
    team/dhubbard/named_processors/apps/app_queue.c
    team/dhubbard/named_processors/include/asterisk/taskprocessor.h
    team/dhubbard/named_processors/main/pbx.c
    team/dhubbard/named_processors/main/taskprocessor.c
    team/dhubbard/named_processors/res/sandbox/simobject.c

Modified: team/dhubbard/named_processors/apps/app_queue.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/apps/app_queue.c?view=diff&rev=109906&r1=109905&r2=109906
==============================================================================
--- team/dhubbard/named_processors/apps/app_queue.c (original)
+++ team/dhubbard/named_processors/apps/app_queue.c Wed Mar 19 01:11:51 2008
@@ -132,7 +132,8 @@
 	{ QUEUE_STRATEGY_WRANDOM, "wrandom"},
 };
 
-struct taskprocessor_singleton_info *taskprocessor;
+struct taskproducer *tpsp;
+struct taskprocessor_singleton_info *tpsi;
 
 #define DEFAULT_RETRY		5
 #define DEFAULT_TIMEOUT		15
@@ -791,7 +792,8 @@
 	}
 	sc->state = state;
 	strcpy(sc->dev, device);
-	if (AST_TASKPROCESSOR_QUEUE(taskprocessor, handle_statechange, sc) < 0) {
+	t = ast_task_alloc(handle_statechange, sc, "app_queue-device_state_cb");
+	if (tpsp->queue_task(tpsp, t) < 0) {
 		ast_log(LOG_WARNING, "queue_task failed!!\n");
 		ast_task_free(t);
 	}
@@ -6204,7 +6206,8 @@
 	clear_and_free_interfaces();
 
 	ao2_ref(queues, -1);
-	ao2_ref(taskprocessor, -1);
+	ao2_ref(tpsp, -1);
+	ao2_ref(tpsi, -1);
 	return res;
 }
 
@@ -6250,7 +6253,8 @@
 	res |= ast_custom_function_register(&queuewaitingcount_function);
 	res |= ast_custom_function_register(&queuememberpenalty_function);
 
-	taskprocessor = ast_taskprocessor_alloc("app_queue", 0);
+	tpsi = ast_taskprocessor_alloc("app_queue", 0);
+	tpsp = ast_taskproducer_alloc(tpsi);
 
 	if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL, AST_EVENT_IE_END)))
 		res = -1;

Modified: team/dhubbard/named_processors/include/asterisk/taskprocessor.h
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/include/asterisk/taskprocessor.h?view=diff&rev=109906&r1=109905&r2=109906
==============================================================================
--- team/dhubbard/named_processors/include/asterisk/taskprocessor.h (original)
+++ team/dhubbard/named_processors/include/asterisk/taskprocessor.h Wed Mar 19 01:11:51 2008
@@ -54,28 +54,25 @@
 	struct taskprocessor_singleton_stats* _stats;
 	void *_private;
 	long _queue_size;
-	int (* queue_task)(struct taskprocessor_singleton_info *tp, struct a_task* task);
 	AST_LIST_HEAD(_queue, a_task) _queue;
 	AST_LIST_ENTRY(taskprocessor_singleton_info) list;
 };
 
-#define AST_TASKPROCESSOR_QUEUE(taskprocessor, function, data)	\
-	taskprocessor->queue_task(taskprocessor, ast_task_alloc(function, data, "blah"))
+struct taskproducer {
+	struct taskprocessor_singleton_info* _taskprocessor;
+	unsigned long _tasks_produced;
+
+	int (* queue_task)(struct taskproducer* producer, struct a_task* task);
+}; 
 
 unsigned char _evtq_poll_thread_run;
 
 struct a_task* ast_task_alloc(int (*task_exe)(struct a_task *task), void* datap, char* src);
 int ast_task_free(struct a_task* task);
-int ast_task_poolsize(void);
 
 int ast_taskprocessor_push(struct taskprocessor_singleton_info* tp, struct a_task* t);
 struct a_task* ast_taskprocessor_pop(struct taskprocessor_singleton_info* tp);
 int ast_taskprocessor_size(struct taskprocessor_singleton_info* tp);
-
-int stop_taskpool(void);
-int destroy_task_pool(void);
-
-int noop_task_execute(struct a_task *t);
 
 struct taskprocessor_singleton_info *ast_taskprocessor_alloc(const char *name, void *(*func)(void*));
 int size_of_taskprocessor_singleton_list(void);
@@ -83,5 +80,6 @@
 int register_taskprocessor_clis(void);
 int unregister_taskprocessor_clis(void);
 
+struct taskproducer* ast_taskproducer_alloc(struct taskprocessor_singleton_info* processor);
 #endif
 

Modified: team/dhubbard/named_processors/main/pbx.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/pbx.c?view=diff&rev=109906&r1=109905&r2=109906
==============================================================================
--- team/dhubbard/named_processors/main/pbx.c (original)
+++ team/dhubbard/named_processors/main/pbx.c Wed Mar 19 01:11:51 2008
@@ -125,6 +125,7 @@
 struct ast_app;
 
 struct taskprocessor_singleton_info *taskprocessor;
+struct taskproducer *taskproducer;
 
 AST_THREADSTORAGE(switch_data);
 
@@ -7758,7 +7759,8 @@
 	if (!(sc = ast_calloc(1, sizeof(*sc) + strlen(device) + 1)))
 		return;
 	strcpy(sc->dev, device);
-	if (AST_TASKPROCESSOR_QUEUE(taskprocessor, handle_statechange, sc) < 0) {
+	t = ast_task_alloc(handle_statechange, sc, "pbx-device_state_cb");
+	if (taskproducer->queue_task(taskproducer, t) < 0) {
 		ast_log(LOG_WARNING, "queue_task failed!\n");
 		ast_task_free(t);
 	}
@@ -7788,6 +7790,7 @@
 	ast_manager_register2("ShowDialPlan", EVENT_FLAG_CONFIG | EVENT_FLAG_REPORTING, manager_show_dialplan, "List dialplan", mandescr_show_dialplan);
 
 	taskprocessor = ast_taskprocessor_alloc("pbx", 0);
+	taskproducer = ast_taskproducer_alloc(taskprocessor);
 
 	if (!(device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE, device_state_cb, NULL,
 			AST_EVENT_IE_END))) {

Modified: team/dhubbard/named_processors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/taskprocessor.c?view=diff&rev=109906&r1=109905&r2=109906
==============================================================================
--- team/dhubbard/named_processors/main/taskprocessor.c (original)
+++ team/dhubbard/named_processors/main/taskprocessor.c Wed Mar 19 01:11:51 2008
@@ -39,15 +39,9 @@
 AST_LIST_HEAD_STATIC(_taskprocessor_singletons, taskprocessor_singleton_info);
 static int _taskprocessor_singletons_list_size = 0;
 
-static long _task_poolsize = 0;
-AST_LIST_HEAD_STATIC(_task_pool, a_task);
-
-static int _global_kill_taskpool = 0;
-AST_MUTEX_DEFINE_STATIC(_global_kill_taskpool_lock);
-
-static int default_queue_task(struct taskprocessor_singleton_info *tp, struct a_task* task);
 static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t);
 static void destroy_taskprocessor_singleton(void *tps);
+static void destroy_taskproducer(void *tp);
 static int taskprocessor_ping(struct a_task* e);
 
 static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -58,137 +52,25 @@
 	AST_CLI_DEFINE(cli_taskprocessor_show_stats, "List instantiated task processors and statistics"),
 };
 
-int noop_task_execute(struct a_task* t)
-{
-	ast_log(LOG_DEBUG, "noop for \'%s\'\n", (t)?t->_source:"<task-structure-missing>");
-	return 0;
-}
-
-/*
- * get an available a_task structure from the pool of available
- * structures.  If no structures are available, allocate one
- */
 struct a_task* ast_task_alloc(int (*task_exe)(struct a_task *task), void* datap, char* src)
 {
 	struct a_task *t;
-	int lock_failures = 0;
-
-	AST_LIST_LOCK(&_task_pool);
-	while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
-		lock_failures++;
-		AST_LIST_UNLOCK(&_task_pool);
-		usleep(1);
-		if (lock_failures > 10) {
-			ast_log(LOG_ERROR, "cannot lock task pool for \'%s\'.\n", src);
-			return NULL;
-		}
-		AST_LIST_LOCK(&_task_pool);
-	}
-	/* If the kill_taskpool flag is TRUE, then we should not give any more pooled tasks to requestors */
-	if (_global_kill_taskpool) {
-		ast_mutex_unlock(&_global_kill_taskpool_lock);
-		AST_LIST_UNLOCK(&_task_pool);
-		ast_log(LOG_DEBUG, "task pool is no longer available to requestors.\n");
-		return NULL;
-	}
-	if (AST_LIST_EMPTY(&_task_pool)) {
-		t = ast_calloc(1,sizeof(*t));
-		if (!t) {
-			ast_log(LOG_ERROR, "Poop.  ast_calloc failed to get a task for \'%s\'\n", src);
-			ast_mutex_unlock(&_global_kill_taskpool_lock);
-			AST_LIST_UNLOCK(&_task_pool);
-			return NULL;
-		}
-		t->execute = task_exe;
-		t->_datap = datap;
-		strncpy(t->_source, src, sizeof(t->_source));
-		ast_mutex_unlock(&_global_kill_taskpool_lock);
-		AST_LIST_UNLOCK(&_task_pool);
-		return t;
-	}
-	t = AST_LIST_REMOVE_HEAD(&_task_pool, list);
-	_task_poolsize -= 1;
-	ast_mutex_unlock(&_global_kill_taskpool_lock);
-	AST_LIST_UNLOCK(&_task_pool);
-	if (t) {
-		t->execute = task_exe;
-		t->_datap = datap;
-		strncpy(t->_source, src, sizeof(t->_source));
-	}
+	t = ast_calloc(1,sizeof(*t));
+	if (!t) {
+		ast_log(LOG_ERROR, "Poop.  Failed to allocate memory for \'%s\'\n", src);
+		return NULL;
+	}
+	t->execute = task_exe;
+	t->_datap = datap;
+	strncpy(t->_source, src, sizeof(t->_source));
 	return t;
 }
 	
-/*
- * release a_task structure back to the pool
- */
 int ast_task_free(struct a_task* t)
 {
-	int lock_failures = 0;
 	if (t->_datap) {
 		ast_free(t->_datap);
 	}
-	memset(t, 0, sizeof(struct a_task));
-	AST_LIST_LOCK(&_task_pool);
-	while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
-		lock_failures++;
-		AST_LIST_UNLOCK(&_task_pool);
-		usleep(1);
-		if (lock_failures > 10) {
-			ast_log(LOG_ERROR, "\n\tcannot lock task pool.\n");
-			return 0;
-		}
-		AST_LIST_LOCK(&_task_pool);
-	}
-	if (_global_kill_taskpool != 0) {
-		ast_log(LOG_NOTICE, "task pool is no longer available for task releasing\n");
-		ast_mutex_unlock(&_global_kill_taskpool_lock);
-		AST_LIST_UNLOCK(&_task_pool);
-		return 0;
-	}
-	AST_LIST_INSERT_TAIL(&_task_pool, t, list);
-	_task_poolsize += 1;
-	ast_mutex_unlock(&_global_kill_taskpool_lock);
-	AST_LIST_UNLOCK(&_task_pool);
-	return 0;
-}
-
-/*
- * stop the task pool
- */
-int stop_taskpool()
-{
-	ast_mutex_lock(&_global_kill_taskpool_lock);
-	_global_kill_taskpool = 1;
-	ast_mutex_unlock(&_global_kill_taskpool_lock);
-	return 0;
-}
-
-int ast_task_poolsize()
-{
-	int size = 0;
-	AST_LIST_LOCK(&_task_pool);
-	size = _task_poolsize;
-	AST_LIST_UNLOCK(&_task_pool);
-	return size;
-}
-
-/*
- * destroy a_task pool
- */
-int destroy_task_pool(void)
-{
-	struct a_task* t = NULL;
-
-	AST_LIST_LOCK(&_task_pool);
-	while (!AST_LIST_EMPTY(&_task_pool)) {
-		t = AST_LIST_REMOVE_HEAD(&_task_pool, list);
-		if (t) {
-			ast_free(t);
-			t = NULL;
-		}
-		_task_poolsize -= 1;
-	}
-	AST_LIST_UNLOCK(&_task_pool);
 	return 0;
 }
 
@@ -248,10 +130,6 @@
 	}
 
 	t = ast_task_alloc(taskprocessor_ping, 0, "cli_taskprocessor_ping");
-	if (!t) {
-		ast_cli(a->fd, "\n%s failed: could not retrieve task from pool\n", e->command);
-		return RESULT_SUCCESS;	
-	}
 	if (ast_taskprocessor_push(p, t) < 0) {
 		ast_cli(a->fd, "\n%s failed: could not push task to %s\n", e->command, a->argv[2]);
 		ast_task_free(t);
@@ -265,7 +143,6 @@
 	unsigned long qsize;
 	unsigned long maxqsize;
 	unsigned long processed;
-	unsigned long taskpool;
 	struct taskprocessor_singleton_info* p;
 
 	switch (cmd) {
@@ -282,6 +159,7 @@
 	if (a->argc != e->args)
 		return CLI_SHOWUSAGE;
 
+	ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
 	AST_LIST_LOCK(&_taskprocessor_singletons);
 	AST_LIST_TRAVERSE(&_taskprocessor_singletons, p, list) {
 		ast_mutex_lock(&p->_taskprocessor_lock);
@@ -290,12 +168,10 @@
 		maxqsize = p->_stats->_max_qsize;
 		processed = p->_stats->_tasks_processed_count;
 		ast_mutex_unlock(&p->_taskprocessor_lock);
-		ast_cli(a->fd, "\n%20s: processed: %10ld, qsize: %ld (max: %ld)", name, processed, qsize, maxqsize);
-	}
-	AST_LIST_UNLOCK(&_taskprocessor_singletons);
-	taskpool = ast_task_poolsize();
-	ast_cli(a->fd, "\n%20s: %ld\n\n", "task pool size", taskpool);
-
+		ast_cli(a->fd, "\n%24s   %17ld %12ld %12ld", name, processed, qsize, maxqsize);
+	}
+	AST_LIST_UNLOCK(&_taskprocessor_singletons);
+	ast_cli(a->fd, "\n\n");
 	return RESULT_SUCCESS;	
 }
 
@@ -412,7 +288,6 @@
 		ao2_ref(tps, -1);
 		return NULL;
 	}
-	tps->queue_task = default_queue_task;
 	return tps;
 }
 	
@@ -622,20 +497,43 @@
 }
 
 /* provide a default implementation of a queue() command */
-static int default_queue_task(struct taskprocessor_singleton_info *tp, struct a_task* task)
-{
-	if ((!tp) || (!task)) {
-		ast_log(LOG_ERROR, "a taskprocessor: 0x%ld and a task: 0x%ld are required for this operation.\n", (unsigned long)tp, (unsigned long)task);
+static int default_queue_task(struct taskproducer* producer, struct a_task* task)
+{
+	if ((!producer) || (!task)) {
+		ast_log(LOG_ERROR, "a taskproducer: 0x%ld and a task: 0x%ld are required for this operation.\n", (unsigned long)producer, (unsigned long)task);
 		return -1;
 	}
-	if (ast_taskprocessor_push(tp, task) < 0) {
-		ast_log(LOG_ERROR, "we failed to push task to taskprocessor \'%s\'.\n", tp->_name);
+	if (ast_taskprocessor_push(producer->_taskprocessor, task) < 0) {
+		ast_log(LOG_ERROR, "we failed to push task to taskprocessor \'%s\'.\n", (producer->_taskprocessor)?producer->_taskprocessor->_name:"<null>");
 		return -1;
 	}
-	ast_mutex_lock(&tp->_taskprocessor_lock);
-	ast_cond_signal(&tp->_poll_cond);
-	ast_mutex_unlock(&tp->_taskprocessor_lock);
-	return 0;
-}
-
-
+	producer->_tasks_produced++;
+	ast_mutex_lock(&producer->_taskprocessor->_taskprocessor_lock);
+	ast_cond_signal(&producer->_taskprocessor->_poll_cond);
+	ast_mutex_unlock(&producer->_taskprocessor->_taskprocessor_lock);
+	return 0;
+}
+
+/* create and initialize a task producer */
+struct taskproducer* ast_taskproducer_alloc(struct taskprocessor_singleton_info* processor)
+{
+	struct taskproducer* p;
+	p = ao2_alloc(sizeof(*p), destroy_taskproducer);
+	if (!p) {
+		ast_log(LOG_ERROR, "cannot allocate memory for a taskproducer structure.\n");
+		return NULL;
+	}
+	p->_taskprocessor = processor;
+	ao2_ref(processor, 1);
+	p->queue_task = default_queue_task;
+	return p;
+}
+
+static void destroy_taskproducer(void *tp)
+{
+	struct taskproducer *p;
+	p = (struct taskproducer *)tp;
+	ast_log(LOG_DEBUG, "destroying taskproducer\n");
+	ao2_ref(p->_taskprocessor, -1);
+}
+

Modified: team/dhubbard/named_processors/res/sandbox/simobject.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/res/sandbox/simobject.c?view=diff&rev=109906&r1=109905&r2=109906
==============================================================================
--- team/dhubbard/named_processors/res/sandbox/simobject.c (original)
+++ team/dhubbard/named_processors/res/sandbox/simobject.c Wed Mar 19 01:11:51 2008
@@ -23,6 +23,7 @@
 #include "include/simobject.h"
 
 static int start_simobject(struct simobject* s);
+static int start_simproducer(struct simobject* s);
 static int simobject_taskhandler(struct a_task* t);
 static void* _simproducer_thread_function(void* data);
 static void destroy_simobject(void *sim);
@@ -43,6 +44,7 @@
 		return NULL;
 	}
 	snprintf(s->_name, sizeof(s->_name), "%s", name);
+	s->_producer = ast_taskproducer_alloc(t);
 	s->_consumer = ast_taskconsumer_alloc(t);
 	s->start = start_simobject;
 	return s;
@@ -66,6 +68,7 @@
 	}
 	snprintf(s->_name, sizeof(s->_name), "%s", name);
 	snprintf(tpg, sizeof(tpg), "%s-generator", s->_name);
+	s->_producer = ast_taskproducer_alloc(t);
 	s->_consumer = ast_taskconsumer_alloc(t);
 	s->_taskgenerator = ast_taskprocessor_alloc(tpg, _simproducer_thread_function);
 	if (!s->_taskgenerator) {
@@ -74,6 +77,7 @@
 		return NULL;
 	}
 	s->_taskgenerator->_private = s;
+	s->start = start_simproducer;
 	return s;
 
 }
@@ -111,6 +115,9 @@
 		pthread_join(s->_taskgenerator->_poll_thread, NULL);
 		s->_taskgenerator->_poll_thread = AST_PTHREADT_NULL;
 	}
+	if (s->_producer) {
+		ao2_ref(s->_producer, -1);
+	}
 	if (s->_consumer) {
 		ao2_ref(s->_consumer, -1);
 	}
@@ -121,8 +128,8 @@
 
 int start_simobject(struct simobject* s)
 {
-#if 0
 	struct a_task* t = NULL;
+	
 	if (s && s->_producer) {
 		t = ast_task_alloc(simobject_taskhandler, 0, "start_simobject");
 		if (!t) {
@@ -137,14 +144,13 @@
 			return -1;
 		}
 	}
-#endif
 	return 0;
 }
 
-//int start_simproducer(struct simobject* s)
-//{
-#if 0	
+int start_simproducer(struct simobject* s)
+{
 	struct a_task* t = NULL;
+	
 	if (s && s->_producer) {
 		t = ast_task_alloc(simobject_taskhandler, 0, "start_simproducer");
 		if (!t) {
@@ -158,9 +164,8 @@
 			return -1;
 		}
 	}
-#endif
-//	return 0;
-//}
+	return 0;
+}
 
 static int simobject_taskhandler(struct a_task* e)
 {
@@ -222,15 +227,14 @@
 			ast_log(LOG_ERROR, "ERROR: task pool failed to supply a task\n");
 			break;
 		}
+		t->_p_producer = s->_producer;
 		t->_p_consumer = s->_consumer;
-#if 0
 		if (s->_producer && s->_producer->queue_task && (s->_producer->queue_task(s->_producer, t))) {
 			ast_log(LOG_WARNING, "simobject \'%s\' failed to queue task (producer: 0x%ld, generator: 0x%ld)!  Releasing task to task pool\n"
 				, s->_name, (unsigned long)s->_producer, (unsigned long)s->_taskgenerator);
 			ast_task_free(t);
 			break;
 		}
-#endif
 		ast_mutex_lock(&i->_taskprocessor_lock);
 		ast_cond_timedwait(&i->_poll_cond, &i->_taskprocessor_lock, &ts);
 		ast_mutex_unlock(&i->_taskprocessor_lock);




More information about the asterisk-commits mailing list