[asterisk-commits] dhubbard: branch dhubbard/named_processors r108637 - /team/dhubbard/named_pro...

SVN commits to the Asterisk project asterisk-commits at lists.digium.com
Thu Mar 13 17:48:56 CDT 2008


Author: dhubbard
Date: Thu Mar 13 17:48:56 2008
New Revision: 108637

URL: http://svn.digium.com/view/asterisk?view=rev&rev=108637
Log:
moved taskprocessing modules to core

Added:
    team/dhubbard/named_processors/main/task.c   (with props)
    team/dhubbard/named_processors/main/taskprocessor.c   (with props)
    team/dhubbard/named_processors/main/taskproducer.c   (with props)
Modified:
    team/dhubbard/named_processors/main/Makefile

Modified: team/dhubbard/named_processors/main/Makefile
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/Makefile?view=diff&rev=108637&r1=108636&r2=108637
==============================================================================
--- team/dhubbard/named_processors/main/Makefile (original)
+++ team/dhubbard/named_processors/main/Makefile Thu Mar 13 17:48:56 2008
@@ -30,7 +30,7 @@
 	cryptostub.o sha1.o http.o fixedjitterbuf.o abstract_jb.o \
 	strcompat.o threadstorage.o dial.o event.o adsistub.o audiohook.o \
 	astobj2.o hashtab.o global_datastores.o $(RESAMPLE_OBJS) version.o \
-	features.o
+	features.o task.o taskprocessor.o taskproducer.o
 
 # we need to link in the objects statically, not as a library, because
 # otherwise modules will not have them available if none of the static

Added: team/dhubbard/named_processors/main/task.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/task.c?view=auto&rev=108637
==============================================================================
--- team/dhubbard/named_processors/main/task.c (added)
+++ team/dhubbard/named_processors/main/task.c Thu Mar 13 17:48:56 2008
@@ -1,0 +1,160 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2007, Dwayne M. Hubbard 
+ *
+ * Dwayne M. Hubbard <dhubbard at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+#include <asterisk.h>
+#include <asterisk/task.h>
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+/* Free list of idle a_task structures; locked with AST_LIST_LOCK() by every user below. */
+AST_LIST_HEAD_STATIC(_task_pool, a_task);
+/* Number of entries on _task_pool; read and written only while the list is locked. */
+static long _task_pool_size = 0;
+/* Guards _global_kill_taskpool. */
+AST_MUTEX_DEFINE_STATIC(_global_kill_taskpool_lock);
+/* Set to 1 by stop_taskpool(); once set, the pool neither hands out nor accepts tasks. */
+static int _global_kill_taskpool = 0;
+
+/*
+ * Do-nothing execute callback: logs the task's source tag at debug level
+ * (or a placeholder when no task is supplied) and reports success.
+ */
+int noop_task_execute(struct a_task* t)
+{
+	const char *origin = "<task-structure-missing>";
+
+	if (t) {
+		origin = t->_source;
+	}
+	ast_log(LOG_DEBUG, "noop for \'%s\'\n", origin);
+	return 0;
+}
+
+/*
+ * Hand out an a_task, recycling one from the idle pool when possible and
+ * allocating a fresh one when the pool is empty.
+ *
+ * task_exe  callback stored in the task's execute member
+ * datap     payload stored in the task's _datap member (release_task()
+ *           later passes it to ast_free())
+ * src       label copied into the task's _source buffer; NULL is treated
+ *           as an empty label
+ *
+ * Returns the prepared task, or NULL when the pool lock cannot be obtained
+ * after a few attempts, the pool has been stopped via stop_taskpool(), or
+ * allocation fails.
+ */
+struct a_task* get_available_task(int (*task_exe)(struct a_task *task), void* datap, char* src)
+{
+	struct a_task *t = NULL;
+	const char *label = src ? src : "";
+	int lock_failures = 0;
+	int pool_stopped;
+
+	/*
+	 * Both locks are needed.  The kill-flag lock is only tried, and the
+	 * list lock is dropped between attempts, so this never blocks on the
+	 * kill-flag lock while holding the list lock.
+	 */
+	AST_LIST_LOCK(&_task_pool);
+	while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
+		lock_failures++;
+		AST_LIST_UNLOCK(&_task_pool);
+		usleep(1);
+		if (lock_failures > 10) {
+			ast_log(LOG_ERROR, "\n\tcannot lock task pool.\n");
+			return NULL;
+		}
+		AST_LIST_LOCK(&_task_pool);
+	}
+	/* If the kill_taskpool flag is TRUE, then we should not give any more pooled tasks to requestors */
+	pool_stopped = (_global_kill_taskpool != 0);
+	if (!pool_stopped && !AST_LIST_EMPTY(&_task_pool)) {
+		t = AST_LIST_REMOVE_HEAD(&_task_pool, list);
+		_task_pool_size -= 1;
+	}
+	ast_mutex_unlock(&_global_kill_taskpool_lock);
+	AST_LIST_UNLOCK(&_task_pool);
+
+	if (pool_stopped) {
+		ast_log(LOG_NOTICE, "task pool is no longer available to requestors.\n");
+		return NULL;
+	}
+	if (!t) {
+		/* nothing to recycle: allocate, now that the locks are released */
+		t = ast_calloc(1, sizeof(*t));
+		if (!t) {
+			ast_log(LOG_ERROR, "Poop.  ast_calloc failed to get a task for \'%s\'\n", label);
+			return NULL;
+		}
+	}
+	t->execute = task_exe;
+	t->_datap = datap;
+	/* strncpy() does not terminate on truncation, so reserve the last byte */
+	strncpy(t->_source, label, sizeof(t->_source) - 1);
+	t->_source[sizeof(t->_source) - 1] = '\0';
+	return t;
+}
+	
+/*
+ * Stop the task pool: set the kill flag while holding its mutex.
+ * get_available_task() and release_task() check this flag and refuse to
+ * touch the pool once it is set.  Always returns 0.
+ */
+int stop_taskpool()
+{
+	ast_mutex_lock(&_global_kill_taskpool_lock);
+	_global_kill_taskpool = 1;
+	ast_mutex_unlock(&_global_kill_taskpool_lock);
+	return 0;
+}
+
+/*
+ * Report how many idle tasks are parked in the pool.  The counter is
+ * sampled while the pool list is locked.
+ */
+int task_pool_size()
+{
+	int idle;
+
+	AST_LIST_LOCK(&_task_pool);
+	idle = _task_pool_size;
+	AST_LIST_UNLOCK(&_task_pool);
+
+	return idle;
+}
+
+/*
+ * Give an a_task back to the idle pool.
+ *
+ * The task's _datap payload (if any) is passed to ast_free() and the
+ * structure is zeroed before it is parked.  When the task cannot be parked
+ * -- the pool lock could not be obtained, or the pool has been stopped --
+ * the structure itself is freed rather than leaked; destroy_task_pool()
+ * frees pooled tasks the same way.  A NULL task is ignored.
+ *
+ * Always returns 0.
+ */
+int release_task(struct a_task* t)
+{
+	int lock_failures = 0;
+	int parked = 0;
+
+	if (!t) {
+		return 0;
+	}
+	if (t->_datap) {
+		ast_free(t->_datap);
+	}
+	memset(t, 0, sizeof(*t));
+
+	AST_LIST_LOCK(&_task_pool);
+	while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
+		lock_failures++;
+		AST_LIST_UNLOCK(&_task_pool);
+		usleep(1);
+		if (lock_failures > 10000) {
+			ast_log(LOG_ERROR, "\n\tcannot lock task pool.\n");
+			ast_free(t);	/* cannot park it, so do not leak it */
+			return 0;
+		}
+		AST_LIST_LOCK(&_task_pool);
+	}
+	if (_global_kill_taskpool == 0) {
+		AST_LIST_INSERT_TAIL(&_task_pool, t, list);
+		_task_pool_size += 1;
+		parked = 1;
+	}
+	ast_mutex_unlock(&_global_kill_taskpool_lock);
+	AST_LIST_UNLOCK(&_task_pool);
+
+	if (!parked) {
+		ast_log(LOG_NOTICE, "task pool is no longer available for task releasing\n");
+		ast_free(t);	/* pool is shut down: dispose of the task here */
+	}
+	return 0;
+}
+
+/*
+ * Drain the idle pool, freeing every parked a_task.  Runs with the pool
+ * list locked and lowers the size counter once per pass.  Always returns 0.
+ */
+int destroy_task_pool(void)
+{
+	struct a_task *idle;
+
+	AST_LIST_LOCK(&_task_pool);
+	while (!AST_LIST_EMPTY(&_task_pool)) {
+		idle = AST_LIST_REMOVE_HEAD(&_task_pool, list);
+		if (idle) {
+			ast_free(idle);
+		}
+		_task_pool_size -= 1;
+	}
+	AST_LIST_UNLOCK(&_task_pool);
+
+	return 0;
+}
+

Propchange: team/dhubbard/named_processors/main/task.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dhubbard/named_processors/main/task.c
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dhubbard/named_processors/main/task.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: team/dhubbard/named_processors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/taskprocessor.c?view=auto&rev=108637
==============================================================================
--- team/dhubbard/named_processors/main/taskprocessor.c (added)
+++ team/dhubbard/named_processors/main/taskprocessor.c Thu Mar 13 17:48:56 2008
@@ -1,0 +1,526 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2007, Dwayne M. Hubbard 
+ *
+ * Dwayne M. Hubbard <dhubbard at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+/*
+ * Maintain a container of taskprocessor threads that are uniquely named
+ */
+#include <asterisk.h>
+#include <asterisk/taskprocessor.h>
+#include <asterisk/task.h>
+#include <asterisk/cli.h>
+#include <signal.h>
+#include <sys/time.h>
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+/*
+ * Poll interval given to a taskprocessor when the constructor receives a
+ * value below 1.  The default worker loop passes the interval to
+ * ast_samp2tv(freq, 1) to build its wait timeout.
+ */
+#define DEFAULT_POLL_FREQUENCY 1 
+
+/*
+ * Shutdown flag sampled (under its lock) by the default worker loop.
+ * NOTE(review): nothing in the code shown here ever sets it non-zero.
+ */
+AST_MUTEX_DEFINE_STATIC(_global_killflag_lock);
+static int _global_killflag = 0;
+
+/* Non-zero once create_taskprocessor_singleton() has registered the CLI commands. */
+AST_MUTEX_DEFINE_STATIC(_global_clireg_lock);
+static int _global_clireg = 0;
+
+/* Thread attribute slots; create_taskprocessor_singleton() indexes this by the current list size. */
+pthread_attr_t _attribute[100];
+/* Every registered taskprocessor; the list lock also covers the size counter below. */
+AST_LIST_HEAD_STATIC(_taskprocessor_singletons, taskprocessor_singleton_info);
+static int _taskprocessor_singletons_list_size = 0;
+
+/* Forward declarations for file-local helpers defined further down. */
+static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t);
+static int remove_taskprocessor_singleton(const char* name);
+static int taskprocessor_ping(struct a_task* e);
+
+/*********
+ *  CLI
+ *********/
+/* CLI handlers, declared ahead of the table that references them. */
+static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static char *cli_taskprocessor_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+
+/* CLI entries handed to ast_cli_register_multiple()/ast_cli_unregister_multiple() below. */
+static struct ast_cli_entry taskprocessor_clis[] = {
+	AST_CLI_DEFINE(cli_taskprocessor_ping, "Ping a named task processors"),
+	AST_CLI_DEFINE(cli_taskprocessor_show_stats, "List instantiated task processors and statistics"),
+};
+
+/*
+ * CLI handler for "taskprocessor ping <taskprocessor>".
+ *
+ * Looks the named taskprocessor up (case-insensitively) in the singleton
+ * list and queues a taskprocessor_ping task on it.  Failures to find the
+ * processor, obtain a task or queue it are reported on the CLI descriptor.
+ *
+ * Returns NULL for CLI_INIT/CLI_GENERATE, CLI_SHOWUSAGE when the argument
+ * count is not 3, and CLI_SUCCESS otherwise.
+ */
+static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	int found = 0;
+	struct a_task* t = NULL;
+	struct taskprocessor_singleton_info* p = NULL;
+
+	switch (cmd) {
+	case CLI_INIT:
+		e->command = "taskprocessor ping";
+		e->usage =
+			"Usage: taskprocessor ping <taskprocessor>\n"
+			"	Displays the time required for a processor to deliver a task\n";
+		return NULL;
+	case CLI_GENERATE:
+		return NULL;
+	}
+
+	if (a->argc != 3) {
+		return CLI_SHOWUSAGE;
+	}
+
+	AST_LIST_LOCK(&_taskprocessor_singletons);
+	AST_LIST_TRAVERSE(&_taskprocessor_singletons, p, list) {
+		ast_mutex_lock(&p->_taskprocessor_lock);
+		if (!strcasecmp(p->_name, a->argv[2])) {
+			found = 1;
+		}
+		ast_mutex_unlock(&p->_taskprocessor_lock);
+		if (found) {
+			break;
+		}
+	}
+	AST_LIST_UNLOCK(&_taskprocessor_singletons);
+
+	if ((!found) || (!p)) {
+		ast_cli(a->fd, "\n%s failed: %s not found\n", e->command, a->argv[2]);
+		return CLI_SUCCESS;
+	}
+
+	/*
+	 * NOTE(review): p is used after the list lock has been dropped; nothing
+	 * visible here keeps it alive if the processor is stopped concurrently.
+	 */
+	t = get_available_task(taskprocessor_ping, 0, "cli_taskprocessor_ping");
+	if (!t) {
+		ast_cli(a->fd, "\n%s failed: could not retrieve task from pool\n", e->command);
+		return CLI_SUCCESS;
+	}
+	if (push_task_to_taskprocessor_singleton(p, t) < 0) {
+		ast_cli(a->fd, "\n%s failed: could not push task to %s\n", e->command, a->argv[2]);
+		/* the task was not queued, so it is still ours to give back */
+		release_task(t);
+	}
+	/* a char * handler reports success with CLI_SUCCESS, as the not-found path above does */
+	return CLI_SUCCESS;
+}
+
+/*
+ * CLI handler for "taskprocessor show stats".
+ *
+ * Prints one line per registered taskprocessor (name, tasks processed,
+ * current and maximum queue size) followed by the idle task pool size.
+ * Each processor's figures are copied while its own lock is held, and the
+ * line is printed after that lock is released.
+ *
+ * Returns NULL for CLI_INIT/CLI_GENERATE, CLI_SHOWUSAGE on a wrong
+ * argument count, and CLI_SUCCESS otherwise.
+ */
+static char *cli_taskprocessor_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+	char name[256];
+	unsigned long qsize;
+	unsigned long maxqsize;
+	unsigned long processed;
+	struct taskprocessor_singleton_info* p;
+
+	switch (cmd) {
+	case CLI_INIT:
+		e->command = "taskprocessor show stats";
+		e->usage =
+			"Usage: taskprocessor show stats\n"
+			"	Shows a list of instantiated task processors and their statistics\n";
+		return NULL;
+	case CLI_GENERATE:
+		return NULL;
+	}
+
+	if (a->argc != e->args) {
+		return CLI_SHOWUSAGE;
+	}
+
+	AST_LIST_LOCK(&_taskprocessor_singletons);
+	AST_LIST_TRAVERSE(&_taskprocessor_singletons, p, list) {
+		ast_mutex_lock(&p->_taskprocessor_lock);
+		snprintf(name, sizeof(name), "%s", p->_name);
+		qsize = p->_queue_size;
+		/* the worker loop treats _stats as optional, so do the same here */
+		maxqsize = p->_stats ? p->_stats->_max_qsize : 0;
+		processed = p->_stats ? p->_stats->_tasks_processed_count : 0;
+		ast_mutex_unlock(&p->_taskprocessor_lock);
+		/* the locals are unsigned long, so the conversions must be %lu */
+		ast_cli(a->fd, "\n%20s: processed: %10lu, qsize: %lu (max: %lu)", name, processed, qsize, maxqsize);
+	}
+	AST_LIST_UNLOCK(&_taskprocessor_singletons);
+	ast_cli(a->fd, "\n%20s: %d\n\n", "task pool size", task_pool_size());
+
+	return CLI_SUCCESS;
+}
+
+/*
+ * Default worker loop for a taskprocessor thread.
+ *
+ * data is the taskprocessor_singleton_info this thread serves.  While the
+ * processor's _poll_thread_run flag is set and the global kill flag is
+ * clear, the loop pops queued tasks, runs each task's execute callback,
+ * updates the processor statistics and hands the task to release_task().
+ * When no work is pending it waits on _poll_cond, with a timeout derived
+ * from _poll_freq.  On the way out, tasks still queued are released
+ * without being executed and _is_purged is set.
+ *
+ * Always returns NULL.
+ */
+static void* default_taskprocessor_thread_function(void* data)
+{
+	struct taskprocessor_singleton_info* i = data;
+	struct a_task* t = NULL;
+	long size = 0;	/* signed: the queue-size helper reports -1 when it cannot lock */
+	int killflag = 0;
+	struct timespec ts = { 0, };
+	struct timeval tv;
+
+	if (!i) {
+		ast_log(LOG_ERROR, "cannot start thread_function loop without a taskprocessor_singleton_info structure.\n");
+		return NULL;
+	}
+
+	while ((!killflag) && (i->_poll_thread_run)) {
+		if ((size = size_of_taskprocessor_singleton_queue(i)) > 0) {
+			/* stuff is in the queue */
+			t = pop_task_from_taskprocessor_singleton(i);
+			if (!t) {
+				ast_log(LOG_ERROR, "Huh?? size of queue is not zero(%ld), but the queue popped a NULL!\n", size);
+				continue;
+			}
+			if (!t->execute) {
+				ast_log(LOG_ERROR, "task is missing its execute callback.\n");
+				release_task(t);
+				continue;
+			}
+			if (t->execute(t) < 0) {
+				ast_log(LOG_ERROR, "execute() returned failure.\n");
+			}
+			ast_mutex_lock(&i->_taskprocessor_lock);
+			if (i->_stats) {
+				i->_stats->_tasks_processed_count++;
+				/* size is the depth observed before this task was popped */
+				if ((unsigned long)size > i->_stats->_max_qsize) {
+					i->_stats->_max_qsize = size;
+				}
+			}
+			ast_mutex_unlock(&i->_taskprocessor_lock);
+			release_task(t);
+			t = NULL;
+
+			ast_mutex_lock(&_global_killflag_lock);
+			killflag = _global_killflag;
+			ast_mutex_unlock(&_global_killflag_lock);
+			/* more work was queued when we looked: go round again without waiting */
+			if (--size) {
+				continue;
+			}
+		}
+		if (!killflag) {
+			tv = ast_tvadd(ast_tvnow(), ast_samp2tv(i->_poll_freq, 1));
+			ts.tv_sec = tv.tv_sec;
+			ts.tv_nsec = tv.tv_usec * 1000;
+			ast_log(LOG_DEBUG, "\'%s\' is waiting for a signal (or timeout)\n", i->_name);
+			ast_mutex_lock(&i->_taskprocessor_lock);
+			/*
+			 * Re-check under the lock: a signal sent before we got here
+			 * is not remembered by the condition variable, so waiting
+			 * regardless would stall queued work until the timeout.
+			 */
+			if (!i->_queue_size && i->_poll_thread_run) {
+				ast_cond_timedwait(&i->_poll_cond, &i->_taskprocessor_lock, &ts);
+			}
+			ast_mutex_unlock(&i->_taskprocessor_lock);
+		}
+		ast_mutex_lock(&_global_killflag_lock);
+		killflag = _global_killflag;
+		ast_mutex_unlock(&_global_killflag_lock);
+	}
+	/* shutting down: hand back whatever is still queued without running it */
+	while (size_of_taskprocessor_singleton_queue(i)) {
+		t = pop_task_from_taskprocessor_singleton(i);
+		if (t) {
+			release_task(t);
+			t = NULL;
+		}
+	}
+	i->_is_purged = 1;
+	return NULL;
+}
+
+/*
+ * Task callback queued by the "taskprocessor ping" CLI command: writes a
+ * NOTICE containing the task's source tag.  Returns 0, or -1 (after
+ * logging an error) when no task is supplied.
+ */
+static int taskprocessor_ping(struct a_task* e)
+{
+	if (e) {
+		ast_log(LOG_NOTICE, "[TASKPROCESSOR_CLI_PING] %s\n", e->_source);
+		return 0;
+	}
+	ast_log(LOG_ERROR, "Huh? No event!!\n");
+	return -1;
+}
+
+/* Register every entry of taskprocessor_clis with the CLI.  Always returns 0. */
+int register_taskprocessor_clis(void)
+{
+	const int entries = sizeof(taskprocessor_clis) / sizeof(taskprocessor_clis[0]);
+
+	ast_cli_register_multiple(taskprocessor_clis, entries);
+	return 0;
+}
+
+/* Unregister every entry of taskprocessor_clis from the CLI.  Always returns 0. */
+int unregister_taskprocessor_clis(void)
+{
+	const int entries = sizeof(taskprocessor_clis) / sizeof(taskprocessor_clis[0]);
+
+	ast_cli_unregister_multiple(taskprocessor_clis, entries);
+	return 0;
+}
+
+
+/*
+ * Allocate and initialise a taskprocessor_singleton_info together with its
+ * statistics block.
+ *
+ * poll_freq  poll interval for the worker loop; values below 1 select
+ *            DEFAULT_POLL_FREQUENCY
+ *
+ * The new structure's poll thread handle is AST_PTHREADT_NULL and its
+ * mutex, condition variable and task queue head are initialised.
+ * Returns the structure, or NULL (after logging) if either allocation
+ * fails.
+ */
+static struct taskprocessor_singleton_info* default_taskprocessor_constructor(int poll_freq)
+{
+	struct taskprocessor_singleton_info* t;
+
+	t = ast_calloc(1, sizeof(*t));
+	if (!t) {
+		ast_log(LOG_ERROR, "cannot allocate memory for a taskprocessor_singleton_info structure.\n");
+		return NULL;
+	}
+	/* allocate the stats before touching any primitive, so this failure path has nothing to tear down */
+	t->_stats = ast_calloc(1, sizeof(*t->_stats));
+	if (!t->_stats) {
+		ast_log(LOG_ERROR, "cannot allocate memory for a taskprocessor_singleton_stats structure.\n");
+		ast_free(t);
+		return NULL;
+	}
+	t->_id = (unsigned long)t;
+	t->_poll_freq = (poll_freq < 1) ? DEFAULT_POLL_FREQUENCY : poll_freq;
+	t->_poll_thread = AST_PTHREADT_NULL;
+	/* zero-filled memory is not a portable substitute for explicit initialisation */
+	ast_mutex_init(&t->_taskprocessor_lock);
+	ast_cond_init(&t->_poll_cond, NULL);
+	AST_LIST_HEAD_INIT(&t->_queue);
+	return t;
+}
+	
+/*
+ * Create a uniquely named taskprocessor and start its worker thread.
+ *
+ * name  name for the new taskprocessor; must not be NULL
+ * func  thread entry point, or NULL to use
+ *       default_taskprocessor_thread_function; it receives the new
+ *       taskprocessor_singleton_info as its argument
+ *
+ * The taskprocessor CLI commands are registered on the first call.
+ * Returns 0 on success.  Returns -1 when name is NULL, construction
+ * fails, the name is already taken, or the thread cannot be created; in
+ * each of those cases the half-built processor is released again.
+ */
+int create_taskprocessor_singleton(const char* name, void* (*func)(void*))
+{
+	pthread_attr_t attr;
+	struct taskprocessor_singleton_info* p = NULL;
+	int create_result;
+
+	if (!name) {
+		ast_log(LOG_ERROR, "a taskprocessor_singleton requires a name.\n");
+		return -1;
+	}
+
+	ast_mutex_lock(&_global_clireg_lock);
+	if (!_global_clireg) {
+		register_taskprocessor_clis();
+		_global_clireg = 1;
+	}
+	ast_mutex_unlock(&_global_clireg_lock);
+
+	p = default_taskprocessor_constructor(1);
+	if (!p) {
+		ast_log(LOG_ERROR, "we can't create a taskprocessor_singleton because the default constructor failed.\n");
+		return -1;
+	}
+	snprintf(p->_name, sizeof(p->_name), "%s", name);
+	p->_poll_thread_run = 1;
+	if (add_taskprocessor_singleton(p) < 0) {
+		ast_log(LOG_ERROR, "can't add taskprocessor_singleton \'%s\' with ID: 0x%X\n", p->_name, (unsigned int)p->_id);
+		/* p never made it into the list, so it has to be released here */
+		ast_cond_destroy(&p->_poll_cond);
+		ast_free(p->_stats);
+		ast_free(p);
+		return -1;
+	}
+	ast_log(LOG_DEBUG, "found taskprocessor %s at index %d\n", name, size_of_taskprocessor_singleton_list());
+
+	/*
+	 * A local attribute object is used instead of a slot in the fixed-size
+	 * _attribute[] array: indexing that array by list size runs past its
+	 * end once 100 processors exist.  The constructor leaves _poll_thread
+	 * at AST_PTHREADT_NULL, so a new thread is always created here.
+	 */
+	pthread_attr_init(&attr);
+	ast_mutex_lock(&p->_taskprocessor_lock);
+	create_result = ast_pthread_create(&p->_poll_thread, &attr, func ? func : default_taskprocessor_thread_function, p);
+	ast_mutex_unlock(&p->_taskprocessor_lock);
+	pthread_attr_destroy(&attr);
+
+	/* pthread_create() reports failure with a positive error number, so test for non-zero */
+	if (create_result != 0) {
+		ast_log(LOG_ERROR, "failed to create thread \'%s\'.\n", p->_name);
+		/* take the thread-less processor back out of the list and free it */
+		p->_poll_thread = AST_PTHREADT_NULL;
+		p->_poll_thread_run = 0;
+		destroy_taskprocessor_singleton(p);
+		return -1;
+	}
+	ast_log(LOG_DEBUG, "taskprocessor \'%s\' at %p constructed with poll frequency: %d\n", name, (void *)p, p->_poll_freq);
+	return 0;
+}
+
+/*
+ * Placeholder: not implemented.  Logs a notice, ignores name and always
+ * returns -1.
+ */
+int start_taskprocessor_singleton(const char* name)
+{
+	ast_log(LOG_NOTICE, "stub function\n");
+	return -1;
+}
+
+/*
+ * Stop the named taskprocessor and destroy it.
+ *
+ * Clears the processor's run flag, wakes its worker through _poll_cond,
+ * joins the worker thread (when one was started) and then passes the
+ * processor to destroy_taskprocessor_singleton().
+ *
+ * Returns -1 when no processor has that name, otherwise whatever
+ * destroy_taskprocessor_singleton() returns.
+ */
+int stop_taskprocessor_singleton(const char* name)
+{
+	struct taskprocessor_singleton_info* p = get_taskprocessor_singleton(name);
+
+	if (!p) {
+		ast_log(LOG_ERROR, "no taskprocessor named \'%s\' to stop.\n", name);
+		return -1;
+	}
+
+	/* clear the run flag and signal under the lock the worker waits with */
+	ast_mutex_lock(&p->_taskprocessor_lock);
+	p->_poll_thread_run = 0;
+	ast_cond_signal(&p->_poll_cond);
+	ast_mutex_unlock(&p->_taskprocessor_lock);
+
+	/* only join a thread that was actually started */
+	if (p->_poll_thread != AST_PTHREADT_NULL) {
+		pthread_join(p->_poll_thread, NULL);
+		p->_poll_thread = AST_PTHREADT_NULL;
+	}
+	ast_log(LOG_DEBUG, "taskprocessor \'%s\' stopped.\n", name);
+	return destroy_taskprocessor_singleton(p);
+}
+	
+/*
+ * Remove a stopped taskprocessor from the singleton list and free it.
+ *
+ * t must already be stopped (_poll_thread_run == 0).  A failure to find
+ * it in the list is logged but does not prevent the structure, its
+ * statistics block and its condition variable from being released.
+ *
+ * Returns 0 on success, -1 when t is NULL or still marked as running.
+ */
+int destroy_taskprocessor_singleton(struct taskprocessor_singleton_info* t)
+{
+	if (!t) {
+		ast_log(LOG_ERROR, "can't destruct a NULL taskprocessor_singleton.\n");
+		return -1;
+	}
+	if (t->_poll_thread_run) {
+		ast_log(LOG_ERROR, "stop the taskprocessor before trying to destruct it.\n");
+		return -1;
+	}
+	if (remove_taskprocessor_singleton(t->_name) < 0) {
+		ast_log(LOG_WARNING, "cannot remove taskprocessor_singleton \'%s\'.\n", t->_name);
+	}
+	/* pairs with the ast_cond_init() done by default_taskprocessor_constructor() */
+	ast_cond_destroy(&t->_poll_cond);
+	if (t->_stats) {
+		ast_free(t->_stats);
+		t->_stats = NULL;
+	}
+	ast_free(t);
+	return 0;
+}
+
+/*
+ * Report whether a taskprocessor with the given name is registered.
+ * Names are compared case-insensitively while the singleton list is
+ * locked.
+ *
+ * Returns 1 when found, 0 otherwise (including when name is NULL).
+ */
+int exists_taskprocessor_singleton(char *name)
+{
+	int found = 0;
+	struct taskprocessor_singleton_info* n = NULL;
+
+	/* get_taskprocessor_singleton() rejects NULL names; do likewise instead of handing NULL to strcasecmp() */
+	if (!name) {
+		return 0;
+	}
+	AST_LIST_LOCK(&_taskprocessor_singletons);
+	AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
+		if (!strcasecmp(n->_name, name)) {
+			found = 1;
+			break;
+		}
+	}
+	AST_LIST_UNLOCK(&_taskprocessor_singletons);
+	return found;
+}
+
+/*
+ * Append a taskprocessor to the singleton list, refusing duplicates.
+ *
+ * The name check (case-insensitive), the insertion and the size counter
+ * update all happen under the list lock.
+ *
+ * Returns 0 when t was added, -1 when an entry with the same name already
+ * exists (t is left untouched and remains the caller's to free).
+ */
+static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t)
+{
+	struct taskprocessor_singleton_info* n = NULL;
+
+	AST_LIST_LOCK(&_taskprocessor_singletons);
+	AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
+		if (!strcasecmp(n->_name, t->_name)) {
+			/* report the ID of the entry that is already there, not the rejected one */
+			ast_log(LOG_WARNING, "cannot add taskprocessor singleton \'%s\' because it already exists with ID: 0x%lX\n", t->_name, (unsigned long)n->_id);
+			AST_LIST_UNLOCK(&_taskprocessor_singletons);
+			return -1;
+		}
+	}
+	AST_LIST_INSERT_TAIL(&_taskprocessor_singletons, t, list);
+	_taskprocessor_singletons_list_size += 1;
+	AST_LIST_UNLOCK(&_taskprocessor_singletons);
+	return 0;
+}
+
+/*
+ * Find a registered taskprocessor by name (case-insensitive).
+ *
+ * Returns the matching entry, or NULL -- with a warning logged -- when
+ * name is NULL or no entry matches.  The list lock is released before
+ * returning, so the result is not protected against concurrent removal.
+ */
+struct taskprocessor_singleton_info* get_taskprocessor_singleton(const char* name)
+{
+	struct taskprocessor_singleton_info *cursor;
+
+	if (name == NULL) {
+		ast_log(LOG_WARNING, "requesting a nameless taskprocessor!!!\n");
+		return NULL;
+	}
+
+	AST_LIST_LOCK(&_taskprocessor_singletons);
+	AST_LIST_TRAVERSE(&_taskprocessor_singletons, cursor, list) {
+		if (strcasecmp(cursor->_name, name) == 0) {
+			break;
+		}
+	}
+	AST_LIST_UNLOCK(&_taskprocessor_singletons);
+
+	/* the traversal stops with a NULL cursor when nothing matched */
+	if (cursor) {
+		ast_log(LOG_DEBUG, "taskprocessor_singleton \'%s\' located.\n", cursor->_name);
+		return cursor;
+	}
+	ast_log(LOG_WARNING, "could not find the taskprocessor named \'%s\'\n", name);
+	return NULL;
+}
+
+/*
+ * Unlink the taskprocessor with the given name (case-insensitive) from
+ * the singleton list and lower the size counter.  The entry itself is not
+ * freed here.
+ *
+ * Returns 0 when an entry was unlinked, -1 when none matched.
+ */
+static int remove_taskprocessor_singleton(const char* name)
+{
+	struct taskprocessor_singleton_info *cursor = NULL;
+	int unlinked = 0;
+
+	AST_LIST_LOCK(&_taskprocessor_singletons);
+	AST_LIST_TRAVERSE(&_taskprocessor_singletons, cursor, list) {
+		if (strcasecmp(cursor->_name, name) == 0) {
+			AST_LIST_REMOVE(&_taskprocessor_singletons, cursor, list);
+			_taskprocessor_singletons_list_size -= 1;
+			unlinked = 1;
+			/* leave at once: the cursor is no longer part of the list */
+			break;
+		}
+	}
+	AST_LIST_UNLOCK(&_taskprocessor_singletons);
+
+	if (unlinked) {
+		ast_log(LOG_NOTICE, "taskprocessor_singleton \'%s\' removed.\n", name);
+		return 0;
+	}
+	ast_log(LOG_WARNING, "did not find a taskprocessor_singleton \'%s\'\n", name);
+	return -1;
+}
+
+/*
+ * Return the number of registered taskprocessors, sampled while the
+ * singleton list is locked.
+ */
+int size_of_taskprocessor_singleton_list(void)
+{
+	int count;
+
+	AST_LIST_LOCK(&_taskprocessor_singletons);
+	count = _taskprocessor_singletons_list_size;
+	AST_LIST_UNLOCK(&_taskprocessor_singletons);
+
+	return count;
+}
+
+/*
+ * Append a task to a taskprocessor's queue and wake its worker.
+ *
+ * tp  destination taskprocessor
+ * t   task to queue
+ *
+ * Both the queue lock and the processor lock are held while the task is
+ * linked in, the queue size is bumped and _poll_cond is signalled.  The
+ * processor lock is only tried; the queue lock is dropped between tries.
+ *
+ * Returns 0 on success, -1 when either argument is NULL or the processor
+ * lock cannot be obtained (the task is then still owned by the caller).
+ */
+int push_task_to_taskprocessor_singleton(struct taskprocessor_singleton_info* tp, struct a_task* t)
+{
+	int lock_failures = 0;
+
+	if ((!tp) || (!t)) {
+		/* print the pointers as pointers rather than as decimal after a "0x" prefix */
+		ast_log(LOG_ERROR, "a taskprocessor (%p) and a task (%p) are required and missing.\n", (void *)tp, (void *)t);
+		return -1;
+	}
+	AST_LIST_LOCK(&tp->_queue);
+	while (ast_mutex_trylock(&tp->_taskprocessor_lock)) {
+		lock_failures++;
+		AST_LIST_UNLOCK(&tp->_queue);
+		usleep(1);
+		if (lock_failures > 10000) {
+			ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
+			return -1;
+		}
+		AST_LIST_LOCK(&tp->_queue);
+	}
+	AST_LIST_INSERT_TAIL(&tp->_queue, t, list);
+	tp->_queue_size += 1;
+	ast_cond_signal(&tp->_poll_cond);
+	ast_mutex_unlock(&tp->_taskprocessor_lock);
+	AST_LIST_UNLOCK(&tp->_queue);
+	ast_log(LOG_DEBUG, "task pushed!\n");
+	return 0;
+}
+
+/*
+ * Take the task at the head of a taskprocessor's queue.
+ *
+ * The queue lock and the processor lock are both held while the head is
+ * unlinked and the queue size lowered.  The processor lock is only tried;
+ * the queue lock is dropped between tries.
+ *
+ * Returns the task, or NULL when the queue is empty or the processor lock
+ * cannot be obtained.
+ */
+struct a_task* pop_task_from_taskprocessor_singleton(struct taskprocessor_singleton_info* tp)
+{
+	struct a_task *head = NULL;
+	int attempts;
+
+	AST_LIST_LOCK(&tp->_queue);
+	for (attempts = 1; ast_mutex_trylock(&tp->_taskprocessor_lock); attempts++) {
+		AST_LIST_UNLOCK(&tp->_queue);
+		usleep(1);
+		if (attempts > 10000) {
+			ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
+			return head;
+		}
+		AST_LIST_LOCK(&tp->_queue);
+	}
+
+	if (!AST_LIST_EMPTY(&tp->_queue)) {
+		head = AST_LIST_REMOVE_HEAD(&tp->_queue, list);
+		tp->_queue_size -= 1;
+	}
+
+	ast_mutex_unlock(&tp->_taskprocessor_lock);
+	AST_LIST_UNLOCK(&tp->_queue);
+	return head;
+}
+
+/*
+ * Read a taskprocessor's queue size with both the queue lock and the
+ * processor lock held.  The processor lock is only tried; the queue lock
+ * is dropped between tries.
+ *
+ * Returns the queue size, or -1 when the processor lock cannot be
+ * obtained.
+ */
+int size_of_taskprocessor_singleton_queue(struct taskprocessor_singleton_info* tp)
+{
+	int depth;
+	int attempts;
+
+	AST_LIST_LOCK(&tp->_queue);
+	for (attempts = 1; ast_mutex_trylock(&tp->_taskprocessor_lock); attempts++) {
+		AST_LIST_UNLOCK(&tp->_queue);
+		usleep(1);
+		if (attempts > 10000) {
+			ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
+			return -1;
+		}
+		AST_LIST_LOCK(&tp->_queue);
+	}
+
+	depth = tp->_queue_size;
+
+	ast_mutex_unlock(&tp->_taskprocessor_lock);
+	AST_LIST_UNLOCK(&tp->_queue);
+	return depth;
+}
+

Propchange: team/dhubbard/named_processors/main/taskprocessor.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dhubbard/named_processors/main/taskprocessor.c
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dhubbard/named_processors/main/taskprocessor.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: team/dhubbard/named_processors/main/taskproducer.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/taskproducer.c?view=auto&rev=108637
==============================================================================
--- team/dhubbard/named_processors/main/taskproducer.c (added)
+++ team/dhubbard/named_processors/main/taskproducer.c Thu Mar 13 17:48:56 2008
@@ -1,0 +1,61 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2007, Dwayne M. Hubbard 
+ *
+ * Dwayne M. Hubbard <dhubbard at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+/*
+ * Task producers are responsible for putting tasks into the taskprocessor
+ * that is assigned to the producer.  This module manages the basics of
+ * creating a task producer and the services it provides.
+ */
+#include <asterisk.h>
+#include <asterisk/task.h>
+#include <asterisk/taskprocessor.h>
+#include <asterisk/taskproducer.h>
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+/*
+ * Default queue_task implementation for a task producer.
+ *
+ * Pushes task onto the producer's taskprocessor, counts it in
+ * _tasks_produced and signals the processor's _poll_cond.
+ *
+ * Returns 0 on success, -1 when either argument is NULL or the push
+ * fails (the task is then still owned by the caller).
+ */
+static int default_queue_task(struct taskproducer* producer, struct a_task* task)
+{
+	if ((!producer) || (!task)) {
+		/* print the pointers as pointers rather than as decimal after a "0x" prefix */
+		ast_log(LOG_ERROR, "a taskproducer: %p and a task: %p are required for this operation.\n", (void *)producer, (void *)task);
+		return -1;
+	}
+	if (push_task_to_taskprocessor_singleton(producer->_taskprocessor, task) < 0) {
+		ast_log(LOG_ERROR, "we failed to push task to taskprocessor \'%s\'.\n", (producer->_taskprocessor)?producer->_taskprocessor->_name:"<null>");
+		return -1;
+	}
+	producer->_tasks_produced++;
+	/*
+	 * NOTE(review): push_task_to_taskprocessor_singleton() already signals
+	 * _poll_cond, so this second signal looks redundant; kept as-is.
+	 */
+	ast_mutex_lock(&producer->_taskprocessor->_taskprocessor_lock);
+	ast_cond_signal(&producer->_taskprocessor->_poll_cond);
+	ast_mutex_unlock(&producer->_taskprocessor->_taskprocessor_lock);
+	return 0;
+}
+
+/*
+ * Build a task producer bound to the given taskprocessor.  The producer is
+ * zero-allocated and its queue_task hook is set to default_queue_task.
+ * Returns NULL (after logging) when allocation fails.
+ */
+struct taskproducer* construct_taskproducer(struct taskprocessor_singleton_info* processor)
+{
+	struct taskproducer *producer = ast_calloc(1, sizeof(*producer));
+
+	if (producer) {
+		producer->_taskprocessor = processor;
+		producer->queue_task = default_queue_task;
+		return producer;
+	}
+	ast_log(LOG_ERROR, "cannot allocate memory for a taskproducer structure.\n");
+	return NULL;
+}
+
+

Propchange: team/dhubbard/named_processors/main/taskproducer.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: team/dhubbard/named_processors/main/taskproducer.c
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: team/dhubbard/named_processors/main/taskproducer.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain




More information about the asterisk-commits mailing list