[svn-commits] dhubbard: branch dhubbard/named_processors r108637 - /team/dhubbard/named_pro...
SVN commits to the Digium repositories
svn-commits at lists.digium.com
Thu Mar 13 17:48:56 CDT 2008
Author: dhubbard
Date: Thu Mar 13 17:48:56 2008
New Revision: 108637
URL: http://svn.digium.com/view/asterisk?view=rev&rev=108637
Log:
moved taskprocessing modules to core
Added:
team/dhubbard/named_processors/main/task.c (with props)
team/dhubbard/named_processors/main/taskprocessor.c (with props)
team/dhubbard/named_processors/main/taskproducer.c (with props)
Modified:
team/dhubbard/named_processors/main/Makefile
Modified: team/dhubbard/named_processors/main/Makefile
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/Makefile?view=diff&rev=108637&r1=108636&r2=108637
==============================================================================
--- team/dhubbard/named_processors/main/Makefile (original)
+++ team/dhubbard/named_processors/main/Makefile Thu Mar 13 17:48:56 2008
@@ -30,7 +30,7 @@
cryptostub.o sha1.o http.o fixedjitterbuf.o abstract_jb.o \
strcompat.o threadstorage.o dial.o event.o adsistub.o audiohook.o \
astobj2.o hashtab.o global_datastores.o $(RESAMPLE_OBJS) version.o \
- features.o
+ features.o task.o taskprocessor.o taskproducer.o
# we need to link in the objects statically, not as a library, because
# otherwise modules will not have them available if none of the static
Added: team/dhubbard/named_processors/main/task.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/task.c?view=auto&rev=108637
==============================================================================
--- team/dhubbard/named_processors/main/task.c (added)
+++ team/dhubbard/named_processors/main/task.c Thu Mar 13 17:48:56 2008
@@ -1,0 +1,160 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2007, Dwayne M. Hubbard
+ *
+ * Dwayne M. Hubbard <dhubbard at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+#include <asterisk.h>
+#include <asterisk/task.h>
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+AST_LIST_HEAD_STATIC(_task_pool, a_task);
+static long _task_pool_size = 0;
+AST_MUTEX_DEFINE_STATIC(_global_kill_taskpool_lock);
+static int _global_kill_taskpool = 0;
+
+int noop_task_execute(struct a_task* t)
+{
+ ast_log(LOG_DEBUG, "noop for \'%s\'\n", (t)?t->_source:"<task-structure-missing>");
+ return 0;
+}
+
+/*
+ * get an available a_task structure from the pool of available
+ * structures. If no structures are available, allocate one
+ */
+struct a_task* get_available_task(int (*task_exe)(struct a_task *task), void* datap, char* src)
+{
+ struct a_task *t;
+ int lock_failures = 0;
+
+ AST_LIST_LOCK(&_task_pool);
+ while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
+ lock_failures++;
+ AST_LIST_UNLOCK(&_task_pool);
+ usleep(1);
+ if (lock_failures > 10) {
+ ast_log(LOG_ERROR, "\n\tcannot lock task pool.\n");
+ return NULL;
+ }
+ AST_LIST_LOCK(&_task_pool);
+ }
+ /* If the kill_taskpool flag is TRUE, then we should not give any more pooled tasks to requestors */
+ if (_global_kill_taskpool != 0) {
+ ast_mutex_unlock(&_global_kill_taskpool_lock);
+ AST_LIST_UNLOCK(&_task_pool);
+ ast_log(LOG_NOTICE, "task pool is no longer available to requestors.\n");
+ return NULL;
+ }
+ if (AST_LIST_EMPTY(&_task_pool)) {
+ t = ast_calloc(1,sizeof(*t));
+ if (!t) {
+ ast_log(LOG_ERROR, "Poop. ast_calloc failed to get a task for \'%s\'\n", src);
+ ast_mutex_unlock(&_global_kill_taskpool_lock);
+ AST_LIST_UNLOCK(&_task_pool);
+ return NULL;
+ }
+ t->execute = task_exe;
+ t->_datap = datap;
+ strncpy(t->_source, src, sizeof(t->_source));
+ ast_mutex_unlock(&_global_kill_taskpool_lock);
+ AST_LIST_UNLOCK(&_task_pool);
+ return t;
+ }
+ t = AST_LIST_REMOVE_HEAD(&_task_pool, list);
+ _task_pool_size -= 1;
+ ast_mutex_unlock(&_global_kill_taskpool_lock);
+ AST_LIST_UNLOCK(&_task_pool);
+ if (t) {
+ t->execute = task_exe;
+ t->_datap = datap;
+ strncpy(t->_source, src, sizeof(t->_source));
+ }
+ return t;
+}
+
+/*
+ * stop the task pool
+ */
+int stop_taskpool()
+{
+ ast_mutex_lock(&_global_kill_taskpool_lock);
+ _global_kill_taskpool = 1;
+ ast_mutex_unlock(&_global_kill_taskpool_lock);
+ return 0;
+}
+
+int task_pool_size()
+{
+ int size = 0;
+ AST_LIST_LOCK(&_task_pool);
+ size = _task_pool_size;
+ AST_LIST_UNLOCK(&_task_pool);
+ return size;
+}
+
+/*
+ * release a_task structure back to the pool
+ */
+int release_task(struct a_task* t)
+{
+ int lock_failures = 0;
+ if (t->_datap) {
+ ast_free(t->_datap);
+ }
+ memset(t, 0, sizeof(struct a_task));
+ AST_LIST_LOCK(&_task_pool);
+ while (ast_mutex_trylock(&_global_kill_taskpool_lock)) {
+ lock_failures++;
+ AST_LIST_UNLOCK(&_task_pool);
+ usleep(1);
+ if (lock_failures > 10000) {
+ ast_log(LOG_ERROR, "\n\tcannot lock task pool.\n");
+ return 0;
+ }
+ AST_LIST_LOCK(&_task_pool);
+ }
+ if (_global_kill_taskpool != 0) {
+ ast_log(LOG_NOTICE, "task pool is no longer available for task releasing\n");
+ ast_mutex_unlock(&_global_kill_taskpool_lock);
+ AST_LIST_UNLOCK(&_task_pool);
+ return 0;
+ }
+ AST_LIST_INSERT_TAIL(&_task_pool, t, list);
+ _task_pool_size += 1;
+ ast_mutex_unlock(&_global_kill_taskpool_lock);
+ AST_LIST_UNLOCK(&_task_pool);
+ return 0;
+}
+
+/*
+ * destroy a_task pool
+ */
+int destroy_task_pool(void)
+{
+ struct a_task* t = NULL;
+
+ AST_LIST_LOCK(&_task_pool);
+ while (!AST_LIST_EMPTY(&_task_pool)) {
+ t = AST_LIST_REMOVE_HEAD(&_task_pool, list);
+ if (t) {
+ ast_free(t);
+ t = NULL;
+ }
+ _task_pool_size -= 1;
+ }
+ AST_LIST_UNLOCK(&_task_pool);
+ return 0;
+}
+
Propchange: team/dhubbard/named_processors/main/task.c
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/dhubbard/named_processors/main/task.c
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/dhubbard/named_processors/main/task.c
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: team/dhubbard/named_processors/main/taskprocessor.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/taskprocessor.c?view=auto&rev=108637
==============================================================================
--- team/dhubbard/named_processors/main/taskprocessor.c (added)
+++ team/dhubbard/named_processors/main/taskprocessor.c Thu Mar 13 17:48:56 2008
@@ -1,0 +1,526 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2007, Dwayne M. Hubbard
+ *
+ * Dwayne M. Hubbard <dhubbard at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+/*
+ * Maintain a container of taskprocessor threads that are uniquely named
+ */
+#include <asterisk.h>
+#include <asterisk/taskprocessor.h>
+#include <asterisk/task.h>
+#include <asterisk/cli.h>
+#include <signal.h>
+#include <sys/time.h>
+
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+#define DEFAULT_POLL_FREQUENCY 1
+
+AST_MUTEX_DEFINE_STATIC(_global_killflag_lock);
+static int _global_killflag = 0;
+
+AST_MUTEX_DEFINE_STATIC(_global_clireg_lock);
+static int _global_clireg = 0;
+
+pthread_attr_t _attribute[100];
+AST_LIST_HEAD_STATIC(_taskprocessor_singletons, taskprocessor_singleton_info);
+static int _taskprocessor_singletons_list_size = 0;
+
+static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t);
+static int remove_taskprocessor_singleton(const char* name);
+static int taskprocessor_ping(struct a_task* e);
+
+/*********
+ * CLI
+ *********/
+static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static char *cli_taskprocessor_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+
+static struct ast_cli_entry taskprocessor_clis[] = {
+ AST_CLI_DEFINE(cli_taskprocessor_ping, "Ping a named task processors"),
+ AST_CLI_DEFINE(cli_taskprocessor_show_stats, "List instantiated task processors and statistics"),
+};
+
+static char *cli_taskprocessor_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ int found = 0;
+ struct a_task* t = NULL;
+ struct taskprocessor_singleton_info* p = NULL;
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "taskprocessor ping";
+ e->usage =
+ "Usage: taskprocessor ping <taskprocessor>\n"
+ " Displays the time required for a processor to deliver a task\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ if (a->argc != 3)
+ return CLI_SHOWUSAGE;
+
+ AST_LIST_LOCK(&_taskprocessor_singletons);
+ AST_LIST_TRAVERSE(&_taskprocessor_singletons, p, list) {
+ ast_mutex_lock(&p->_taskprocessor_lock);
+ if (!strcasecmp(p->_name, a->argv[2])) {
+ found = 1;
+ }
+ ast_mutex_unlock(&p->_taskprocessor_lock);
+ if (found) break;
+ }
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+
+ if ((!found) || (!p)) {
+ ast_cli(a->fd, "\n%s failed: %s not found\n", e->command, a->argv[2]);
+ return CLI_SUCCESS;
+ }
+
+ t = get_available_task(taskprocessor_ping, 0, "cli_taskprocessor_ping");
+ if (!t) {
+ ast_cli(a->fd, "\n%s failed: could not retrieve task from pool\n", e->command);
+ return RESULT_SUCCESS;
+ }
+ if (push_task_to_taskprocessor_singleton(p, t) < 0) {
+ ast_cli(a->fd, "\n%s failed: could not push task to %s\n", e->command, a->argv[2]);
+ release_task(t);
+ }
+ return RESULT_SUCCESS;
+}
+
+static char *cli_taskprocessor_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+ char name[256];
+ unsigned long qsize;
+ unsigned long maxqsize;
+ unsigned long processed;
+ unsigned long taskpool;
+ struct taskprocessor_singleton_info* p;
+
+ switch (cmd) {
+ case CLI_INIT:
+ e->command = "taskprocessor show stats";
+ e->usage =
+ "Usage: taskprocessor show stats\n"
+ " Shows a list of instantiated task processors and their statistics\n";
+ return NULL;
+ case CLI_GENERATE:
+ return NULL;
+ }
+
+ if (a->argc != e->args)
+ return CLI_SHOWUSAGE;
+
+ AST_LIST_LOCK(&_taskprocessor_singletons);
+ AST_LIST_TRAVERSE(&_taskprocessor_singletons, p, list) {
+ ast_mutex_lock(&p->_taskprocessor_lock);
+ snprintf(name, sizeof(name), "%s", p->_name);
+ qsize = p->_queue_size;
+ maxqsize = p->_stats->_max_qsize;
+ processed = p->_stats->_tasks_processed_count;
+ ast_mutex_unlock(&p->_taskprocessor_lock);
+ ast_cli(a->fd, "\n%20s: processed: %10ld, qsize: %ld (max: %ld)", name, processed, qsize, maxqsize);
+ }
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ taskpool = task_pool_size();
+ ast_cli(a->fd, "\n%20s: %ld\n\n", "task pool size", taskpool);
+
+ return RESULT_SUCCESS;
+}
+
+static void* default_taskprocessor_thread_function(void* data)
+{
+ struct taskprocessor_singleton_info* i = NULL;
+ struct a_task* t = NULL;
+ unsigned long size = 0;
+ unsigned long task_count = 0;
+ long qsize = 0;
+ int killflag = 0;
+ struct timespec ts = { 0, };
+ struct timeval tv;
+
+ i = (struct taskprocessor_singleton_info*)data;
+ if (!i) {
+ ast_log(LOG_ERROR, "cannot start thread_function loop without a taskprocessor_singleton_info structure.\n");
+ return NULL;
+ }
+
+ while ((!killflag) && (i->_poll_thread_run)) {
+ if ((size = size_of_taskprocessor_singleton_queue(i)) > 0) {
+ /* stuff is in the queue */
+ t = pop_task_from_taskprocessor_singleton(i);
+ if (!t) {
+ ast_log(LOG_ERROR, "Huh?? size of queue is not zero(%ld), but the queue popped a NULL!\n", qsize);
+ continue;
+ }
+ if (!t->execute) {
+ ast_log(LOG_ERROR, "task is missing its execute callback.\n");
+ release_task(t);
+ continue;
+ }
+ if (t->execute(t) < 0) {
+ ast_log(LOG_ERROR, "execute() returned failure.\n");
+ }
+ ast_mutex_lock(&i->_taskprocessor_lock);
+ if (i->_stats) {
+ i->_stats->_tasks_processed_count++;
+ if (size > i->_stats->_max_qsize) {
+ i->_stats->_max_qsize = size;
+ }
+ task_count = i->_stats->_tasks_processed_count;
+ }
+ ast_mutex_unlock(&i->_taskprocessor_lock);
+ release_task(t);
+ t = NULL;
+
+ ast_mutex_lock(&_global_killflag_lock);
+ killflag = _global_killflag;
+ ast_mutex_unlock(&_global_killflag_lock);
+ if (--size) continue;
+ }
+ if (!killflag) {
+ tv = ast_tvadd(ast_tvnow(), ast_samp2tv(i->_poll_freq, 1));
+ ts.tv_sec = tv.tv_sec;
+ ts.tv_nsec = tv.tv_usec * 1000;
+ ast_log(LOG_DEBUG, "\'%s\' is waiting for a signal (or timeout)\n", i->_name);
+ ast_mutex_lock(&i->_taskprocessor_lock);
+ ast_cond_timedwait(&i->_poll_cond, &i->_taskprocessor_lock, &ts);
+ ast_mutex_unlock(&i->_taskprocessor_lock);
+ }
+ ast_mutex_lock(&_global_killflag_lock);
+ killflag = _global_killflag;
+ ast_mutex_unlock(&_global_killflag_lock);
+ }
+ while (size_of_taskprocessor_singleton_queue(i)) {
+ /* stuff is in the queue */
+ t = pop_task_from_taskprocessor_singleton(i);
+ if (t) {
+ release_task(t);
+ t = NULL;
+ }
+ }
+ i->_is_purged = 1;
+ return NULL;
+}
+
+static int taskprocessor_ping(struct a_task* e)
+{
+ if (!e) {
+ ast_log(LOG_ERROR, "Huh? No event!!\n");
+ return -1;
+ }
+ ast_log(LOG_NOTICE, "[TASKPROCESSOR_CLI_PING] %s\n", e->_source);
+ return 0;
+}
+
+int register_taskprocessor_clis(void)
+{
+ ast_cli_register_multiple(taskprocessor_clis, sizeof(taskprocessor_clis)/sizeof(taskprocessor_clis[0]));
+ return 0;
+}
+
+int unregister_taskprocessor_clis(void)
+{
+ ast_cli_unregister_multiple(taskprocessor_clis, sizeof(taskprocessor_clis)/sizeof(taskprocessor_clis[0]));
+ return 0;
+}
+
+
+static struct taskprocessor_singleton_info* default_taskprocessor_constructor(int poll_freq)
+{
+ struct taskprocessor_singleton_info* t;
+ t = ast_calloc(1, sizeof(*t));
+ if (!t) {
+ ast_log(LOG_ERROR, "cannot allocate memory for a taskprocessor_singleton_info structure.\n");
+ return NULL;
+ }
+ t->_id = (unsigned long)t;
+ if (poll_freq < 1)
+ t->_poll_freq = DEFAULT_POLL_FREQUENCY;
+ else
+ t->_poll_freq = poll_freq;
+ ast_cond_init(&t->_poll_cond, NULL);
+ t->_poll_thread = AST_PTHREADT_NULL;
+ t->_stats = ast_calloc(1, sizeof(*t->_stats));
+ if (!t->_stats) {
+ ast_log(LOG_ERROR, "cannot allocate memory for a taskprocessor_singleton_stats structure.\n");
+ ast_free(t);
+ return NULL;
+ }
+ return t;
+}
+
+int create_taskprocessor_singleton(const char* name, void* (*func)(void*))
+{
+ int index;
+ struct taskprocessor_singleton_info* p = NULL;
+
+ ast_mutex_lock(&_global_clireg_lock);
+ if (!_global_clireg) {
+ register_taskprocessor_clis();
+ _global_clireg = 1;
+ }
+ ast_mutex_unlock(&_global_clireg_lock);
+
+ p = default_taskprocessor_constructor(1);
+ if (!p) {
+ ast_log(LOG_ERROR, "we can't create a taskprocessor_singleton because the default constructor failed.\n");
+ return -1;
+ }
+ snprintf(p->_name, sizeof(p->_name), "%s", name);
+ p->_poll_thread_run = 1;
+ if (add_taskprocessor_singleton(p) < 0) {
+ ast_log(LOG_ERROR, "can't add taskprocessor_singleton \'%s\' with ID: 0x%X\n", p->_name, (unsigned int)p->_id);
+ return -1;
+ }
+ index = size_of_taskprocessor_singleton_list();
+ ast_log(LOG_DEBUG, "found taskprocessor %s at index %d\n", name, index);
+ pthread_attr_init(&_attribute[index]);
+ /* stay stopped if we are supposed to be stopped */
+ if (p->_poll_thread == AST_PTHREADT_STOP) {
+ ast_log(LOG_DEBUG, "poll thread == AST_PTHREADT_STOP.\n");
+ return 0;
+ }
+ ast_mutex_lock(&p->_taskprocessor_lock);
+ if (p->_poll_thread == pthread_self()) {
+ ast_mutex_unlock(&p->_taskprocessor_lock);
+ ast_log(LOG_DEBUG, "cannot kill myself.\n");
+ return -1;
+ }
+ if (p->_poll_thread != AST_PTHREADT_NULL) {
+ /* wake it up */
+ pthread_kill(p->_poll_thread, SIGURG);
+ } else {
+ /* create it */
+ if (ast_pthread_create(&p->_poll_thread, &_attribute[index], func?func:default_taskprocessor_thread_function, p) < 0) {
+ ast_mutex_unlock(&p->_taskprocessor_lock);
+ ast_log(LOG_ERROR, "failed to create thread \'%s\'.\n", p->_name);
+ return -1;
+ }
+ }
+ pthread_attr_destroy(&_attribute[index]);
+ ast_mutex_unlock(&p->_taskprocessor_lock);
+ ast_log(LOG_DEBUG, "taskprocessor \'%s\' at 0x%ld constructed with poll frequency: %d\n", name, (unsigned long)p, p->_poll_freq);
+ return 0;
+}
+
+int start_taskprocessor_singleton(const char* name)
+{
+ ast_log(LOG_NOTICE, "stub function\n");
+ return -1;
+}
+
+int stop_taskprocessor_singleton(const char* name)
+{
+ struct taskprocessor_singleton_info* p;
+ p = get_taskprocessor_singleton(name);
+ if (!p) {
+ ast_log(LOG_ERROR, "no taskprocessor named \'%s\' to stop.\n", name);
+ return -1;
+ }
+ p->_poll_thread_run = 0;
+
+ ast_mutex_lock(&p->_taskprocessor_lock);
+ ast_cond_signal(&p->_poll_cond);
+ ast_mutex_unlock(&p->_taskprocessor_lock);
+
+ pthread_join(p->_poll_thread, NULL);
+ p->_poll_thread = AST_PTHREADT_NULL;
+ ast_log(LOG_DEBUG, "taskprocessor \'%s\' stopped.\n", name);
+ return destroy_taskprocessor_singleton(p);
+}
+
+int destroy_taskprocessor_singleton(struct taskprocessor_singleton_info* t)
+{
+ if (!t) {
+ ast_log(LOG_ERROR, "can't destruct a NULL taskprocessor_singleton.\n");
+ return -1;
+ }
+ if (t->_poll_thread_run) {
+ ast_log(LOG_ERROR, "stop the taskprocessor before trying to destruct it.\n");
+ return -1;
+ }
+ if (remove_taskprocessor_singleton(t->_name) < 0) {
+ ast_log(LOG_WARNING, "cannot remove taskprocessor_singleton \'%s\'.\n", t->_name);
+ }
+ if (t->_stats) {
+ ast_free(t->_stats);
+ t->_stats = NULL;
+ }
+ ast_free(t);
+ t = NULL;
+ return 0;
+}
+
+int exists_taskprocessor_singleton(char *name)
+{
+ int found=0;
+ struct taskprocessor_singleton_info* n = NULL;
+
+ AST_LIST_LOCK(&_taskprocessor_singletons);
+ AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
+ if (!strcasecmp(n->_name, name)) {
+ found=1;
+ break;
+ }
+ }
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ return found;
+}
+
+static int add_taskprocessor_singleton(struct taskprocessor_singleton_info* t)
+{
+ struct taskprocessor_singleton_info* n = NULL;
+
+ AST_LIST_LOCK(&_taskprocessor_singletons);
+ AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
+ if (!strcasecmp(n->_name, t->_name)) {
+ ast_log(LOG_WARNING, "cannot add taskprocessor singleton \'%s\' because it already exists with ID: 0x%X\n", t->_name, (unsigned int)t->_id);
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ return -1;
+ }
+ }
+ AST_LIST_INSERT_TAIL(&_taskprocessor_singletons, t, list);
+ _taskprocessor_singletons_list_size += 1;
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ return 0;
+}
+
+struct taskprocessor_singleton_info* get_taskprocessor_singleton(const char* name)
+{
+ struct taskprocessor_singleton_info* n;
+
+ if (!name) {
+ ast_log(LOG_WARNING, "requesting a nameless taskprocessor!!!\n");
+ return NULL;
+ }
+ AST_LIST_LOCK(&_taskprocessor_singletons);
+ AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
+ if (!strcasecmp(n->_name, name)) {
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ ast_log(LOG_DEBUG, "taskprocessor_singleton \'%s\' located.\n", n->_name);
+ return n;
+ }
+ }
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ ast_log(LOG_WARNING, "could not find the taskprocessor named \'%s\'\n", name);
+ return NULL;
+}
+
+static int remove_taskprocessor_singleton(const char* name)
+{
+ struct taskprocessor_singleton_info* n = NULL;
+
+ AST_LIST_LOCK(&_taskprocessor_singletons);
+ AST_LIST_TRAVERSE(&_taskprocessor_singletons, n, list) {
+ if (!strcasecmp(n->_name, name)) {
+ AST_LIST_REMOVE(&_taskprocessor_singletons, n, list);
+ _taskprocessor_singletons_list_size -= 1;
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ ast_log(LOG_NOTICE, "taskprocessor_singleton \'%s\' removed.\n", name);
+ return 0;
+ }
+ }
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ ast_log(LOG_WARNING, "did not find a taskprocessor_singleton \'%s\'\n", name);
+ return -1;
+}
+
+int size_of_taskprocessor_singleton_list(void)
+{
+ int size;
+ AST_LIST_LOCK(&_taskprocessor_singletons);
+ size = _taskprocessor_singletons_list_size;
+ AST_LIST_UNLOCK(&_taskprocessor_singletons);
+ return size;
+}
+
+int push_task_to_taskprocessor_singleton(struct taskprocessor_singleton_info* tp, struct a_task* t)
+{
+ int lock_failures = 0;
+
+ if ((!tp) || (!t)) {
+ ast_log(LOG_ERROR, "a taskprocessor (0x%ld) and a task (0x%ld) are required and missing.\n", (long)tp, (long)t);
+ return -1;
+ }
+ AST_LIST_LOCK(&tp->_queue);
+ while (ast_mutex_trylock(&tp->_taskprocessor_lock)) {
+ lock_failures++;
+ AST_LIST_UNLOCK(&tp->_queue);
+ usleep(1);
+ if (lock_failures > 10000) {
+ ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
+ return -1;
+ }
+ AST_LIST_LOCK(&tp->_queue);
+ }
+ AST_LIST_INSERT_TAIL(&tp->_queue, t, list);
+ tp->_queue_size+=1;
+ ast_cond_signal(&tp->_poll_cond);
+ ast_mutex_unlock(&tp->_taskprocessor_lock);
+ AST_LIST_UNLOCK(&tp->_queue);
+ ast_log(LOG_DEBUG, "task pushed!\n");
+ return 0;
+}
+
+struct a_task* pop_task_from_taskprocessor_singleton(struct taskprocessor_singleton_info* tp)
+{
+ struct a_task* t = NULL;
+ int lock_failures = 0;
+
+ AST_LIST_LOCK(&tp->_queue);
+ while (ast_mutex_trylock(&tp->_taskprocessor_lock)) {
+ lock_failures++;
+ AST_LIST_UNLOCK(&tp->_queue);
+ usleep(1);
+ if (lock_failures > 10000) {
+ ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
+ return t;
+ }
+ AST_LIST_LOCK(&tp->_queue);
+ }
+ if (!AST_LIST_EMPTY(&tp->_queue)) {
+ t = AST_LIST_REMOVE_HEAD(&tp->_queue, list);
+ tp->_queue_size-=1;
+ }
+ ast_mutex_unlock(&tp->_taskprocessor_lock);
+ AST_LIST_UNLOCK(&tp->_queue);
+ return t;
+}
+
+int size_of_taskprocessor_singleton_queue(struct taskprocessor_singleton_info* tp)
+{
+ int size = -1;
+ int lock_failures = 0;
+
+ AST_LIST_LOCK(&tp->_queue);
+ while (ast_mutex_trylock(&tp->_taskprocessor_lock)) {
+ lock_failures++;
+ AST_LIST_UNLOCK(&tp->_queue);
+ usleep(1);
+ if (lock_failures > 10000) {
+ ast_log(LOG_ERROR, "cannot lock taskprocessor.\n");
+ return size;
+ }
+ AST_LIST_LOCK(&tp->_queue);
+ }
+ size = tp->_queue_size;
+ ast_mutex_unlock(&tp->_taskprocessor_lock);
+ AST_LIST_UNLOCK(&tp->_queue);
+ return size;
+}
+
Propchange: team/dhubbard/named_processors/main/taskprocessor.c
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/dhubbard/named_processors/main/taskprocessor.c
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/dhubbard/named_processors/main/taskprocessor.c
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: team/dhubbard/named_processors/main/taskproducer.c
URL: http://svn.digium.com/view/asterisk/team/dhubbard/named_processors/main/taskproducer.c?view=auto&rev=108637
==============================================================================
--- team/dhubbard/named_processors/main/taskproducer.c (added)
+++ team/dhubbard/named_processors/main/taskproducer.c Thu Mar 13 17:48:56 2008
@@ -1,0 +1,61 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2007, Dwayne M. Hubbard
+ *
+ * Dwayne M. Hubbard <dhubbard at digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+/*
+ * Task producers are responsible for putting tasks into the taskprocessor
+ * that is assigned the producer. This module managers the basics about
+ * the creation and services of a task producer.
+ */
+#include <asterisk.h>
+#include <asterisk/task.h>
+#include <asterisk/taskprocessor.h>
+#include <asterisk/taskproducer.h>
+ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
+
+/* provide a default implementation of a queue() command */
+static int default_queue_task(struct taskproducer* producer, struct a_task* task)
+{
+ if ((!producer) || (!task)) {
+ ast_log(LOG_ERROR, "a taskproducer: 0x%ld and a task: 0x%ld are required for this operation.\n", (unsigned long)producer, (unsigned long)task);
+ return -1;
+ }
+ if (push_task_to_taskprocessor_singleton(producer->_taskprocessor, task) < 0) {
+ ast_log(LOG_ERROR, "we failed to push task to taskprocessor \'%s\'.\n", (producer->_taskprocessor)?producer->_taskprocessor->_name:"<null>");
+ return -1;
+ }
+ producer->_tasks_produced++;
+ ast_mutex_lock(&producer->_taskprocessor->_taskprocessor_lock);
+ ast_cond_signal(&producer->_taskprocessor->_poll_cond);
+ ast_mutex_unlock(&producer->_taskprocessor->_taskprocessor_lock);
+ return 0;
+}
+
+/* create and initialize a task producer */
+struct taskproducer* construct_taskproducer(struct taskprocessor_singleton_info* processor)
+{
+ struct taskproducer* p;
+ p = ast_calloc(1, sizeof(*p));
+ if (!p) {
+ ast_log(LOG_ERROR, "cannot allocate memory for a taskproducer structure.\n");
+ return NULL;
+ }
+ p->_taskprocessor = processor;
+ p->queue_task = default_queue_task;
+ return p;
+}
+
+
Propchange: team/dhubbard/named_processors/main/taskproducer.c
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: team/dhubbard/named_processors/main/taskproducer.c
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Propchange: team/dhubbard/named_processors/main/taskproducer.c
------------------------------------------------------------------------------
svn:mime-type = text/plain
More information about the svn-commits
mailing list